Cancellable

Recently I’ve been investing a lot of time in developing a game server in Rust. I started by implementing the network layer, based on WebSockets. It’s far from ready, but along the way I developed a helper crate for creating detached, cancellable services.

Game server backstory

The idea of developing an authoritative game server always seemed appealing to me. Network programming, however, has many pitfalls:

  • The server needs to validate all user inputs to protect the game state from bad actors.
  • Ill-formed data sent by one client must not deny service to other players.
  • Async programming is hard in general.

In the past I have made many attempts at developing a game server, each improving on the mistakes of the previous one:

  • spectrum-old – A real-time multiplayer browser game,
  • Fusion-cpp – This is the source code of the server for the Fusion game,
  • [private repo],
  • [private repo],
  • [private repo].

And now I’m working on another. 👍

This time, the improvements over the previous attempt are an implementation that depends on traits and TODOs organized in a single GitHub project.

[Comic: xkcd #1906, “Making Progress”]

Cancellable crate

Network components in game servers (listeners, TCP streams, etc.) wait for some input and usually yield a result:

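| Component    | Input                   | Output                     |
|--------------|-------------------------|----------------------------|
| Listener     | new incoming connection | Client object              |
| TCP Stream   | data package            | parsed ClientMessage model |
| Ping service | timer tick              | new ping frame             |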

We can define a trait that describes a common interface for all of them:

#[async_trait::async_trait]
pub trait Cancellable {
    type Result;
    type Error;

    async fn run(&mut self) -> Result<CancellationResult<Self::Result>, Self::Error>;
}

The run method performs a single unit of work of the service. Internally it can wait for input to become available and then return its result. If the returned value is Err(Self::Error), the service completes. If it succeeds, it should return Ok(CancellationResult). CancellationResult is an enum defined as follows:

pub enum CancellationResult<T> {
    Item(T),
    Continue,
    Break,
}

impl<T> CancellationResult<T> {
    pub fn item(t: impl Into<T>) -> Self {
        Self::Item(t.into())
    }
}

The enum’s variants control whether the service continues to perform its work. If the service produces a value, it should wrap it as CancellationResult::Item(t); this is also a signal that the service should keep running. If no value is available but the service should continue, it returns CancellationResult::Continue (similar to Poll::Pending).

If the service finishes its work successfully (e.g. when the peer closes the connection), it should return CancellationResult::Break.
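To make the control flow concrete, here is a minimal synchronous sketch of a loop that drives such a service. It is not the crate's implementation: the real trait is async, and `SyncService`, `Countdown`, and `drive` are hypothetical names introduced only for illustration. The enum mirrors the one above.

```rust
// Mirrors the enum from the post.
#[derive(Debug, PartialEq)]
pub enum CancellationResult<T> {
    Item(T),
    Continue,
    Break,
}

// Hypothetical synchronous stand-in for the async `Cancellable::run`.
pub trait SyncService {
    type Result;
    type Error;

    fn run(&mut self) -> Result<CancellationResult<Self::Result>, Self::Error>;
}

// Toy service: counts down and yields the remaining count whenever it is even.
pub struct Countdown {
    pub remaining: u32,
}

impl SyncService for Countdown {
    type Result = u32;
    type Error = String;

    fn run(&mut self) -> Result<CancellationResult<u32>, String> {
        if self.remaining == 0 {
            // Nothing left to do: finish successfully.
            return Ok(CancellationResult::Break);
        }
        self.remaining -= 1;
        if self.remaining % 2 == 0 {
            Ok(CancellationResult::Item(self.remaining))
        } else {
            // No value this time, but keep going.
            Ok(CancellationResult::Continue)
        }
    }
}

// Calls `run` until the service breaks or fails, collecting produced items.
pub fn drive<S: SyncService>(service: &mut S) -> Result<Vec<S::Result>, S::Error> {
    let mut items = Vec::new();
    loop {
        match service.run()? {
            CancellationResult::Item(item) => items.push(item),
            CancellationResult::Continue => continue,
            CancellationResult::Break => return Ok(items),
        }
    }
}
```

Driving `Countdown { remaining: 5 }` collects the even counts and stops on `Break`; an `Err` from `run` would end the loop early through the `?` operator.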

The Cancellable trait also has methods for spawning the service as a detached, background task:

#[async_trait::async_trait]
pub trait Cancellable {
    // ...

    async fn spawn(mut self, cancellation_token: CancellationToken) -> CancellableHandle<Self> {
        // ...
    }

    async fn spawn_with_callback<F>(
        mut self,
        cancellation_token: CancellationToken,
        mut callback: F,
    ) -> CancellableHandle<Self>
    where
        F: FnMut(Self::Result) -> Result<(), Self::Result>,
    {
        // ...
    }
}

Both return a handle that can be awaited to wait for the service to complete once it has been cancelled.

If the service produces results, it can be spawned with spawn_with_callback to consume them. If the callback returns Err(Self::Result), the service completes immediately.
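The callback contract can be illustrated with a std-only sketch that uses a thread in place of an async task and an `AtomicBool` in place of `CancellationToken`. None of this is the crate's code: `Counter`, this free-standing `spawn_with_callback`, and `collect_until_three` are hypothetical, and the real methods are async trait methods.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

pub enum CancellationResult<T> {
    Item(T),
    Continue,
    Break,
}

// Hypothetical blocking service that yields 1, 2, 3, ... forever.
pub struct Counter {
    pub next: u32,
}

impl Counter {
    pub fn run(&mut self) -> Result<CancellationResult<u32>, String> {
        self.next += 1;
        Ok(CancellationResult::Item(self.next))
    }
}

// Runs the service on a background thread until the flag is set, the service
// breaks or fails, or the callback rejects an item.
pub fn spawn_with_callback<F>(
    mut service: Counter,
    cancelled: Arc<AtomicBool>,
    mut callback: F,
) -> JoinHandle<Result<(), String>>
where
    F: FnMut(u32) -> Result<(), u32> + Send + 'static,
{
    thread::spawn(move || -> Result<(), String> {
        while !cancelled.load(Ordering::SeqCst) {
            match service.run()? {
                CancellationResult::Item(item) => {
                    // An `Err` from the callback completes the service.
                    if callback(item).is_err() {
                        break;
                    }
                }
                CancellationResult::Continue => {}
                CancellationResult::Break => break,
            }
        }
        Ok(())
    })
}

// Consumes items through the callback and rejects the third one.
pub fn collect_until_three() -> Vec<u32> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);
    let handle = spawn_with_callback(
        Counter { next: 0 },
        Arc::new(AtomicBool::new(false)),
        move |item| {
            if item == 3 {
                return Err(item);
            }
            sink.lock().unwrap().push(item);
            Ok(())
        },
    );
    // Joining the thread plays the role of awaiting the handle.
    handle.join().unwrap().unwrap();
    let items = seen.lock().unwrap().clone();
    items
}
```

In this sketch the spawner could also stop the service at any time by storing `true` in the shared flag, which is the role the cancellation token plays in the post.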

This setup offers a way of detaching services which perform work “on their own”, but sometimes services need to accept additional data. An example is a TCP stream: it reads data packages from a peer and consumes them via a callback. However, if the server decides the connection should be terminated, the service should complete its work.

Enter…

Communicating with detached service

When we spawn the service task we already get a handle:

let token = CancellationToken::new();

let handle = service.spawn(token.clone()).await;

The handle can be used as an interface to send data to its service.

handle.update(ConnectionStatus::TerminatedByServer(reason));

The actual interface needs to be implementation-dependent, so it is defined in the Cancellable trait. With a small extension to the trait we get:

#[async_trait::async_trait]
pub trait Cancellable {
    // ...
    type Handle;

    async fn new_handle(&mut self) -> Self::Handle;
}

When the service is spawned (either by spawn or spawn_with_callback), the spawning method calls new_handle to construct the handle. The handle is owned by CancellableHandle, which implements Deref with Self::Handle as its target. With that setup, we can define a channel through which the spawner can communicate with the spawnee.
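One way to picture the channel setup is the following std-only sketch, again with a thread standing in for the async task. Everything here is an assumption made for illustration: `StreamHandle`, `StreamService`, `ServiceHandle` (a stand-in for `CancellableHandle`), and the shape of `ConnectionStatus` are hypothetical and not taken from the crate.

```rust
use std::ops::Deref;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical message type, named after the `handle.update(...)` call above.
#[derive(Debug, PartialEq)]
pub enum ConnectionStatus {
    TerminatedByServer(String),
}

// Hypothetical inner handle: the sending half of a channel.
pub struct StreamHandle {
    tx: Sender<ConnectionStatus>,
}

impl StreamHandle {
    pub fn update(&self, status: ConnectionStatus) {
        // Ignore the error if the service has already stopped.
        let _ = self.tx.send(status);
    }
}

// Hypothetical service that keeps the receiving half.
pub struct StreamService {
    updates: Option<Receiver<ConnectionStatus>>,
}

impl StreamService {
    pub fn new() -> Self {
        Self { updates: None }
    }

    // Plays the role of `new_handle`: creates the channel, keeps the receiver.
    pub fn new_handle(&mut self) -> StreamHandle {
        let (tx, rx) = channel();
        self.updates = Some(rx);
        StreamHandle { tx }
    }
}

// Stand-in for `CancellableHandle`: owns the inner handle and the worker.
pub struct ServiceHandle {
    inner: StreamHandle,
    worker: JoinHandle<Option<ConnectionStatus>>,
}

impl Deref for ServiceHandle {
    type Target = StreamHandle;

    fn deref(&self) -> &StreamHandle {
        &self.inner
    }
}

impl ServiceHandle {
    // Drops the sender first so the worker cannot block forever, then joins.
    pub fn join(self) -> Option<ConnectionStatus> {
        let ServiceHandle { inner, worker } = self;
        drop(inner);
        worker.join().unwrap()
    }
}

pub fn spawn(mut service: StreamService) -> ServiceHandle {
    let inner = service.new_handle();
    let worker = thread::spawn(move || {
        // Block until the spawner sends an update or drops the handle.
        service.updates.take().and_then(|rx| rx.recv().ok())
    });
    ServiceHandle { inner, worker }
}

pub fn terminate_demo() -> Option<ConnectionStatus> {
    let handle = spawn(StreamService::new());
    // `update` is reached through `Deref`.
    handle.update(ConnectionStatus::TerminatedByServer("idle timeout".to_string()));
    handle.join()
}
```

Because the wrapper dereferences to the inner handle, the spawner calls `update` directly on what `spawn` returned, which matches the `handle.update(...)` usage shown earlier.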

I like the final product, so I’ve packaged it as a crate. It’s available on crates.io.
