Introduction

This is the user guide for the Thespis actor library. The actor model is an entire programming paradigm, so if you have no experience with it, there is some ground to cover before becoming fluent in designing software with this model. This guide will provide you with the following information:

  1. Some references to good information about the actor model itself.
  2. A detailed tour of both the features and the design choices of Thespis.
  3. A set of design patterns that can help you solve common problems with the actor model, and more specifically with Thespis.

What is the actor model?

The actor model is a program design model tailored to asynchronous concurrent processing. It was first conceived in 1973 by Carl Hewitt. Wikipedia has a thorough introduction.

To simplify, the actor model is an object oriented model which encapsulates not only data but also behavior. That is, an actor is an object which can hold local mutable state, but other actors cannot access that state, nor call methods on the actor. The only way to interact with it is by sending it a message.

This is the key advantage of the actor model. Since no direct method calls take place, there is no shared memory access. There is no need for locking, as only the actor itself can access its state. The actor processes one message at a time, so during message processing it is always guaranteed to be the only piece of the program accessing its state, and no synchronization is needed.

You might have found out that people struggle to use the OOP model in Rust. Rust doesn't allow you to cheat when it comes to memory access. The compiler is very strict, and it turns out it's hard to write traditional OOP with a strict compiler. When you add concurrency and multi-threading to the mix, it becomes downright impractical. The actor model solves exactly this problem, as memory synchronization is simply eliminated from the equation.

For the rest of this book, it is assumed that you have some basic understanding of how the actor model works. If this is new for you, please check the following resources:

The Thespis API is inspired by actix. If you are already familiar with actix, it will be easy to pick up thespis.

What is thespis?

When working on asynchronous projects in Rust I discovered the actor model thanks to actix. I immediately found the model a very good fit for both Rust and asynchronous design. I started to implement software using the actix library and soon ran into a number of issues:

  • it was in futures 0.1 (has been updated by now)
  • doesn't work on wasm
  • documentation was lackluster
  • code base big and complicated
  • didn't have remote actors
  • my reasoning didn't seem to match that of the actix lead developer

I briefly evaluated what it would take to implement the features I wanted in actix, but it turned out to be complicated, so I set off rewriting from scratch. It has taken a long time, but I think the result is still relevant and worth it.

In the meantime, so many actor libraries have popped up that I have lost track of what each one offers exactly; you'll have to do your own research. Here is a list: lib.rs/search?q=actor.

Features

  • Interfaces: Interface and implementation are separate. The behavior of an actor is described in the thespis crate with traits. thespis_impl has a reference implementation. However, libraries can expose an actor API without having to pull in an implementation, just by implementing the traits. Clients can choose their implementation and aren't obliged to use the reference implementation.
  • works on Wasm: Note that async Rust on embedded is still in its infancy and I have not looked into it specifically, so no promises there for now. Thespis also uses boxing extensively, which might be a problem with no_std.
  • minimal API design: As an example, compared to actix there is no MessageResponse trait, no Context object, no ActorFuture, no DNS resolver, no System nor Arbiter, no ActorStream, ... yet it does about everything you can do with actix and more. The code base (without remote actors) is about 1.2k SLOC.
  • abstractions: The address in thespis implements the Sink trait, so you can easily chain it with streams. Make an actor observable with pharos and you have a pub-sub pattern. You can plug in the channel of your choice between the address and the Mailbox (Mailbox only requires you give it a Stream). You can even have different channels for different addresses to the same Mailbox. You can use combinators on the receiving end to have different priorities depending on the address.
  • full control: you can interact at a low level with the address and the mailbox of an actor, and you can swap in your own implementations as long as they implement the required traits.
  • remote actors: thespis_remote is a library that allows you to communicate with other processes and send messages to a remote process as if it was an actor in the local application. As far as I can tell, kay is the only other Rust actor library that has remote actors.
  • supervision: thespis_impl supports supervision without even needing a Supervisor type.
  • performant: I need to run some more tests, but we seem to be about twice as fast as actix in most scenarios. There is one notable exception. Under contention, that is one receiver with many senders on different threads, actix is about 3 times faster. Somehow they spread more work on the sender threads, and in this case the receiver thread is the bottleneck, so that works really well.
  • executor agnostic
  • well tested and documented

In thespis, interface and implementation are 2 different libraries. The idea is that the interface defines the contract of what an actor is in terms of the Rust type system, but that you can swap out the implementation with something else yet remain compatible with other code that uses actors. You could for example write an implementation which uses actix under the hood if you wanted to create interop.

It also means that a library that wants to expose an actor API does not need to depend on the implementation, just the interface to implement the appropriate traits.

What's missing

  • skipping messages: The actor doesn't have access to the queue of messages. It can't observe them and decide what to process next. You could of course implement this by having a channel that allows for inspection of what's in its buffer, but the Mailbox of thespis_impl does not deal with this for you.

Pitfalls Checklist

The actor model is a very convenient programming paradigm. It automatically solves many of the harder problems in concurrent software design. However, there are a few pitfalls, so here is a checklist of things the compiler won't catch for us:

1. Deadlocks

The original actor model is very simple. If you want to have a request-response type message, you have to send your own address along with the message so the recipient knows how to respond to you. Thanks to Rust's futures, we can easily create a request-response, and thespis makes this convenient for you with Address::call. However, if you await a response whilst processing a message, a deadlock can arise.

Actors only process one message at a time, so if you have a cyclic dependency, your program can deadlock. That is, if actor A depends on actor B to process its message, but in order to process A's request B also needs to call back A, both actors will deadlock, as they are waiting on each other and will never process any messages again.

You can run into this problem through intermediate actors, so A and B might not be calling each other directly.

You can also run into this problem with a single actor when using bounded channels. If the mailbox of the actor is currently full and it sends itself a message, it will deadlock. You should generally spawn such a send to avoid blocking processing of the current message, as in the sketch below.
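
A minimal sketch of that, not from the library: Tick, self.addr (a clone of the actor's own address, stored at construction) and self.exec (any spawner) are assumptions made for illustration.

impl Handler<Tick> for MyActor
{
   #[async_fn] fn handle( &mut self, _msg: Tick )
   {
      let mut addr = self.addr.clone();

      // Awaiting addr.send(..) here could deadlock if our own mailbox is full,
      // so we spawn the send instead of blocking this handler on it.
      //
      self.exec.spawn( async move
      {
         let _ = addr.send( Tick ).await;

      }).expect( "spawn send" );
   }
}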

Another deadlock issue can arise when an actor is the gatekeeper of a connection (to the network or other components), that is, when the mailbox of one actor accumulates both incoming and outgoing messages. If incoming messages saturate the system and fill up the gatekeeper's mailbox, no responses can flow out. Hence no room is made for new incoming messages and everything blocks. You might want to read up on this issue in this article, which has some nice diagrams to explain the problem, and on the trio forum. This problem can be solved by not using the channels for backpressure, or by using a priority channel. Thespis_impl has an example demonstrating the latter solution. Note that by using a priority channel, there is no need for unbounded channels.

2. Memory consumption

The actor model has highly concurrent message passing. If many messages are in flight, they will all consume memory. The most obvious risk is having unbounded channels to communicate between address and mailbox. If there is no back pressure, the unbounded channel can use unbounded amounts of memory.

But even with bounded channels you want to keep an eye on memory consumption. If you spawn large volumes of actors, you probably want to use a channel that doesn't allocate its full capacity up front. Channels based on linked lists rather than array buffers can potentially reduce memory consumption a lot.

3. Performance

The main downside of the actor model is a performance overhead compared to lower level approaches. Message passing has some price and depending on your application it might be a poor fit. If you have hotspots in your code, and you need to do a lot of small operations, like calling getters on other objects, the overhead of message passing might be undesirable.

Remember that the purpose of this model is to have an actor for everything that you want to run concurrently. That said, you can have a module for which an actor is the facade to the outside world, but which internally uses synchronous logic. You do, however, only want to reach a critical threshold of actors: if you already have CPU cores x 4 components that run concurrently and that are rarely blocked (i.e. most of them can make progress most of the time), adding more concurrency to your program might not gain you much performance.

That being said, you shouldn't worry about it too much. Rust is a very performant language. Even with the overhead of message passing, it will be much faster than most other languages, and remember how much useful software has been written in those languages. 99% of the time, performance shouldn't be an issue.

As always, first write your software with a clean design, then benchmark to see if there is a problem. If there is, profile to find your bottleneck.

4. Blocking

We all know that async code shouldn't block. That is, we shouldn't block the thread, either with too much CPU intensive work or by waiting on things like locks. That's because other tasks might also be working on the same thread.

However, it's important to also think about blocking the task, even if you aren't blocking the thread. An actor that calls another actor (thespis::Address::call) will not continue processing messages until that other actor sends back a response. Given that that actor might have a backlog of messages in its queue, and it might itself wait for other actors whilst processing those, things could slow down severely. In practice this is not a problem if you have enough actors that can make progress at any given time. You will always have maximal usage of the system's resources, but...

If you have a specific bottleneck, one or two actors that are crucial in your program that can not keep up with the rest, they become the slowest link in the chain.

Certain actors should spawn sub-tasks rather than blocking on them, e.g. an actor that processes incoming requests from the network. If the back-end processing of those messages is async, this actor should probably continue to process other messages while the back-end generates a response. Again, it depends on the situation. If you expect to serve many connections concurrently and have one actor per connection, maybe it's fine for that actor to do but one thing at a time.

Note that futures::SinkExt::send also blocks the task, but only until the message has been delivered into the mailbox of the receiving actor. It doesn't wait until the message is processed, but will block if the mailbox is full.

5. Backpressure

In any concurrent system, special care must always be taken when it comes to back pressure. If your system gets flooded with incoming messages faster than they can be processed, you can have unbounded memory consumption (unbounded channels), deadlocks (bounded channels, see above) as well as CPU exhaustion. You must have a conscious plan for how backpressure is provided to slow down incoming messages. Sometimes bounded channels can be a solution if you are careful not to deadlock. Other times you will need to build a custom back pressure mechanism to limit the number of in-flight requests (this is what thespis_remote does).

You can also wrap the channel you pass to thespis_impl with stream_throttle.
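
For instance, a minimal sketch assuming the stream_throttle crate's ThrottleRate/ThrottlePool API, limiting a mailbox to roughly 10 messages per second:

use std::time::Duration;
use stream_throttle::{ ThrottlePool, ThrottleRate, ThrottledStream };

let rate = ThrottleRate::new( 10, Duration::from_secs(1) );
let pool = ThrottlePool::new( rate );

let (tx, rx) = futures::channel::mpsc::channel( 16 );

// The throttled receiver is still a Stream, so the Mailbox accepts it.
//
let rx = rx.throttle( pool );

See the chapter on actor lifetime for a caveat about combining throttling with weak addresses.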

Handling overload is an interesting article from the Erlang world examining this rather difficult problem.

6. Actor Lifetime

Thespis follows a model very similar to the Rust memory model. The mailbox for an actor will stop running when all addresses to this actor have been dropped and all outstanding messages have been processed. This is neat, but it does oblige you to be very conscious about where addresses are lingering; otherwise your program won't terminate, or you might have a memory leak.

The initial ambition was for an address to always be valid, so that sending could conveniently be infallible. In practice this doesn't work, since an actor might panic while processing a message. When using remote actors, the network or the remote process might go down, and on top of that, most channel implementations have a fallible send.

I kept this lifetime management, because usually having addresses around to actors that are no longer running is a sign of sloppy coding. thespis_impl has support for a weak address, which will not keep the mailbox alive. This is particularly handy for situations like an actor which needs its own address, but shouldn't keep itself alive. Also be wary of two actors which hold each other's address.

One way to sidestep this is to pull the plug: if you cancel the future running the mailbox, it's terminated. However, this is not recommended. If you really want an actor to stop its own mailbox, regardless of other components that might still want to use it, you can let it spawn itself so it has a JoinHandle that it can use to abort the mailbox.

futures::stream::abortable also lets you terminate a channel receiver manually. That's another way to stop a mailbox.
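
A minimal sketch of that approach; rx is the receiver you would otherwise hand to the mailbox directly (Mailbox::new is covered in the thespis_impl chapter):

let (rx, abort_handle) = futures::stream::abortable( rx );

let mb = Mailbox::new( Some( "MyActor".into() ), Box::new(rx) );

// ... later, from anywhere that holds the handle:
//
abort_handle.abort(); // The receiver ends, so the mailbox terminates.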

Re-entrancy

Some actor implementations do not adhere to processing one message at a time. Thespis does not have this issue. Re-entrancy can to some extent solve the deadlock issue described above, but it creates problems of its own. I think actix does this with ActorFuture, but I haven't checked in a while, so take that with a grain of salt and double check.

!Send Actors

In principle thespis should be unconcerned by whether actors and message types are Send. Both types are user supplied and the user has control over what executor they use and on what threads they run actors.

In practice it's not so simple. Rust requires one to be specific about Send-ness in a number of type signatures, namely for a type in a Box. This means that we would have to double the whole interface and implementation if we were to support !Send messages. I have chosen not to do this for now.

A compromise is made with !Send actors. It turns out we can support them with reasonable boilerplate. For this, Handler has 2 methods, handle and handle_local. Mailbox implementations should provide a way to be spawned locally, and call handle_local in that case.

The interface keeps handle as a required method, but handle_local calls handle by default. This means there is no change in API for Send actors, but when you have a !Send actor, you must implement handle_local and provide an implementation for handle containing an unreachable!, which should never be called.

Example

This is a basic example of a !Send actor:

//! Spawn an actor that is !Send.
//
use
{
   thespis         :: { *                                         } ,
   thespis_impl    :: { Addr                                      } ,
   futures         :: { task::LocalSpawnExt, executor::LocalPool  } ,
   std             :: { marker::PhantomData, rc::Rc, error::Error } ,
};


#[ derive( Actor, Debug ) ] struct MyActor { i: u8, nosend: PhantomData<Rc<()>> }
#[ derive( Clone, Debug ) ] struct Ping( String )   ;


impl Message for Ping
{
   type Return = String;
}


impl MyActor
{
   async fn add( &mut self, mut x: u8 ) -> u8
   {
      x += 1;
      x
   }
}


impl Handler<Ping> for MyActor
{
   // Implement handle_local to enable !Send actor and mailbox.
   //
   #[async_fn_local] fn handle_local( &mut self, _msg: Ping ) -> String
   {
      // We can still access self across await points and mutably.
      //
      self.i = self.add( self.i ).await;
      dbg!( &self.i);
      "pong".into()
   }

   // It is necessary to provide handle in case of a !Send actor. It's a required method.
   // For Send actors that implement handle, handle_local is automatically provided.
   //
   #[async_fn] fn handle( &mut self, _msg: Ping ) -> String
   {
      unreachable!( "This actor is !Send and cannot be spawned on a threadpool" );
   }
}


fn main() -> Result< (), Box<dyn Error> >
{
   let mut pool = LocalPool::new();
   let     exec = pool.spawner();

   let actor    = MyActor { i: 3, nosend: PhantomData };
   let mut addr = Addr::builder().spawn_local( actor, &exec )?;

   exec.spawn_local( async move
   {
      let ping = Ping( "ping".into() );

      let result_local = addr.call( ping.clone() ).await.expect( "Call" );

      assert_eq!( "pong".to_string(), result_local );
      dbg!( result_local );

   })?;

   pool.run();

   Ok(())
}

thespis_impl

This is the reference implementation for the interface defined in the thespis crate. This chapter goes over its features and provides you with example code. The most basic example, hello world:

use
{
   thespis         :: { *            } ,
   thespis_impl    :: { *            } ,
   async_executors :: { AsyncStd     } ,
   std             :: { error::Error } ,
};


#[ derive( Actor ) ]
//
struct MyActor;


struct Hello( String );

impl Message for Hello
{
   type Return = String;
}


impl Handler< Hello > for MyActor
{
   #[async_fn] fn handle( &mut self, _msg: Hello ) -> String
   {
      "world".into()
   }
}


#[async_std::main]
//
async fn main() -> Result< (), Box<dyn Error> >
{
   // .start here spawns your mailbox/actor immediately on the given executor and
   // detaches the joinhandle. You can also use the `spawn..` functions on the builder
   // in order to get a JoinHandle which you should await as it will drop the mailbox
   // when dropped.
   //
   let mut addr = Addr::builder().start( MyActor, &AsyncStd )?;

   let result = addr.call( Hello("hello".into()) ).await?;

   assert_eq!( "world", result );

   Ok(())
}

Discussion

Let's quickly take a tour of the anatomy of this simple program:


#![allow(unused)]
fn main() {
#[ derive( Actor ) ]
//
struct MyActor;
}

Actor is a trait defined in the thespis crate. It has no required methods, so you can easily derive it. MyActor here is what generally holds the (mutable) state of your actor. In this simple example there is no state, but otherwise you can manipulate it from within the implementation of Handler<T>. The mailbox will take ownership of your actor, and from then on you can no longer call methods on it directly; the only way to communicate with it is by sending messages through the address you get back.


#![allow(unused)]
fn main() {
struct Hello( String );

impl Message for Hello
{
   type Return = String;
}
}

Hello is a message type. The type system will guarantee that you can never send a message type to an actor unless the actor implements Handler for that type and the type implements the Message trait. As you will have to implement this trait for your message types, you will have to wrap types that are not defined in your crate in order to use them as a message. Here we wrap String. If your handler might panic, please make sure the message type is UnwindSafe, as the mailbox will call catch_unwind on the handler method. This is what allows us to elegantly support supervising of actors. Altogether, it is recommended that your handlers don't panic, but rather return a Result if they need to be fallible. In any case, messages in the actor model are meant to be data and should not hold any shared resources like locks or references.

The associated type is the return type of the handler. When using Address::call your actor can return a value to the caller, making it easy to implement request-response type communication, mimicking a method call. Sending a message to an actor is always asynchronous. Note: we could also have written -> <Hello as Message>::Return as the return type here. In any case, it needs to be the same type.


#![allow(unused)]
fn main() {
impl Handler< Hello > for MyActor
{
   #[async_fn] fn handle( &mut self, _msg: Hello ) -> String
   {
      "world".into()
   }
}
}

Here we define that MyActor can process messages of type Hello. The body of the function does the actual processing. As you can see, it receives a &mut self, even though we know that all messages are sent asynchronously. This is the main advantage of the actor model. Even though any place in your code that has this actor's address can easily send messages, you never need thread synchronization like locks or the infamous Rc<RefCell> on your data. Only this actor can access its own state directly, and the only way to communicate with it is through sending messages. Furthermore, the mailbox will make the actor process one message at a time, so there is never shared access to the mutable state.

Using plain Object Oriented Programming in async Rust with methods that access state is very difficult, since as soon as you spawn a task, that task cannot hold any references to anything outside of it, and to make matters worse, you shouldn't hold a mutex across an await point. The actor model sidesteps these problems, as any code that needs to communicate with an actor only needs the address, not a reference to the actor itself. Addr implements Clone, so many places in your program can talk to the actor.

The async_fn macro deals with the fact that Rust doesn't support async trait methods at the moment. It does this in a very similar way to the async-trait crate, but it outputs much simpler code, making it compatible with a hand written version of this method, which was not possible with async-trait.

A handwritten version would look like:


#![allow(unused)]
fn main() {
impl Handler< Hello > for MyActor
{
   fn handle( &mut self, _msg: Hello ) -> Return<'_, String>
   {
      Box::pin( async
      {
         "world".into()
      })
   }
}
}

Where Return is defined as:


#![allow(unused)]
fn main() {
/// A boxed future that is `Send`, shorthand for async trait method return types.
//
pub type Return<'a, R> = Pin<Box< dyn Future<Output = R> + 'a + Send >>;
}

Note that within this handler you can await as it is asynchronous, but that while it is waiting, this actor will not process any more messages.


#![allow(unused)]
fn main() {
let mut addr = Addr::builder().start( MyActor, &AsyncStd )?;
}

We use a convenience builder to create the address we will use to send messages to our actor, and tell it to generate a default mailbox for it and spawn it on the provided executor.

The AsyncStd type comes from the async_executors crate, which provides a uniform interface for executors, allowing us to be executor agnostic. We could just as well have given it a tokio executor, one from the futures crate, or wasm-bindgen on Wasm.

Note that this function takes our actor by value as we shouldn't access it anymore directly once it starts processing messages.


#![allow(unused)]
fn main() {
let result = addr.call( Hello("hello".into()) ).await?;
}

We use Address::call to send a message to our actor. This method returns a future that will resolve to the answer our handler returns. Note that Addr also implements futures_sink::Sink. You can use the combinators from the futures crate to forward an entire Stream into the address, as long as the actor implements Handler for the type the stream produces. The send method from the Sink trait will drop the returned value and return as soon as the message is delivered to the mailbox, without waiting for the actor to process it.

Thus you can also use the call method, even if you don't want to return any value, to be sure that the message has been processed, whereas send is more like throwing a message in a bottle. You will still get back pressure from send, as it will block when the channel between the Addr and the mailbox is full (as long as it's not an unbounded channel, that is).
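
For comparison, a fire-and-forget send through the Sink interface looks like this (SinkExt comes from the futures crate):

use futures::SinkExt;

// Resolves once the message is in the mailbox, not when it is processed.
// The "world" return value of the handler is dropped.
//
addr.send( Hello( "hello".into() ) ).await?;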

In the next chapter we will take a look at desugaring the builder and manually create our mailbox and address.

Weak and Strong addresses.

It doesn't figure in the basic example, but the mailbox of the actor stops when all addresses to it are dropped. The JoinHandle from the executor will return your actor to you if you want to re-use it later. When the mailbox stopped because your actor panicked, you will retrieve the mailbox instead, and you can instantiate a new actor and spawn it on the same mailbox, so all addresses remain valid. This is further elaborated in the chapter on supervision.

As the mailbox future returns your actor, you must be conscious when you rely on your actor being dropped to stop other actors. The order in which you await the mailboxes can matter and sometimes you must explicitly wrap the call in drop.

Thespis_impl also has weak addresses. These addresses don't keep the mailbox alive. This is handy when, for example, the actor needs its own address; in that case you don't necessarily want it to keep itself alive. Creating a weak address is simple:


#![allow(unused)]
fn main() {
// Does not consume addr, hence it's not called downgrade.
//
let weak_addr = addr.weak();

// You can create a strong address from a weak one as long as the mailbox is still open.
// If all strong addresses were already dropped at this point, you will get an error instead.
//
let strong = match weak_addr.strong()
{
   Ok(addr) => addr,
   Err(e)   => return, // e is ThesErr::MailboxClosed.
};
}

Important: the way the strong count works is that the mailbox only stops when the channel returns Poll::Pending. That is, it will continue to process messages already queued after all strong addresses are dropped; however, WeakAddr will refuse to accept new messages from that point on. Once the channel returns Poll::Pending, the mailbox checks the strong count; if it is zero, it exits.

When the mailbox is already Pending, it will be woken up when the last strong address is dropped.

This can be confusing if you use Poll::Pending for other purposes. For example the stream_throttle crate allows us to throttle a channel receiver. However, that will make it return Poll::Pending even though the channel isn't actually empty. If there are no strong addresses, the mailbox will stop and drop those messages. As most channels and stream wrappers don't properly implement or forward Stream::size_hint, we have no other way to check for an empty channel but to check for Poll::Pending.

Desugar Addr::builder()

Even if you will mostly be using the convenience builder to start your actors, it's good to have an understanding of how things work at a lower level. So what would things look like if we created an actor manually? We keep the simple example from the last chapter but desugar it. We already showed the desugaring of the async_fn macro. Here we show a full working example with the main function desugared as far as we can. Just know that there are intermediate steps on the builder API if you only want to override certain parameters.

use
{
   thespis         :: { *                         } ,
   thespis_impl    :: { *                         } ,
   async_executors :: { AsyncStd, SpawnHandleExt  } ,
   std             :: { error::Error              } ,
   futures         :: { channel::mpsc, SinkExt   } ,
};


#[ derive( Actor ) ]
//
struct MyActor;


struct Hello( String );

impl Message for Hello
{
   type Return = String;
}


impl Handler< Hello > for MyActor
{
   #[async_fn] fn handle( &mut self, _msg: Hello ) -> String
   {
      "world".into()
   }
}


#[async_std::main]
//
async fn main() -> Result< (), Box<dyn Error> >
{
   let (tx, rx)  = mpsc::channel( 5 );
   let tx        = Box::new( tx.sink_map_err( |e| Box::new(e) as DynError ) );
   let mb        = Mailbox::new( Some("HelloWorld".into()), Box::new(rx) );
   let mut addr  = mb.addr( tx );
   let actor     = MyActor;

   let handle = AsyncStd.spawn_handle( mb.start( actor ) )?;

   let result = addr.call( Hello( "hello".into() ) ).await?;

   assert_eq!( "world", dbg!(result) );

   // This allows the mailbox to close. Otherwise the await below would hang.
   //
   drop( addr );

   // The JoinHandle will allow you to recover either your actor or the mailbox.
   //
   let actor = match handle.await
   {
      MailboxEnd::Actor(a) => a,

      // This would happen if your actor had panicked while handling a message.
      //
      MailboxEnd::Mailbox(_mailbox) => unreachable!(),
   };

   Ok(())
}

Channels

In thespis you can choose what channel will be used for communication between the address and the mailbox. Out of the box the builder supports futures channels, in bounded and unbounded forms. But you might want to try a different channel type. One interesting application is if you want to use a channel that overwrites older messages instead of providing back pressure, like the one in the ring-channel crate.

The abstractions the thespis types work on are defined as:


#![allow(unused)]

fn main() {
/// Interface for T: Sink + Clone
//
pub trait CloneSink<'a, Item, E>: Sink<Item, Error=E> + Unpin + Send
{
   /// Clone this sink.
   //
   fn clone_sink( &self ) -> Box< dyn CloneSink<'a, Item, E> + 'a >;
}


impl<'a, T, Item, E> CloneSink<'a, Item, E> for T

   where T: 'a + Sink<Item, Error=E> + Clone + Unpin + Send + ?Sized

{
   fn clone_sink( &self ) -> Box< dyn CloneSink<'a, Item, E> + 'a >
   {
      Box::new( self.clone() )
   }
}


/// A boxed error type for the sink.
//
pub type DynError = Box< dyn std::error::Error + Send + Sync >;

/// Type of boxed channel sender for Addr.
//
pub type ChanSender<A> = Box< dyn CloneSink< 'static, BoxEnvelope<A>, DynError> >;

/// Type of boxed channel receiver for Mailbox.
//
pub type ChanReceiver<A> = Box< dyn futures::Stream<Item=BoxEnvelope<A>> + Send + Unpin >;
}

Thus the sender must implement Sink, Clone, Unpin and Send, and its error type must be Box< dyn std::error::Error + Send + Sync >. The receiver must be infallible and implement Stream, Send and Unpin. If your favorite channel implementation does not provide the Sink interface on its sender, you can often wrap it and implement Sink yourself.

Mailbox and Addr

Next we create our mailbox and address:


#![allow(unused)]
fn main() {
let mb       = Mailbox::new( Some("HelloWorld".into()), Box::new(rx) );
let mut addr = mb.addr( tx );
}

The first parameter of Mailbox::new is an optional name, used in logging to help you identify which actor is doing what. Every mailbox created in the process also has a unique numeric id, but naming them makes your logs easier to understand. Addr exposes both id() and name(). If two addresses return the same id, they both talk to the same mailbox and thus the same actor.

We feed both ends of the channel to the constructors of Mailbox and its addr method. You may notice that we just created both the mailbox and the address without even having instantiated an actor yet. That is because we only really need an actor when we start the mailbox. This has an interesting property: if we want our actor to have a copy of its own address, we can just pass it along in its constructor, since as soon as we start it we can only communicate with it through messages. It would be a bit of a pain to have to create a specific message type just to give the actor its own address.
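
A minimal sketch of that, assuming a hypothetical MyActor with an addr field and the tx/rx channel ends created as above. We use a weak address so the actor doesn't keep its own mailbox alive:

let mb   = Mailbox::new( Some( "SelfAware".into() ), Box::new(rx) );
let addr = mb.addr( tx );

// The actor receives its own address before the mailbox starts.
//
let actor  = MyActor { addr: addr.weak() };
let handle = AsyncStd.spawn_handle( mb.start( actor ) )?;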

Starting the mailbox


#![allow(unused)]
fn main() {
let actor  = MyActor;
let handle = AsyncStd.spawn_handle( mb.start( actor ) )?;
}

Next we create an actor and pass it to Mailbox::start. This method returns a future that you can spawn however you like on the executor of your choice.

We use the AsyncStd wrapper from async_executors here. One essential difference with using async-std directly is that the JoinHandle this returns will cancel the future when you drop it, which is one way to stop an actor. The recommended way is to drop all addresses to the actor.

Stopping the actor


#![allow(unused)]
fn main() {
drop( addr );
handle.await;
}

As described above, to stop an actor we drop all addresses to it. Now we can await the mailbox, which will terminate.

Supervising an actor

A common pattern in actor software is supervision. The idea is to monitor a hierarchy of actors, and if any one crashes, its supervisor will spawn a new one to replace it. This is a strategy for more robust software. In terms of Rust actors, I would rather suggest you try to make your handlers panic free, and let them return a Result that you can handle if they are fallible, but that is not always possible, and you can use the supervision pattern with thespis.

The following example is relatively straightforward. The main feature allowing supervising in thespis is that the mailbox future returns the mailbox to you if the actor panics while processing a message. This means that you can just instantiate a new actor and spawn the mailbox again and it will be fully operational. All addresses to it will just remain valid.

Warning: this does mean that the mailbox uses catch_unwind on your handlers. That means your messages should be UnwindSafe, but as you shouldn't be using shared mutability anyway, this should already be the case.

use
{
   thespis           :: { *                    } ,
   thespis_impl      :: { *                    } ,
   tracing           :: { *                    } ,
   futures::task     :: { Spawn, SpawnExt      } ,
   std               :: { error::Error         } ,
   async_executors   :: { AsyncStd, JoinHandle } ,
};


#[ derive( Actor ) ] struct Counter;

struct Add(usize);


impl Message for Add  { type Return = usize; }


// This is a silly actor, if you send it an even number, it panics.
// Otherwise it returns your number to you.
//
impl Handler< Add > for Counter
{
   #[async_fn] fn handle( &mut self, msg: Add ) -> usize
   {
      if msg.0 % 2 == 0 { panic!(); }

      msg.0
   }
}


// Actor that can supervise other actors. It will start the actor for the first time
// if it's not already started.
//
#[ derive( Actor ) ]
//
struct Supervisor
{
   // We will need to spawn the mailbox again if the actor panics, so we need an executor.
   // If you want to make this more hierarchical, you can use a nursery from the async_nursery
   // crate to tie all these subtasks to the lifetime of the supervisor and prevent them from
   // getting orphaned.
   //
   exec: Box<dyn Spawn + Send>
}


// The message we will be sending.
//
struct Supervise<A: Actor>
{
   mailbox : Option< JoinHandle< MailboxEnd<A> > > ,

   // A closure that knows how to instantiate the supervised actor.
   // You could also require that A: Default.
   //
   create: Box< dyn FnMut() -> A + Send > ,
}

impl<A: Actor + Send> Message for Supervise<A>
{
   type Return = Option< Addr<A> >;
}


// Note how the actor type is a generic parameter, so this supervisor works for
// actors of any type.
//
impl<A: Actor + Send> Handler< Supervise<A> > for Supervisor
{
   #[async_fn] fn handle( &mut self, mut actor: Supervise<A> ) -> Option< Addr<A> >
   {
      let mut addr = None;

      let mut mb_handle = if actor.mailbox.is_none()
      {
         let (addr_new, mb_handle) = Addr::builder().start_handle( (actor.create)(), &AsyncStd ).unwrap();

         addr = Some(addr_new);

         mb_handle
      }

      else { actor.mailbox.take().unwrap() };


      let supervisor = async move
      {
         // This is where the magic happens. Every time the handle resolves, we spawn again
         // and replace it with a new handle.
         //
         // When this returns MailboxEnd::Actor, it means the actor has stopped naturally and we don't respawn it.
         //
         while let MailboxEnd::Mailbox(mb) = mb_handle.await
         {
            mb_handle = mb.start_handle( (actor.create)(), &AsyncStd ).unwrap();
         }
      };

      self.exec.spawn( supervisor ).unwrap();

      addr
   }
}


#[async_std::main]
//
async fn main() -> Result< (), Box<dyn Error> >
{
   tracing_subscriber::fmt::Subscriber::builder()

      .with_max_level( Level::DEBUG )
      .init()
   ;

   let mut supervisor = Addr::builder().start( Supervisor{ exec: Box::new( AsyncStd ) }, &AsyncStd )?;


   // Here we use a closure to create new actors, but if you don't need to capture
   // anything from the environment you might as well just implement `Default` for
   // your actor type.
   //
   let create = Box::new( ||
   {
      debug!( "Creating a new Counter" );
      Counter
   });

   let supervise = Supervise
   {
      create,
      mailbox: None,
   };

   let mut addr = supervisor.call( supervise ).await?.unwrap();

   // Both of these will make the actor panic:
   //
   assert!(matches!( addr.call( Add(10) ).await, Err( ThesErr::ActorStoppedBeforeResponse{..} ) ));
   assert!(matches!( addr.send( Add(10) ).await, Ok(()) ));

   // Yet, our actor is still responding.
   //
   assert_eq!( addr.call( Add(11) ).await, Ok(11) );

   Ok(())
}

Processing messages concurrently

In principle an actor processes one message at a time; that is what avoids all synchronization problems. This restriction only makes sense, however, if the actor has mutable state. What if your actor doesn't have mutable state? In that case we can process messages concurrently without any issues.

The following example also shows how you can tie the lifetime of the sub-tasks to the lifetime of the actor by using a nursery:

use
{
   thespis           :: { *                                                 } ,
   thespis_impl      :: { *                                                 } ,
   std               :: { error::Error                                      } ,
   futures           :: { FutureExt, task::{ SpawnError }                   } ,
   async_executors   :: { AsyncStd, SpawnHandle, SpawnHandleExt, JoinHandle } ,
   async_nursery     :: { Nursery, NurseErr, Nurse, NurseExt                } ,
};


type DynError = Box<dyn Error + Send + Sync>;


#[ derive( Actor ) ]
//
struct MyActor
{
   // We store the nursery_handle on our actor. That way the lifetime of all
   // tasks inside the nursery is tied to the lifetime of our actor. If we drop
   // the actor, all subtasks that are still running will be dropped.
   //
   // Alternatively we could have a Handler for a Stop message, which would await
   // the nursery_handle to wait for all subtasks to finish before dropping this actor.
   // That last one can even be combined with a timeout to limit how long we wait for
   // subtasks to finish naturally before dropping them.
   //
   nursery: Box< dyn Nurse<()> + Send >,
   _nursery_handle: JoinHandle<()>,
}


struct Ping;

impl Message for Ping { type Return = Result<(), NurseErr>; }


impl MyActor
{
   pub fn new( exec: impl SpawnHandle<()> + Clone + Send + 'static ) -> Result<Self, SpawnError>
   {
      let (nursery, output) = Nursery::new( Box::new( exec.clone() ) );

      let _nursery_handle = exec.spawn_handle( output )?;
      let nursery         = Box::new( nursery );

      Ok( Self
      {
          nursery        ,
         _nursery_handle ,
      })
   }
}


impl Handler<Ping> for MyActor
{
   // For this usecase we don't use the `async_fn` macro since we don't want our entire
   // handler to be in the returned future. We want to set some thing up first.
   //
   fn handle( &mut self, _msg: Ping ) -> Return<'_, <Ping as Message>::Return >
   {
      // If self had properties wrapped in Arc, we could clone them here to pass them
      // into the future.
      //
      // However we can't actually pass the reference to self in if we want it
      // to run concurrently.
      //
      // In theory you can have something in an Arc<Mutex>> or atomic variables, but
      // you really must be careful. As these run concurrently, values will change in the
      // middle of processing which is a footgun.
      //
      // If we would need to update self with the result of an async operation, we can
      // store our own address on self, clone that to pass it into the spawned task
      // and send the result to ourselves.
      //
      let processing = async move
      {
         // do something useful.
      };

      // Processing will now run concurrently.
      //
      let result = self.nursery.nurse( processing );

      // If spawning failed, we pass that back to caller.
      // We are now immediately ready to process the next message even while processing is
      // still running.
      //
      // We return the result of processing to the caller.
      //
      async move { result }.boxed()
   }
}


#[async_std::main]
//
async fn main() -> Result< (), DynError >
{
   let     actor = MyActor::new( Box::new(AsyncStd) )?;
   let mut addr  = Addr::builder().start( actor, &AsyncStd )?;

   // Admittedly, this looks a bit weird. Call is fallible, and it returns a result over
   // the NurseErr, since the handler needs to spawn and spawning is fallible.
   //
   addr.call( Ping ).await??;

   Ok(())
}

Abstract the Actor type

The last chapters covered the most straightforward use of thespis. But how does it all compose? What if we need to store a list of addresses to different actors that accept a certain type of message? What if we have to send 2 different message types to this heterogeneous list? Addr is actually Addr<A>. It's generic over a type of actor, thus it can send all the message types this actor implements Handler for, but that does mean we cannot store a list of addresses to different actors.

The solution to this is the trait Address<M> from the interface library thespis. We can store a Box< dyn Address<M, Error=E> + Send + Unpin > and thus store addresses to different actors in the same collection. Because this type is unwieldy, thespis has a shorthand: BoxAddress<M, E>. The error type comes from the channel sender. thespis_impl wraps the errors from different channels in ThesErr, so generally if you use the Addr type, that's the error type you should use.
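
A sketch of such a collection, assuming addr_a: Addr<ActorA> and addr_b: Addr<ActorB>, where both actors implement Handler<Ping>:

let mut list: Vec< BoxAddress<Ping, ThesErr> > = vec!
[
   Box::new( addr_a ),
   Box::new( addr_b ),
];

for addr in list.iter_mut()
{
   addr.call( Ping ).await?;
}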

Of course, if your actor types are known at compile time and you don't have too many of them, you can also use an enum:


#![allow(unused)]
fn main() {
enum Postman
{
   Foo( Addr<Foo> ),
   Bar( Addr<Bar> ),
}
}

However, you will have to match and destructure the enum to actually send anything (see the sketch below). And mind you that sometimes boxing doesn't have any measurable overhead, so the inconvenience might well not be worth it.
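
The sketch below assumes both Foo and Bar implement Handler<Ping>:

match postman
{
   Postman::Foo( addr ) => addr.call( Ping ).await?,
   Postman::Bar( addr ) => addr.call( Ping ).await?,
};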

Multi dimensional

But what if you have several message types and you have to send them to several types of actors? Unfortunately we cannot currently express Box< dyn Address<Foo> + Address<Bar> > in Rust. There is a piece of boilerplate that solves this however:


#![allow(unused)]
fn main() {
struct Foo;
struct Bar;

impl Message for Foo { type Return = (); }
impl Message for Bar { type Return = (); }


trait AddressFooBar: Address<Foo, Error=ThesErr> + Address<Bar, Error=ThesErr> {}

impl<T> AddressFooBar for T

   where Self: Address<Foo, Error=ThesErr> + Address<Bar, Error=ThesErr>
{}
}

Now we can have Box< dyn AddressFooBar > and be able to send both Foo and Bar messages. The trait_set crate can make this more streamlined:
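
A sketch of the same alias with trait_set, replacing the manual blanket impl above:

use trait_set::trait_set;

trait_set!
{
   pub trait AddressFooBar = Address<Foo, Error=ThesErr> + Address<Bar, Error=ThesErr>;
}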

Unknown unknowns

Sometimes it's not even known in your library which types of actors and which types of messages you will have to handle, because they are user defined. Yet you still need to store a bunch of addresses. You will need some other information at runtime that indicates which type the actor and message are, to enable downcasting.

In this case you can store the addresses as Box< dyn Any > and downcast them at runtime to the right type. We can store a collection, e.g. a HashMap, of Box< dyn Any > and downcast the entries to Box< dyn Address<M, E> >. You will need to store (e.g. in the keys of your hashmap) which type you are actually holding in order to downcast it. See the recipient_any example.
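
A minimal sketch of the idea, keyed here by a plain string for simplicity; addr is assumed to be an Addr of some actor that handles Ping:

use std::{ any::Any, collections::HashMap };

let mut addrs: HashMap< &'static str, Box<dyn Any> > = HashMap::new();

// Erase the address twice: first to the message interface, then to Any.
//
let boxed: BoxAddress<Ping, ThesErr> = Box::new( addr );
addrs.insert( "ping_handler", Box::new( boxed ) );

// Later, downcast back to the address type to send a Ping.
//
if let Some( any ) = addrs.get_mut( "ping_handler" )
{
   let addr = any.downcast_mut::< BoxAddress<Ping, ThesErr> >()

      .expect( "the stored type matches the key" );

   addr.call( Ping ).await?;
}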

Cloning trait objects

The Clone trait is not object safe in Rust, so you can't make a trait object from a trait that requires implementors to also implement Clone. For this reason, the Address trait has a clone_box method that requires implementors to be able to produce a BoxAddress<M>. This way, if you have a boxed address, you can still clone it conveniently.

Debugging

Debugging asynchronous programs can be a bit of a challenge. thespis_impl tries to give you the relevant information and tooling to see exactly what is going on in your program. The main feature is generating useful logs so you can check what each concurrent component is doing.

thespis_impl uses tracing for its logging. Its ability to instrument futures and executors with spans (through tracing-futures) makes it uniquely suitable for logging in async applications.

Whenever the Mailbox runs, it will enter a Span identifying your actor by id and name. Log events within Addr and WeakAddr will also be within such a span. This will help you identify which actor messages are coming from. It is important that you choose a subscriber that prints span information.

In order to turn on logging, setup a basic tracing subscriber. You can add this to the beginning of your main function:


#![allow(unused)]
fn main() {
let _ = tracing_subscriber::fmt::Subscriber::builder().init();
}

One thing to note is that if you spawn on an executor from within your handler, you will lose the association with the current Span. However, you can use tracing::Span::current() to get the Span, and then use tracing-futures to instrument the future you want to spawn. Alternatively you can create a new Span and set the actor's Span as its parent.

In any case, anything you log in your handler will be tagged with the actor's span, but if you spawn new tasks, you will have to manually include a span if you want to avoid losing the association with the actor.
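
A sketch of keeping that association, using tracing-futures' Instrument extension trait; exec stands for whatever spawner you use:

use tracing_futures::Instrument;

let span = tracing::Span::current();

let task = async move
{
   tracing::info!( "still associated with the actor's span" );
};

exec.spawn( task.instrument( span ) ).expect( "spawn" );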

Using the log crate

If your application uses the log crate, but you want to receive the log events emitted by thespis_impl, please refer to the relevant documentation of the tracing crate.

Visualizing logs

The usual problem with async logs is that you will get all log lines interleaved. This makes it very hard to reason about what is going on. The tracing spans thespis_impl adds allow us to know which actor a log event is associated with. This means we can separate out logs per actor and put them side by side.

I wrote a little web app called tracing_prism that lets you visualize your logs in columns. By using the names or ids of your actors as filters, you can see the flow of several concurrent components side by side. For best results, generate your logs in json format:


#![allow(unused)]
fn main() {
let _ = tracing_subscriber::fmt::Subscriber::builder()
	.json()
	.init()
;
}

Backpressure

Backpressure is an important concept in asynchronous message based systems. Backpressure allows receivers to slow down senders when they cannot process messages/requests fast enough. This limits the memory consumption and improves latency.

The most basic form of back pressure in thespis can be achieved by using bounded channels for mailboxes. This will make sender tasks wait before putting messages in a mailbox that is full, thereby slowing down parts of the process that are outpacing other parts. There are a number of situations in which this works well, but there is one particular trap: by creating a circular shape with one actor acting as a gate to the outside world, receiving from and sending to the outside, you can create a deadlock.

The issue is described at length with illustrations in this article. Please go read that and then come back.

The solution to this problem in its simplest form is indeed to have a separate outgoing queue for the gateway actor. In thespis this is relatively simple with the use of futures::stream::select_with_strategy, as sketched below. There is also a demonstration of this in the examples directory.
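
A sketch of the priority queue, assuming rx_in and rx_out are the receivers of two channels carrying the mailbox's envelopes:

use futures::stream::{ self, PollNext };

// Always drain outgoing responses (the right stream) before accepting
// new incoming work (the left stream).
//
let strategy    = |_: &mut ()| PollNext::Right;
let prioritized = stream::select_with_strategy( rx_in, rx_out, strategy );

// The result is just a Stream, so it can back a Mailbox.
//
let mb = Mailbox::new( Some( "Gateway".into() ), Box::new(prioritized) );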

However, this is only the simplest form of the problem: a closed circuit with one node acting as a gateway, i.e. managing the incoming and outgoing messages. When we have a priority queue, the incoming messages do not fill up the outgoing queue, and as soon as the processing actor (or pipeline, for that matter) can offload a response, it will take a new message from its mailbox. This free slot will propagate all the way back to the gateway. The gateway can now offload the current message to the processing actor and will process the outgoing message first (as it's a priority queue). Thus, the system cannot deadlock.

However, imagine the gateway is a client connection to our server, and imagine we can have more than one client connected at the same time (well, it's a big motivator for async architecture, isn't it). Now our solution no longer works, as all the client connections are competing for free slots in the processing actor/pipeline. If it so happens that a gateway which has outgoing messages in its outgoing queue loses this competition, its outgoing queue can fill up and hopla: deadlock 2.0.

A solution for this problem is to forgo the mailboxes for back pressure and to use a separate system, like a semaphore. Gateways need permits before sending more requests into the system. This works, but it's not trivial to define exactly what the right number of permits is here, especially if someone might come along later and change the architecture of the system.
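
A sketch of the permit idea, assuming the async-lock crate's Semaphore (any async semaphore works):

use std::sync::Arc;
use async_lock::Semaphore;

// Allow at most 128 requests in flight across all gateways.
//
let permits = Arc::new( Semaphore::new( 128 ) );

// In the gateway, before injecting a request into the system:
//
let guard = permits.acquire_arc().await;

// Attach the guard to the request. When the response leaves the system
// and the guard is dropped, a new request may enter.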

Also, it only solves our exact problem. Yes, things can get worse. What if the processing pipeline isn't exactly linear, but a complex application? What if some of those actors can actually create new messages that end up in the system? You'd have to make absolutely sure that they reserve a slot in the semaphore before doing so, otherwise the semaphore no longer accurately keeps track of the number of messages that are currently in the system. And what if they can create new actors?

An oft suggested solution to all of this is to use unbounded channels. There are however several problems with them. First of all, it means no more backpressure. In itself that's fine, as we established that using the mailbox as a backpressure mechanism in this case is a bad idea. More fundamentally, however, the problem is that unbounded channels don't exist. They would require unbounded memory, which most systems don't have, and they create unbounded latency, which is generally not desirable.

Another solution is to let the gateway spawn a new task for every incoming message, making it completely resistant against this type of deadlock. However, here an external back pressure mechanism like a semaphore becomes really important; otherwise we have a situation similar to unbounded channels, which doesn't really work.

There is, in my opinion, no one-size-fits-all solution here. You must carefully think your architecture through and be conscious about your queue sizes.

Introduction to thespis_remote

Thespis_remote adds support for remote actors to the thespis actor model. It follows the same rather low level approach as the rest of thespis. That means it provides the ability to send actor messages over one connection. The connection thespis_remote works over is a Sink and Stream of a given wire format. The wire format has an encoder/decoder, so it works over anything that implements AsyncRead/AsyncWrite.

There is no "system" or service discovery integrated. It requires you to connect two processes by any means of your choice and provide the above interfaces to thespis_remote. From there thespis_remote provides:

  • RemoteAddr, which allows you to send messages as if you were sending to an actor in the same process (the only difference: send does not guarantee in-order delivery within the remote process).
  • ServiceMap, a trait and provided reference impl that knows how to deserialize your actor messages and dispatch them to the right actor. Note that you can only set one actor per message type per service map, so you are actually sending to a remote process rather than to an individual actor in that process. However you can easily wrap your message with an actor id and provide more precise delivery if you want.
  • WireFormat, a trait and reference impl (using CBOR) for the actual wire format used.
  • Peer, an actor that manages a connection endpoint. You provide it with a service map to "serve" over the connection, and for sending messages you pass its address to the constructor of RemoteAddr, implying that you know that the other side of the connection managed by this Peer exposes these services. This last part is the only thing we cannot guarantee in the type system, as the compiler does not know about the process on the other end of your connection.

These are the four core components of the library.

thespis_remote also provides relaying of messages. Furthermore, some features are included to deal with back pressure and to make it resistant to attacks like DoS and slowloris.

Apart from this guide you can also consult the API documentation and the examples.

Scope

The scope of thespis_remote is specifically as described above. On the low end of the scope this means there is no management of your connection. When the connection fails, Peer will generate an event for you to react on. You have to re-connect and re-setup your Peer and service map. This limitation is necessary to keep the scope of the project reasonable as many different types of connections can be used. You can obviously write your own wrapper that provides automatic reconnect and maybe in the future something like that will be provided by the thespis project, but it won't be in thespis_remote.

On the high end of the scope, thespis_remote does not handle any application logic, most prominently authentication/authorization. What if a user has to authenticate before being allowed to send certain messages to your service? It generally comes down to 2 key components: preventing MITM with end to end encryption (do this on the connection level) and sending along some proof of identity (think cookie in HTTP land). thespis_remote does not have any instrumentation for this, and it will be up to you to handle it in your message types.

You might feel cheated that you can't address individual actors on a remote process. There are good reasons for this, however, and nothing stops you from implementing individual actor addressing on top of thespis_remote. In thespis, everything is statically type checked as much as possible. By considering a process as something that exposes its ability to receive messages of certain types, we adhere to a public interface like a contract. It allows a remote process to statically create addresses to send us messages, rather than having to send over addresses at runtime.

Actors can accept several types of messages, but some of those might not be meant for external use. If we were to expose an address to a specific actor type as is, all of its interface would be remotely exposed. We would also need to compile the actor type into the remote process, where with the current design only message types need to be shared between the two processes.

thespis_remote is minimal in the sense that it creates the fundamental building block for remote actor communication. Systems, discovery and individual actor addressing can all be implemented on top of it.

Several components of the library can easily be customized. The traits WireFormat and ServiceMap allow you to create your own specialized implementations if desired.

ServiceMap

A service map is a type that knows how to deliver messages to individual actors. It dispatches if you will. You can tell Peer to expose a set of services remotely by bundling them in a ServiceMap and registering that map.

This makes it possible to expose different sets of services on different connections. For the moment it is not possible to dynamically change which services are exposed at runtime. The problem here is that we consider the service map a contract between two processes; if all of a sudden we stopped providing certain services, the remote process might run into errors. This might change in the future, with some thought on how to inform a remote process of which services are available at any given time. One use would be to deal with authentication, enabling certain services only once a user has authenticated. For now, however, you have to deal with authentication yourself by adding a token to your messages. The receiving actor doesn't actually know which connection a message came from.

There are three implementations of ServiceMap provided by thespis_remote:

  • service_map!, a macro
  • RelayMap, used for relaying messages to a different process.
  • PubSub, for relays to implement a publish/subscribe architecture.

service_map!

This is a macro because it needs to deserialize your actor messages, so it must know their types, which can only be provided inside your crate, not in thespis_remote. The macro generates all the functionality that needs to live in your crate, so you don't have to write it yourself. Its main use looks like:


#![allow(unused)]
fn main() {
use
{
   thespis        :: { Message                } ,
   thespis_remote :: { CborWF, service_map    } ,
   serde          :: { Serialize, Deserialize } ,
};

// Compared to local actor messages, remote ones must also implement Serialize and Deserialize from serde.
//
#[ derive( Serialize, Deserialize, Debug ) ] pub struct Add( pub i64 );
#[ derive( Serialize, Deserialize, Debug ) ] pub struct Show;

impl Message for Add  { type Return = ();  }
impl Message for Show { type Return = i64; }


// Remember that WireFormat is a trait. _thespis_remote_ is generic over the actual type, but it surely is part
// of the contract between 2 processes what wire format is being used. So you have to specify it for the macro.
//
service_map!
(
   namespace  : my_fancy_app_com ;
   wire_format: CborWF           ;
   services   : Add, Show        ;
);
}

The meaning of the parameters:

  • namespace: this will be transformed into a module in your code. It is also used to uniquely identify services to avoid name collisions.
  • wire_format: The wire format used for connections that expose this service map, in this case CborWF, the default implementation provided by thespis_remote.
  • services: The message types that will be served by this service map.

Usually you will put this macro, as well as the message types it mentions, in a separate crate that can be compiled into both processes that will communicate with each other.

You can of course also interact with such a process from binaries written in other languages, as long as they correctly speak the wire format.

Usage of service_map! looks like this on the side that exposes these services:


#![allow(unused)]
fn main() {
use
{
   thespis         :: { Actor, async_fn, Handler } ,
   thespis_impl    :: { Addr                     } ,
   async_executors :: { AsyncStd                 } ,
};

// Let's imagine a simple actor that can receive the `Add` and `Show` messages from our shared crate.
//
#[ derive(Actor) ] pub struct Sum( pub i64 );

impl Handler<Add> for Sum
{
   #[async_fn] fn handle( &mut self, msg: Add ) -> ()
   {
      self.0 += msg.0;
   }
}


impl Handler< Show > for Sum
{
   #[async_fn] fn handle( &mut self, _msg: Show ) -> i64
   {
      self.0
   }
}

// Create mailbox for our handler and start it using async-std as the executor.
// The type of addr_handler is `Addr<Sum>`.
//
let addr_handler = Addr::builder().spawn( Sum(0), &AsyncStd )?;

// Create a service map.
//
let mut sm = my_fancy_app_com::Services::new();

// Register our handler. In this case the same actor will handle both types of messages.
//
sm.register_handler::<Add >( addr_handler.clone_box() );
sm.register_handler::<Show>( addr_handler.clone_box() );

// Register sm with a peer. See next chapter.
// ...
}

On the side that wants to use the service, you can obtain a RemoteAddr that accepts all the message types declared in the service_map! macro. This cannot be statically verified by the compiler, as the other side is generally another process. So you essentially declare that you know the process on the other end accepts messages of these types. Apart from this, everything is statically type checked in thespis.


#![allow(unused)]
fn main() {
// Call the service and receive the response. `peer_addr` is the address
// of the Peer actor managing the connection (see the chapter on Peer).
//
let mut addr = my_fancy_app_com::RemoteAddr::new( peer_addr );

assert_eq!( Ok(()), addr.call( Add(5) ).await );
assert_eq!( Ok(()), addr.send( Add(5) ).await );
assert_eq!( Ok(10), addr.call( Show   ).await );
}

RemoteAddr works pretty much like thespis::Addr, except for the error type, which will be PeerErr instead of ThesErr, because a lot more things can go wrong when messaging over a network connection and you probably want to know what went wrong when it does.

RelayMap

A RelayMap has the same place in the workflow as the Services type created by the service_map! macro, but instead of delivering to a local actor it passes the message on to a Peer that is connected to another process. This allows for transparent relaying to other backend services.

To create the RelayMap, you give it a ServiceHandler and a list of ServiceIDs that should be relayed. Then you register it as a service map with the Peer that is listening for incoming requests, just like Services.

The ServiceHandler is an enum that is either an address to a Peer or a closure that provides an address on a case-by-case basis. The latter option allows you to do load balancing or other runtime checks/logging before producing the address, as sketched below.
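To make the closure variant concrete, here is a small, library-agnostic sketch of the pattern. This is not the actual ServiceHandler type, just the shape of it, with round-robin load balancing as the runtime check:


#![allow(unused)]
fn main() {
use std::sync::atomic::{ AtomicUsize, Ordering };

// Library-agnostic stand-in for the ServiceHandler idea: either a fixed
// endpoint, or a closure that picks one per request.
//
enum Handler<A>
{
   Address( A ),
   Closure( Box< dyn Fn() -> A + Send + Sync > ),
}

impl<A: Clone> Handler<A>
{
   fn provide( &self ) -> A
   {
      match self
      {
         Handler::Address( a ) => a.clone(),
         Handler::Closure( f ) => f(),
      }
   }
}

let backends = vec![ "backend-1", "backend-2" ];
let next     = AtomicUsize::new( 0 );

// Round-robin load balancing, as the closure variant allows.
//
let handler = Handler::Closure( Box::new( move ||
{
   backends[ next.fetch_add( 1, Ordering::Relaxed ) % backends.len() ]
}));

assert_eq!( handler.provide(), "backend-1" );
assert_eq!( handler.provide(), "backend-2" );
assert_eq!( handler.provide(), "backend-1" );
}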

For a working code example, check the relay example for thespis_remote.

PubSub

  • TODO

Peer

The Peer is the workhorse of thespis_remote. It manages a connection for you once you give it a Stream + Sink of the wire format. It is generic over a WireFormat.

On the incoming path what it does looks like:

  1. It spawns a loop that reads incoming messages from the connection and deserializes them into the wire format struct. It then looks at what type of message it is, so it can apply back pressure to calls but not to other types of messages.
  2. This loop sends those to the Peer as messages, as the peer is an actor.
  3. Unless the message is an error or a response to a request we made, we look up whether we have a service map that handles this message type. We then spawn a task and tell the service map to deliver the message. This prevents the Peer from being blocked while a message is being processed, so it can still process outgoing messages as well as handle several incoming messages concurrently.

When an outgoing message is a call (request/response), the peer hands a oneshot channel back to the caller. It then stores the sender of that channel together with the ConnID, which later allows it to detect that an incoming message is the response to this request and to send the answer through the oneshot channel.
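The following is a stripped-down, library-agnostic sketch of that bookkeeping, not Peer's actual code: a table maps each ConnID to the sender half of a oneshot channel.


#![allow(unused)]
fn main() {
use
{
   futures :: { channel::oneshot, executor::block_on } ,
   std     :: { collections::HashMap                 } ,
};

type ConnId   = u64;
type Response = Vec<u8>;

// Maps the ConnID of each outstanding call to the sender half of a
// oneshot channel.
//
struct Pending { requests: HashMap< ConnId, oneshot::Sender<Response> > }

impl Pending
{
   // Outgoing call: store the sender, hand the receiver to the caller.
   //
   fn start_call( &mut self, cid: ConnId ) -> oneshot::Receiver<Response>
   {
      let (tx, rx) = oneshot::channel();
      self.requests.insert( cid, tx );
      rx
   }

   // Incoming message: if the ConnID matches a pending call, it is the
   // response and gets forwarded to the waiting caller.
   //
   fn on_incoming( &mut self, cid: ConnId, msg: Response )
   {
      if let Some( tx ) = self.requests.remove( &cid )
      {
         let _ = tx.send( msg );
      }

      // Otherwise it would be a fresh request, dispatched to a service map.
   }
}

block_on( async
{
   let mut pending = Pending { requests: HashMap::new() };

   let rx = pending.start_call( 42 );
   pending.on_incoming( 42, b"pong".to_vec() );

   assert_eq!( rx.await.unwrap(), b"pong".to_vec() );
});
}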

Peer does not do connection management, like reconnecting on loss. It is observable and will emit an event if the connection closes. It's up to you to handle that and potentially reconnect.
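As an indication of what handling that could look like: Peer is observable through the pharos crate. Given a peer (see below for creating one), a watch loop might look roughly like this. The observe call and the PeerEvent variant names here are assumptions written from memory, so check the API documentation for the authoritative version.


#![allow(unused)]
fn main() {
use
{
   pharos         :: { Observable, ObserveConfig } ,
   thespis_remote :: { PeerEvent                 } ,
   futures        :: { StreamExt                 } ,
};

// Observe the peer before handing it to its mailbox (assumed API).
//
let mut events = peer.observe( ObserveConfig::default() ).await.expect( "observe peer" );

while let Some( event ) = events.next().await
{
   match event
   {
      // Variant names are indicative. On close, tear down and re-create
      // the connection, the Peer and the service map yourself.
      //
      PeerEvent::Closed | PeerEvent::ClosedByRemote => { /* reconnect here */ }
      _                                             => {}
   }
}
}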

The easiest way to create the peer is from an AsyncRead/AsyncWrite. Wire formats must provide a method for that. Let's have a look:


#![allow(unused)]
fn main() {
use 
{
	async_executors::AsyncStd,
	thespis_remote::CborWF,
	std::sync::Arc,
};

let (peer, peer_mb, peer_addr) = CborWF::create_peer
( 
	"server"             , // a name for this connection
	tcp_connection       , // A transport that implements AsyncRead/AsyncWrite
	1024                 , // Max read message size for the codec (in bytes)
	1024                 , // Max write message size for the codec (in bytes)
	Arc::new( AsyncStd ) , // An executor to spawn tasks
	None                 , // an optional semaphore for backpressure
	None                 , // an optional grace period to finish outstanding tasks when closing down

).expect( "create peer" );
}

See the documentation on the WireFormat trait for some more explanation. You can also use the slightly lower-level Peer::new if you want more control. That expects a Stream/Sink of the wire format, as well as two thespis addresses: one for incoming messages and one for outgoing messages. This allows you to set up a priority queue with the futures::stream::select_with_strategy combinator, making sure outgoing messages are prioritized. This is especially useful if your process accepts requests, as it improves latency and lowers memory consumption. It can also lower the risk of deadlocks, which will be explained further in the chapter on backpressure.
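Here is a small, self-contained demonstration of select_with_strategy from the futures crate, with a strategy that always prefers the left stream; in the Peer::new setup, the outgoing queue would take that left slot. The streams here are toy stand-ins:


#![allow(unused)]
fn main() {
use futures ::
{
   executor :: block_on,
   stream   :: { self, PollNext, StreamExt },
};

block_on( async
{
   let outgoing = stream::iter( vec![ "out-1", "out-2" ] );
   let incoming = stream::iter( vec![ "in-1" , "in-2"  ] );

   // Always poll the left (outgoing) stream first, so responses leave
   // the process before new work is picked up.
   //
   let prio = |_: &mut ()| PollNext::Left;

   let merged: Vec<_> = stream::select_with_strategy( outgoing, incoming, prio )
      .collect().await;

   // Outgoing items drain first for as long as they are ready.
   //
   assert_eq!( merged, vec![ "out-1", "out-2", "in-1", "in-2" ] );
});
}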

Once the peer is created as shown above, we can continue the example from the chapter on ServiceMap.


#![allow(unused)]
fn main() {
use 
{
	async_executors::{ AsyncStd, SpawnHandleExt },
	std::sync::Arc,
};

// Register service map with peer
//
peer.register_services( Arc::new( sm ) );

// Start the peer actor
//
let peer_handle = AsyncStd.spawn_handle( peer_mb.start(peer) ).expect( "start mailbox of Peer" );

// Wait for the connection to close, which will automatically stop the peer.
//
peer_handle.await;

}

Backpressure

Peer is a gateway in the sense described in the earlier chapter on backpressure. It uses both a semaphore and a priority channel.

Furthermore, Peer spawns a new task for each incoming message, which guarantees it doesn't deadlock. As processing messages is asynchronous, and messages might even be relayed to other processes, this also means we can keep processing incoming messages concurrently rather than blocking and handling them serially.
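The semaphore part, in isolation, boils down to the sketch below. This is not Peer's internal code, just the mechanism, shown here with the async_lock crate: every in-flight message must hold a permit, so a flood of requests waits at the semaphore instead of piling up in memory.


#![allow(unused)]
fn main() {
use
{
   async_lock :: { Semaphore           } ,
   futures    :: { executor::block_on  } ,
   std        :: { sync::Arc           } ,
};

block_on( async
{
   // Allow at most 2 messages in flight at any time.
   //
   let sem = Arc::new( Semaphore::new( 2 ) );

   let g1  = sem.acquire().await;
   let _g2 = sem.acquire().await;

   // A third acquire would park here until a permit frees up.
   //
   assert!( sem.try_acquire().is_none() );

   // Finishing a message drops its guard and releases the permit.
   //
   drop( g1 );
   assert!( sem.try_acquire().is_some() );
});
}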