Message Flows
tofu is async. Understanding the message flow helps you write correct code.
The Two Queues
tofu uses two internal queues:
Your Code tofu Engine Network
│ │ │
├──post()──► [Send Queue] ─┼──► socket.write() ──────────►
│ │ │
◄──waitReceive()── [Recv Queue] ◄── socket.read() ◄────────┤
│ │ │
- Send Queue: Messages waiting to be sent
- Recv Queue: Messages received and waiting for you
post() Is Not send()
post() puts a message in the send queue. It returns immediately.
const bhdr = try chnls.post(&msg);
// Message is in queue. Not sent yet.
// bhdr contains the assigned channel and message_id
What post() returns
The BinaryHeader with assigned values:
channel_number- assigned channel (for HelloRequest/WelcomeRequest)message_id- assigned ID (if you left it at 0)
The actual send happens on tofu's internal thread. You find out the result via waitReceive().
Async Completion
Every operation completes asynchronously via waitReceive().
WelcomeRequest flow
You tofu OS
│ │ │
├─post(WelcomeRequest)──────►│ │
│ returns channel_number │ │
│ ├──create socket────────────►│
│ ├──bind()───────────────────►│
│ ├──listen()─────────────────►│
│ │◄──────────────────success──┤
│◄──waitReceive()────────────┤ │
│ WelcomeResponse │ │
│ status=0 (success) │ │
HelloRequest flow
Client tofu Server
│ │ │
├─post(HelloRequest)────────►│ │
│ returns channel_number │ │
│ ├──connect()────────────────►│
│ ├──send(HelloRequest)───────►│
│ │◄──────────HelloRequest─────┤
│ │ ├─waitReceive()
│ │◄──────────HelloResponse────┤
│◄──waitReceive()────────────┤ │
│ HelloResponse │ │
Failure Paths
When something fails, you get the original message back with error status.
Connection failure
var helloReq: ?*Message = try ampe.get(.always);
try addr.format(helloReq.?);
_ = try chnls.post(&helloReq);
var resp: ?*Message = try chnls.waitReceive(timeout);
defer ampe.put(&resp);
if (resp.?.isFromEngine()) {
// Connection failed - this is your HelloRequest returned
const sts = tofu.status.raw_to_status(resp.?.bhdr.status);
// sts could be: connect_failed, invalid_address, etc.
}
Check origin first
origin == application→ message from peerorigin == engine→ status notification from tofu
Send failure
If the network fails after connection, you get your message back:
_ = try chnls.post(&request);
var resp: ?*Message = try chnls.waitReceive(timeout);
if (resp.?.isFromEngine() and resp.?.bhdr.status != 0) {
// This is your request, returned with error
const sts = tofu.status.raw_to_status(resp.?.bhdr.status);
// sts could be: send_failed, channel_closed, peer_disconnected
}
Message Ownership
Messages move between you and tofu.
After post()
var msg: ?*Message = try ampe.get(.always);
defer ampe.put(&msg); // Safe - put() handles null
// ... fill message ...
_ = try chnls.post(&msg);
// msg is now null - tofu owns it
// defer will call put(null) which is a no-op
After waitReceive()
var resp: ?*Message = try chnls.waitReceive(timeout);
defer ampe.put(&resp); // You must return it
// You own the message until you put() it
defer handles both cases
Always use defer ampe.put(&msg) right after get().
It works whether you post (msg becomes null) or keep the message.
Pool Empty Notification
When the pool runs low, tofu sends a signal:
var msg: ?*Message = try chnls.waitReceive(timeout);
defer ampe.put(&msg);
if (msg.?.isFromEngine()) {
const sts = tofu.status.raw_to_status(msg.?.bhdr.status);
if (sts == .pool_empty) {
// Pool needs more messages
// The signal message itself goes back to pool via defer
}
}
NAQ: What should I do when pool is empty?
Return messages to pool faster. The pool_empty signal message itself
goes back to pool when you put() it, which helps immediately.
Channel Closed Notification
When a channel closes (by you or peer), tofu notifies:
if (msg.?.isFromEngine()) {
const sts = tofu.status.raw_to_status(msg.?.bhdr.status);
if (sts == .channel_closed) {
const closed_ch = msg.?.bhdr.channel_number;
// Remove this channel from your tracking
}
}
You get channel_closed for:
- ByeRequest/ByeResponse completion
- ByeSignal
- Peer disconnect
- Network failure
Timeout Handling
waitReceive() can timeout:
var msg: ?*Message = try chnls.waitReceive(5 * tofu.waitReceive_SEC_TIMEOUT);
if (msg == null) {
// Timeout - nothing received within 5 seconds
}
Constants:
waitReceive_INFINITE_TIMEOUT- wait foreverwaitReceive_SEC_TIMEOUT- 1 second- Multiply for longer timeouts
Main Loop Pattern
A typical receive loop:
while (running) {
var msg: ?*Message = try chnls.waitReceive(tofu.waitReceive_SEC_TIMEOUT);
if (msg == null) {
// Timeout - do housekeeping, check shutdown flag
continue;
}
defer ampe.put(&msg);
// Check origin first
if (msg.?.isFromEngine()) {
const sts = tofu.status.raw_to_status(msg.?.bhdr.status);
switch (sts) {
.pool_empty => continue,
.channel_closed => {
removeChannel(msg.?.bhdr.channel_number);
continue;
},
else => {
handleError(msg);
continue;
},
}
}
// Application message from peer
switch (msg.?.bhdr.proto.opCode) {
.HelloRequest => handleNewConnection(msg, chnls),
.Request => handleRequest(msg, chnls),
.Signal => handleSignal(msg),
.ByeRequest => handleClose(msg, chnls),
else => {},
}
}
Summary
| What you do | What happens |
|---|---|
post(&msg) |
Message goes to send queue, msg becomes null |
waitReceive() |
Returns next message from recv queue (or null on timeout) |
| Success | You get the expected response from peer |
| Failure | You get your message back with error status, origin=engine |