Ampe

Ampe, or the engine, is the owner and allocator of all tofu resources:

  • ChannelGroups
  • Messages

Consider it the GPA (general-purpose allocator) of tofu.


Ampe creation

Example of Ampe creation
pub fn createDestroyAmpe(gpa: Allocator) !void {
    // Create engine implementation object
    const rtr: *Reactor = try Reactor.Create(gpa, DefaultOptions);

    // Destroy it after return or on error
    defer rtr.*.Destroy();

    // Create ampe interface
    const ampe: Ampe = try rtr.*.ampe();

    _ = ampe;

    // No need to destroy ampe itself.
    // It is an interface provided by Reactor.
    // It will be destroyed via rtr.*.Destroy().
}

Note

You can create multiple engines per process.


Interface

Ampe is represented by the following interface:

Brief version of the Interface
pub const Ampe = struct {
    pub fn create(ampe: Ampe) status.AmpeError!ChannelGroup {...}
    pub fn destroy(ampe: Ampe, chnls: ChannelGroup) status.AmpeError!void {...}

    pub fn get(ampe: Ampe, strategy: AllocationStrategy) status.AmpeError!?*message.Message {...}
    pub fn put(ampe: Ampe, msg: *?*message.Message) void {...}

    pub fn getAllocator(ampe: Ampe) Allocator {...}
};

Just a reminder: all methods are thread-safe.

The first two methods, create/destroy, manage a ChannelGroup. You don't need to know what that is yet; just make a note of it.

The next two methods require additional explanation, so let's move on to the Message Pool.


Message Pool

Ampe supports a Message Pool mechanism to improve system performance.

The get operation retrieves an existing message from the pool or creates a new one. The choice is determined by the strategy: AllocationStrategy parameter:

pub const AllocationStrategy = enum {
    poolOnly, // Tries to get a message from the pool. Returns null if the pool is empty.
    always,   // Gets a message from the pool or creates a new one if the pool is empty.
};

null is not an error

A null returned by get is a perfectly valid value: it means the pool is empty and the strategy is poolOnly.

get returns an error if:

  • allocation failed
  • the engine is shutting down
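The three possible outcomes of get (a message, null, or an error) can be sketched with a toy pool. Everything below is an assumption made for illustration: Msg, ToyPool, GetError and the shuttingDown flag are hypothetical stand-ins, not tofu's types, and tofu's real error set is status.AmpeError.

```zig
const std = @import("std");

// Hypothetical stand-ins for illustration; these are NOT tofu's types.
const Msg = struct { id: u32 = 0 };
const AllocationStrategy = enum { poolOnly, always };
const GetError = error{ OutOfMemory, ShuttingDown };

const ToyPool = struct {
    gpa: std.mem.Allocator,
    slots: [4]?*Msg = [_]?*Msg{null} ** 4, // would be filled by put (omitted here)
    count: usize = 0,
    shuttingDown: bool = false,

    // Follows the contract described above: pooled message, null, or error.
    fn get(self: *ToyPool, strategy: AllocationStrategy) GetError!?*Msg {
        if (self.shuttingDown) return error.ShuttingDown;
        if (self.count > 0) {
            self.count -= 1;
            const m = self.slots[self.count];
            self.slots[self.count] = null;
            return m;
        }
        // Empty pool: with poolOnly, null is a valid, non-error result.
        if (strategy == .poolOnly) return null;
        const m = try self.gpa.create(Msg); // may fail with OutOfMemory
        m.* = .{};
        return m;
    }
};

pub fn main() !void {
    var pool = ToyPool{ .gpa = std.heap.page_allocator };

    // Empty pool + poolOnly: null, and that is fine.
    const none = try pool.get(.poolOnly);
    std.debug.assert(none == null);

    // Empty pool + always: a freshly allocated message.
    const fresh = try pool.get(.always);
    std.debug.assert(fresh != null);
    if (fresh) |m| pool.gpa.destroy(m);

    // During shutdown every get fails with an error.
    pool.shuttingDown = true;
    if (pool.get(.always)) |_| {
        unreachable;
    } else |err| {
        std.debug.assert(err == error.ShuttingDown);
    }
}
```

The point for callers is that try only filters out the errors; the result still has to be checked for null before it is used.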

The opposite operation, put, returns the message to the pool and sets the caller's pointer to null. If the engine is shutting down or the pool is full, the message is destroyed instead, and all of its allocated memory is released silently.

    var msg: ?*Message = try ampe.get(tofu.AllocationStrategy.poolOnly);
    defer ampe.put(&msg);

Because null is a valid result of get, it is also a valid argument for put: if msg == null, put does nothing.

NAQ: *?*message.Message - WTH???

The *?* idiom (a pointer to an optional pointer) prevents reuse of objects (structs) that have been released or moved to another thread. In our case, these are Messages.

ampe.put(&msg):

  • returns msg to the pool
  • sets msg to null

As a result:

  • every further put succeeds (it simply does nothing)
  • every further attempt to use msg without a null check fails
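Here is a minimal sketch of the idiom in isolation. The names Msg and release are hypothetical, and this toy release frees the object instead of pooling it; only the handling of the caller's variable mirrors what put is described as doing.

```zig
const std = @import("std");

// Hypothetical message type, for illustration only.
const Msg = struct { id: u32 = 0 };

// Toy "put": takes the address of the caller's optional pointer,
// clears the caller's variable, then releases the object.
fn release(gpa: std.mem.Allocator, slot: *?*Msg) void {
    const m = slot.* orelse return; // null in: nothing to do
    slot.* = null; // the caller can no longer reach the released object
    gpa.destroy(m);
}

pub fn main() !void {
    const gpa = std.heap.page_allocator;

    var msg: ?*Msg = try gpa.create(Msg);
    msg.?.* = .{ .id = 7 };

    release(gpa, &msg);
    std.debug.assert(msg == null);

    release(gpa, &msg); // second call is a no-op, not a double free

    // Any later use has to go through a null check.
    if (msg) |m| {
        _ = m; // not reached: msg is null
    }
}
```

Had release taken a plain *Msg, the caller's variable would still hold the stale address after the call, and nothing would stop a second release or a later use.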

You will see *?* used in several places during our journey.

Pool configuration

Pool configuration is determined by the Options struct:

pub const Options = struct {
    initialPoolMsgs: ?u16 = null,
    maxPoolMsgs: ?u16 = null,
};

initialPoolMsgs is the number of messages created in the pool during engine initialization.

maxPoolMsgs is the maximum number of messages the pool can hold.

Do you remember?

If ... pool is full, message will be destroyed

This means that if the number of messages in the pool equals maxPoolMsgs, a message passed to put is destroyed instead of being pooled.
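The capacity rule can be sketched with a toy pool. This is an assumption-laden illustration, not tofu's implementation: Msg, ToyPool, deinit and the constant max_pool_msgs (playing the role of maxPoolMsgs) are all hypothetical.

```zig
const std = @import("std");

// Hypothetical stand-ins; tofu's real pool is not shown in this document.
const Msg = struct { id: u32 = 0 };
const max_pool_msgs = 2; // plays the role of maxPoolMsgs

const ToyPool = struct {
    gpa: std.mem.Allocator,
    slots: [max_pool_msgs]?*Msg = [_]?*Msg{null} ** max_pool_msgs,
    count: usize = 0,

    fn put(self: *ToyPool, slot: *?*Msg) void {
        const m = slot.* orelse return; // null is accepted and ignored
        slot.* = null;
        if (self.count == max_pool_msgs) {
            self.gpa.destroy(m); // pool is full: free instead of pooling
            return;
        }
        self.slots[self.count] = m;
        self.count += 1;
    }

    // Frees whatever is still pooled; stands in for engine destruction.
    fn deinit(self: *ToyPool) void {
        for (self.slots[0..self.count]) |s| {
            if (s) |m| self.gpa.destroy(m);
        }
        self.count = 0;
    }
};

pub fn main() !void {
    const gpa = std.heap.page_allocator;
    var pool = ToyPool{ .gpa = gpa };
    defer pool.deinit();

    var i: usize = 0;
    while (i < 3) : (i += 1) {
        var msg: ?*Msg = try gpa.create(Msg);
        pool.put(&msg);
        std.debug.assert(msg == null); // cleared whether pooled or destroyed
    }
    // Two messages were pooled; the third was destroyed.
    std.debug.assert(pool.count == max_pool_msgs);
}
```

From the caller's side both branches look the same: put returns nothing and msg ends up null either way.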

Tofu provides a default pool configuration:

Just an example of a configuration, not a recommendation
    pub const DefaultOptions: Options = .{
        .initialPoolMsgs = 16,
        .maxPoolMsgs = 64,
    };

The pool configuration is used when the engine is created:

    // Create engine implementation object with default pool configuration
    const rtr: *Reactor = try Reactor.Create(gpa, DefaultOptions);

A clarification: you never destroy the pool yourself; it is destroyed together with the engine.