Cancellable

4 min Kamil Rusin

Recently I’m investing a lot of time into developing a game server in Rust. I started with implementing network layer based on WebSockets. It’s far from being ready, but I developed a helper crate for creating detached, cancellable services.

# Game server backstory

The idea of developing an authoritative game server always seemed appealing to me. Network programming, however, has many pitfalls:

  • Server need to validate all use inputs to protect game state from bad actors.
  • Ill-formed data sent by one client should not deny service for another player.
  • Async programming is hard in general.

So, in the past I have had many attempts to develop a game server. Each improving on mistakes made in the previous one.

  • spectrum-old – A real-time multiplayer browser game,
  • Fusion-cpp – This is the source code of the server for the Fusion game,
  • [private repo],
  • [private repo],
  • [private repo].

And now I’m working on another. :thumbs_up:

This time, improvements over the previous one are creating implementation that depend on traits and organizing TODOs to a single GitHub project.

xkcd: Making Progress

Making Progress

xkcd #1906

# Cancellable crate

Network functionalities in game servers (listeners, TCP streams, etc.) await for some input and usually yield a result.

ComponentInputOutput
Listenernew incoming connectionClient object
TCP Streamdata packageparsed ClientMessage model
Ping servicetimer ticknew ping frame

We can define a trait that will describe common interface for all of them:

1#[async_trait::async_trait]
2pub trait Cancellable {
3    type Result;
4    type Error;
5
6    async fn run(&mut self) -> Result<CancellationResult<Self::Result>, Self::Error>;
7}

Method run performs a single unit of work of the service. Internally it can await for the input to be available and then return its result. If the returned value is Err(Self::Error) then the service completes. If it succeeds, then it should return Ok(CancellationResult). CancellationResult is an enum defined as follows:

 1pub enum CancellationResult<T> {
 2    Item(T),
 3    Continue,
 4    Break,
 5}
 6
 7impl<T> CancellationResult<T> {
 8    pub fn item(t: impl Into<T>) -> Self {
 9        Self::Item(t.into())
10    }
11}

Enum’s variant control whether the service will continue to perform its work. If the service produces a value, then it should wrap it as CancellableResult::Item(t); it’s also a signal that the service should continue to work. If no value is available, but the service should continue then it returns CancellableResult::Continue (similar to Poll::Pending).

If the service finishes its work successfully (e.g. when the peer closes the connection) then the service should return CancellableResult::Break.

Cancellable trait has methods for spawning the service as a detached, background task:

 1#[async_trait::async_trait]
 2pub trait Cancellable {
 3    // ...
 4
 5    async fn spawn(mut self, cancellation_token: CancellationToken) -> CancellableHandle<Self> {
 6        // ...
 7    }
 8
 9    async fn spawn_with_callback<F>(
10            mut self,
11            cancellation_token: CancellationToken,
12            mut callback: F,
13        ) -> CancellableHandle<Self>
14        where
15            F: FnMut(Self::Result) -> Result<(), Self::Result>
16        {
17            // ...
18        }
19}

Both return a handle, which can be awaited for the service to complete, once it has been cancelled!

If the service produces results, then it can be spawned with spawn_with_callback, to consume them. If the callback returns Err(Self::Result) then the service completes immediately.

This setup offers a way of detaching services which perform work “on their own”, but sometimes services need to accept additional data. An example is TCP stream: it reads data packages from a peer and consumes them via callback. However, if the server decides the connection should be terminated, then the service should complete its work.

Enter…

# Communicating with detached service

When we spawn the service task we already get a handle:

1let token = CancellableToken::new();
2
3let handle = service.spawn(token.clone()).await;

The handle can be used as an interface to send data to its service.

1handle.update(ConnectionStatus::TerminatedByServer(reason));

The actual interface needs to be implementation-dependent – defined in the Cancellable trait. By easily extending the trait we get:

1#[async_trait::async_trait]
2pub trait Cancellable {
3    // ...
4    type Handle;
5
6    async fn new_handle(&mut self) -> Self::Handle;
7}

When the service is spawned (either by spawn or spawn_with_callback), the method will call new_handle to construct the handle. The handle is owned by CancellableHandle, which implements Deref for Self::Handle type. With that setup, we can define a channel by which spawner can communicate with spawnee.

I like the final product, so I’ve packaged it as a crate. It’s available on crates.io.

🌊

Interested in my work?

Consider subscribing to the RSS Feed or joining my mailing list: madebyme-notifications on Google Groups .


Disclaimer: Only group owner (i.e. me) can view e-mail addresses of group members. I will not share your e-mail with any third-parties — it will be used exclusively to notify you about new blog posts.