Emulating Actors in C# with Async/Await

posted by Craig Gidney on January 29, 2013

The actor model is an approach to concurrency based on having small single-threaded objects (actors), that can only interact via concurrent messages (as opposed to shared state, locks, condition variables, etc). This isolation not only makes it easier to reason about the actors, but allows for different actors to be distributed across different machines without any significant change in architecture. It’s no coincidence that Erlang, a language with a reputation for reliability and scalability, is based on the actor model.

Not surprisingly, there are several projects related to bringing the actor model (or actor-like things) to .Net. There’s the (cancelled) Axum language (a former DevLabs project), the Task Parallel Library Dataflow library, the Concurrency and Coordination Runtime, and plenty of small stuff.

In this post, I hope to explain a nice way to emulate actors in C#, with a minimum of cruft. To do so, we’ll take advantage of the new-in-C#5 async/await functionality. Basically: “messages” will be calls to, and results from, asynchronous methods posted to message queues (with the help of some machinery).

Motivating Example

Let’s start with an example of our destination. A queue implemented as an actor:

public sealed class QueueActor<T> {
    // (note: we need to implement the ActorSynchronizationContext class)
    private readonly ActorSynchronizationContext _messageQueue
        = new ActorSynchronizationContext();
    private readonly Queue<T> _items = new Queue<T>();

    public async Task EnqueueAsync(T item) {
        // (note: we need to implement a custom awaiter to allow awaiting the message queue)
        await _messageQueue;
        _items.Enqueue(item);
    }
    
    // (note: using my option type (see post) for the result)
    public async Task<May<T>> TryDequeueAsync() {
        await _messageQueue;
        if (_items.Count == 0) return May.NoValue;
        return _items.Dequeue();
    }
}

This queue has two public methods: EnqueueAsync and TryDequeueAsync. Both start by awaiting the message queue. This immediately returns a task to the caller, representing the eventual result, and posts the rest of the method into the message queue (to be executed when its turn comes). EnqueueAsync will eventually place an item into the queue, whereas TryDequeueAsync will eventually try to remove an item from the queue. If either method were to throw an exception, then the exception would be propagated into the task result. If you’ve used the async/await functionality before, everything here should be familiar (except perhaps the semantics of awaiting something that isn’t a task).

Using this actor queue looks exactly like using a normal queue, except you need to await the changes:

// the async keyword allows using await
// we return a task, instead of void, so callers can await us finishing
async Task UseQueue() {
    var q = new QueueActor<int>();

    // sending messages and waiting for the responses
    await q.EnqueueAsync(1);
    May<int> r1 = await q.TryDequeueAsync(); // r1 will contain 1
    May<int> r2 = await q.TryDequeueAsync(); // r2 will contain no value

    // spamming messages, then later checking for the responses
    Task t3 = q.EnqueueAsync(2);
    Task<May<int>> t4 = q.TryDequeueAsync();
    Task<May<int>> t5 = q.TryDequeueAsync();
    await t3; // if our enqueue had failed somehow, this would rethrow the exception
    var r5 = await t5; // r5 will contain no value
    var r4 = await t4; // r4 will contain 2
}

This example shows two ways to interact with our queue actor. You can invoke and await methods one by one. This lets you use intermediate results to make decisions. Alternatively, you can invoke a series of methods and then only await the results afterwards. This is faster, because there are fewer round trips to and from the actor, but easier to get wrong. For example, it’s easier to forget to await one of the results, which (if it might fail) can result in an unobserved exception. Under .NET 4.0 an unobserved task exception terminates the program once the task is finalized (unless you have an unobserved task exception handler set up); from .NET 4.5 on, the default is to quietly discard it, which hides the failure instead. Also, if one of the early methods fails, you’ve still called the later methods in the sequence (oops…).
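
To make that last hazard concrete, here is a minimal sketch that uses plain Task.Run calls as stand-ins for actor messages. The PipeliningDemo class and its Step helper are hypothetical names invented for this illustration, and nothing in it depends on the actor machinery:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PipeliningDemo {
    // Hypothetical stand-in for an actor message: records that it ran, then optionally fails.
    private static Task Step(List<string> ran, string name, bool fail) {
        return Task.Run(() => {
            lock (ran) ran.Add(name);
            if (fail) throw new InvalidOperationException(name + " failed");
        });
    }

    // One by one: the failure of the first step stops us from ever sending the second.
    public static async Task<List<string>> OneByOneAsync() {
        var ran = new List<string>();
        try {
            await Step(ran, "first", fail: true);
            await Step(ran, "second", fail: false); // never reached
        } catch (InvalidOperationException) {
            // observed the failure of "first"
        }
        return ran;
    }

    // Pipelined: both steps are already sent before we learn that the first one failed.
    public static async Task<List<string>> PipelinedAsync() {
        var ran = new List<string>();
        Task first = Step(ran, "first", fail: true);
        Task second = Step(ran, "second", fail: false);
        try {
            await first;
        } catch (InvalidOperationException) {
            // too late to cancel "second"
        }
        await second;
        return ran;
    }
}
```

OneByOneAsync only ever records the first step, whereas PipelinedAsync records both, even though the first one failed.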

So, summarizing what we’ve learned from this example, our actors will work as follows:

  • The actor uses a message queue to take care of running things in order and without overlap (on some other thread, without blocking callers).
  • Messages are represented by methods that return tasks.
  • Each actor method starts by awaiting the message queue. This causes the rest of the method to be run later. The caller will immediately get a result, a task, representing the actual eventual result of the method.
  • Callers access the eventual response to their messages by awaiting the tasks they receive (or by using ContinueWith/Wait).

Note that each individual actor method actually represents three “messages”: the request from the caller (“please enqueue an item”), the later response to the caller (“I enqueued your item”), and the implicit await-is-done-message to the caller (“Note to self: they enqueued my item, deal with it”). Having these different representations for a message (a method call, a task result, an implicit return-to-context) might not be in-theory-clean, but it’s incredibly convenient in practice. Being able to await results allows usage that looks like idiomatic C#, while allowing all kinds of concurrent interactions (with much less risk of accidentally ending up in unexpected states).

There are two things we need to implement in order to make this all work: the message queue type (ActorSynchronizationContext) and a custom awaiter to allow awaiting message queues (SynchronizationContextAwaiter).

Actor Contexts

In .Net, a synchronization context roughly corresponds to a strategy for running methods. For example, the UI synchronization context will take “posted” methods and queue them to be run on the UI thread. The default synchronization context, on the other hand, runs methods on the thread pool.

Why are synchronization contexts important? Because, by default, awaiting a task will resume execution in the same context. This means that if you’re in the UI context, and you await a task, the rest of your method will still execute in the UI context. This is much more convenient than having to manually Invoke your way back after every asynchronous operation (like you used to have to do). Since our actor methods will return tasks, giving each actor a synchronization context allows them to send messages and await responses without accidentally leaving their individual contexts.
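
The capture-and-resume behaviour can be observed with a small experiment. This is only a sketch: CountingContext and ContextCaptureDemo are hypothetical names made up for the illustration, and the context does nothing except count how often something is posted to it before handing the work to the thread pool:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context that counts how often work is posted to it.
public sealed class CountingContext : SynchronizationContext {
    private int _postCount;
    public int PostCount { get { return Volatile.Read(ref _postCount); } }

    public override void Post(SendOrPostCallback d, object state) {
        Interlocked.Increment(ref _postCount);
        base.Post(d, state); // the base class runs the callback on the thread pool
    }
}

public static class ContextCaptureDemo {
    private static async Task DelayAsync(bool resumeOnContext) {
        // ConfigureAwait(true) is the default behaviour; false opts out of resuming on the context
        await Task.Delay(50).ConfigureAwait(resumeOnContext);
    }

    // Returns how many times the rest of DelayAsync was posted to the context.
    public static int CountPosts(bool resumeOnContext) {
        var context = new CountingContext();
        var previous = SynchronizationContext.Current;
        Task pending;
        SynchronizationContext.SetSynchronizationContext(context);
        try {
            pending = DelayAsync(resumeOnContext); // the await captures the current context here
        } finally {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
        pending.Wait();
        return context.PostCount;
    }
}
```

With the default behaviour the continuation is posted to the context once; with ConfigureAwait(false) it is never posted. Note that the base Post does not make the context current while the callback runs, so a context that wants later awaits to come back to it has to install itself around the callbacks it runs.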

The main requirement that an actor synchronization context must satisfy is exclusion. Only one message may be processed at a time. All our actor synchronization context will actually do is wrap an underlying context to ensure posted methods are being run one by one instead of all at once. Doing this efficiently is quite tricky but, luckily, most of the really tricky stuff is already packaged into the ConcurrentQueue class. We just need to make sure that what we put in the queue gets run:

using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class ActorSynchronizationContext : SynchronizationContext {
    private readonly SynchronizationContext _subContext;
    private readonly ConcurrentQueue<Action> _pending = new ConcurrentQueue<Action>();
    private int _pendingCount;
    
    public ActorSynchronizationContext(SynchronizationContext subContext = null) {
        this._subContext = subContext ?? new SynchronizationContext();
    }

    public override void Post(SendOrPostCallback d, object state) {
        if (d == null) throw new ArgumentNullException("d");
        _pending.Enqueue(() => d(state));

        // trigger consumption when the queue was empty
        if (Interlocked.Increment(ref _pendingCount) == 1) 
            _subContext.Post(Consume, null);
    }
    private void Consume(object state) {
        var surroundingContext = Current;
        try {
            // temporarily replace surrounding sync context with this context
            SetSynchronizationContext(this);

            // run pending actions until there are no more
            do {
                Action a;
                _pending.TryDequeue(out a); // always succeeds, due to usage of _pendingCount
                a.Invoke(); // if an enqueued action throws... well, that's very bad
            } while (Interlocked.Decrement(ref _pendingCount) > 0);

        } finally {
            SetSynchronizationContext(surroundingContext); // restore surrounding sync context
        }
    }

    public override void Send(SendOrPostCallback d, object state) {
        throw new NotSupportedException();
    }
    public override SynchronizationContext CreateCopy() {
        return this;
    }
}

The above code manages running the consume method via the _pendingCount field. The _pendingCount field is atomically incremented after enqueuing actions, and atomically decremented after dequeuing (and invoking) actions. Initially, exactly one producer will be the one to increment _pendingCount from 0 to 1. They’re in charge of triggering the start of consumption. The consumer can only decrement from 1 to 0 if no more actions have finished being enqueued. It’s possible for an action to be in the queue when the decrement-to-0 happens, but only if there’s a producer about to increment from 0 to 1 and re-trigger consumption! The consumer can just stop when it looks like the queue is empty, even if it’s not!
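
If you want to convince yourself that the counting protocol really does prevent overlap, a stress test helps. The sketch below strips the idea down to a hypothetical SerialRunner class (no synchronization context, work goes straight to the thread pool) and hammers it from several threads while checking that no two actions ever run at the same time. The names and the test harness are assumptions made for this illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stripped-down runner: same counting protocol, no SynchronizationContext.
public sealed class SerialRunner {
    private readonly ConcurrentQueue<Action> _pending = new ConcurrentQueue<Action>();
    private int _pendingCount;

    public void Post(Action action) {
        _pending.Enqueue(action);
        // only the producer that moves the count from 0 to 1 starts a consumer
        if (Interlocked.Increment(ref _pendingCount) == 1)
            ThreadPool.QueueUserWorkItem(_ => Consume());
    }

    private void Consume() {
        do {
            Action a;
            _pending.TryDequeue(out a); // succeeds: the count says an enqueue has finished
            a();
        } while (Interlocked.Decrement(ref _pendingCount) > 0);
    }
}

public static class SerialRunnerDemo {
    // Posts actions from several threads at once; returns true if every action ran
    // and no two actions were ever observed running at the same time.
    public static bool RunsWithoutOverlap(int producers, int perProducer) {
        var runner = new SerialRunner();
        int total = producers * perProducer;
        int running = 0, overlaps = 0, completed = 0;
        using (var done = new ManualResetEventSlim(false)) {
            Parallel.For(0, producers, p => {
                for (int i = 0; i < perProducer; i++) {
                    runner.Post(() => {
                        // if anyone else is mid-action, the count will be above 1
                        if (Interlocked.Increment(ref running) != 1)
                            Interlocked.Increment(ref overlaps);
                        Interlocked.Decrement(ref running);
                        if (Interlocked.Increment(ref completed) == total) done.Set();
                    });
                }
            });
            done.Wait();
        }
        return overlaps == 0 && completed == total;
    }
}
```

A test like this can't prove the protocol correct, but a broken variant (say, incrementing the count before enqueuing) tends to fail quickly under this kind of load.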

(Self-indulgence: I was really happy when I thought of the above strategy for managing consumption. The way I used to approach the problem required the consumer to try to re-acquire before they could exit, which was ugly because a thread could technically be live-locked in an infinite loop of releasing, seeing a change, re-acquiring, and seeing the change had already been handled.)

There is a bit of boilerplate here. The actor context doesn’t support synchronous entrance (Send), can’t have CreateCopy returning a copy of the base class (a thread-pool context), and needs posted methods to see the actor context, instead of the underlying context, as the current synchronization context. There are also other methods that we could override, but they’re not documented very well and I’m not actually sure where they’re even used… (If anyone could enlighten me, that would be appreciated. What the heck is OperationCompleted for? Why is Wait virtual?).

In any case, now that we have our synchronization context, we want to use it.

Awaiting Contexts

At the moment, without a custom awaiter, we would need to nest the bodies of our actor methods inside of lambda expressions passed to their message queue’s Post method. We can avoid this problem by making synchronization contexts awaitable, with the understanding that this means ‘enter the context’ instead of the usual ‘once this task is done, resume on the current context’ semantics of awaiting.

Making a class awaitable involves giving it a ‘GetAwaiter’ method. The GetAwaiter method can be a member method or an extension method, as long as the compiler can find it. The type returned by GetAwaiter must implement the INotifyCompletion interface, have an IsCompleted property, have an OnCompleted method, and have a GetResult method.

IsCompleted determines whether or not awaiting can be skipped. The result of tasks that are already complete can be extracted immediately, instead of registering a callback. OnCompleted registers a callback to run once the awaited thing is complete (if it’s already complete, the callback runs immediately). GetResult is used to get the value or rethrow the exception contained in the awaited thing, once it has completed. The return type of GetResult corresponds to the type of value the awaited thing will contain. In our case it will be void.
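
To see how the three members fit together, it can help to drive an awaiter by hand. The sketch below makes TimeSpan awaitable through an extension GetAwaiter (borrowing the awaiter of Task.Delay), then compares a normal await against a hand-written approximation of what the compiler does with it. The class names are hypothetical, and the real generated code is a state machine that is considerably more involved than this:

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

public static class TimeSpanAwaiterExtensions {
    // An extension GetAwaiter is enough to make `await someTimeSpan` compile.
    public static TaskAwaiter GetAwaiter(this TimeSpan delay) {
        return Task.Delay(delay).GetAwaiter();
    }
}

public static class AwaiterDemo {
    // The compiler finds the extension method above and uses its awaiter.
    public static async Task<string> WithAwaitAsync() {
        await TimeSpan.FromMilliseconds(10);
        return "resumed";
    }

    // Roughly the same thing, with the awaiter protocol spelled out by hand.
    public static Task<string> ByHandAsync() {
        var result = new TaskCompletionSource<string>();
        TaskAwaiter awaiter = TimeSpan.FromMilliseconds(10).GetAwaiter();
        Action restOfMethod = () => {
            try {
                awaiter.GetResult(); // rethrows if the awaited thing failed
                result.SetResult("resumed");
            } catch (Exception ex) {
                result.SetException(ex);
            }
        };
        if (awaiter.IsCompleted) restOfMethod();   // already done: no callback needed
        else awaiter.OnCompleted(restOfMethod);    // not done: run the rest later
        return result.Task;
    }
}
```

Both methods produce a task that completes with the same value shortly after the delay elapses.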

There is more information on writing custom awaiters available elsewhere. For our purposes, what I’ve covered is enough to cobble something together:

using System;
using System.Runtime.CompilerServices;
using System.Threading;

public sealed class SynchronizationContextAwaiter : INotifyCompletion {
    private readonly SynchronizationContext _context;
    public SynchronizationContextAwaiter(SynchronizationContext context) {
        if (context == null) throw new ArgumentNullException("context");
        _context = context;
    }
    public bool IsCompleted {
        get {
            // always re-enter, even if already in the context
            return false;
        }
    }
    public void OnCompleted(Action action) {
        // resume inside the context
        _context.Post(x => action(), null);
    }
    public void GetResult() {
        // no value to return, no exceptions to propagate
    }
}
public static class SynchronizationContextExtensions {
    public static SynchronizationContextAwaiter GetAwaiter(this SynchronizationContext context) {
        if (context == null) throw new ArgumentNullException("context");
        return new SynchronizationContextAwaiter(context);
    }
}

With this code included in our project, we’re able to await our message queues (and other synchronization contexts). The compiler will take care of calling GetAwaiter and registering the “rest of the method” as the callback passed to the OnCompleted method. We don’t have to worry about those details anymore.

Using our new powers

Now that I’ve explained the underlying machinery, we can move on to a more complicated example. For some reason, people always use bank accounts for concurrency examples. So… I guess we’ll do that:

public sealed class BankAccountActor {
    private readonly ActorSynchronizationContext _messageQueue
        = new ActorSynchronizationContext();
    
    private decimal _balance;

    public async Task<decimal> CheckBalanceAsync() {
        await _messageQueue;
        return _balance;
    }
    private void WriteToTransactionLog(object entry) {
        // ... save to persistent storage ...
        throw new NotImplementedException();
    }
    public async Task<IReadOnlyList<object>> GetTransactionLogAsync() {
        await _messageQueue;
        throw new NotImplementedException();
    }

    public async Task<decimal> DepositAndGetNewBalanceAsync(decimal amount, object transactionId) {
        // note: any thrown exceptions will be packaged into the resulting task
        if (amount <= 0) throw new ArgumentOutOfRangeException("amount", "amount <= 0");
        await _messageQueue;
        
        var newBalance = _balance + amount;
        _balance = newBalance;

        WriteToTransactionLog(new {type = "deposit", id = transactionId, amount});
        return newBalance;
    }

    public async Task<decimal> WithdrawAndGetNewBalanceAsync(decimal amount, object transactionId) {
        if (amount <= 0) throw new ArgumentOutOfRangeException("amount", "amount <= 0");
        await _messageQueue;
        
        var newBalance = _balance - amount;
        if (newBalance < 0) {
            WriteToTransactionLog(new { type = "failed withdrawal", id = transactionId, amount });
            throw new InsufficientFundsException(_balance, amount);
        }
        _balance = newBalance;

        WriteToTransactionLog(new { type = "successful withdrawal", id = transactionId, amount });
        return _balance;
    }
}

Let’s review three things this example doesn’t get wrong.

  1. The balance is stored as a decimal, instead of a float or double. This ensures that, if you start with a balance of $1 and withdraw 10¢ ten times, you don’t end up with a non-zero balance (like -0.0000015¢, enjoy those fees!). This is absolutely necessary if you’re going to be handling money.
  2. Transactions are logged to persistent storage before they are reported as completed. If the system crashes, it’s possible to reconstruct what was going on as well as whether or not withdrawals/deposits/etc completed or not. This is absolutely necessary if you’re going to be handling money.
  3. The deposit/withdraw operations return the new balance. Having access to a snapshot of the immediately-after and/or of the immediately-before state is often extremely useful. For example, Interlocked.CompareExchange returns the previous value in the referenced location, allowing you to determine whether and why the exchange did, or did not, occur.
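
The first point is easy to check for yourself. Here is a small sketch (MoneyPrecisionDemo is a hypothetical name) that withdraws ten dimes from a dollar, once with double and once with decimal:

```csharp
public static class MoneyPrecisionDemo {
    // Binary floating point cannot represent 0.1 exactly, so the error accumulates.
    public static double WithdrawDimesAsDouble() {
        double balance = 1.0;
        for (int i = 0; i < 10; i++) balance -= 0.1;
        return balance;
    }

    // decimal stores base-10 digits, so 0.1 is exact and the balance lands on zero.
    public static decimal WithdrawDimesAsDecimal() {
        decimal balance = 1.0m;
        for (int i = 0; i < 10; i++) balance -= 0.1m;
        return balance;
    }
}
```

The double version ends up with a tiny non-zero leftover, while the decimal version ends at exactly zero.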

Of course, I still have to implement the most important operation for banks to support: transfer funds from one account to another. But this makes more sense on a ‘bank actor’ than the bank account actors, and introduces a complication: the transfer can’t be done atomically.

If we were using locks, then we could acquire the locks of both accounts (but not in a potentially reversed order!), ensuring the system can never be viewed in a state where the money is in neither account. This is tempting, but remember that this approach would break as soon as we wanted to do a bank-to-bank transfer, from one computer system to another. The actor model may not allow us to atomically transfer funds, but neither does reality. There’s an unavoidable communications delay, where the withdrawn funds have yet to be deposited.

Since the transfer can’t be done atomically, we need a different strategy to ensure that money is not lost. Actually, the account implementation already has one: logging. All of the work is basically done already:

// inside a BankActor ...
public async Task TransferAsync(BankAccountActor source,
                                BankAccountActor destination,
                                decimal amount,
                                object transactionId) {
    if (source == null) throw new ArgumentNullException("source");
    if (destination == null) throw new ArgumentNullException("destination");
    if (amount <= 0) throw new ArgumentOutOfRangeException("amount", "amount <= 0");
    await _messageQueue;

    // note: if the source has insufficient funds, the exception will be propagated
    // note: the source will write the withdrawal to persistent storage
    // note: after awaiting, we will be posted back onto the message queue automatically
    await source.WithdrawAndGetNewBalanceAsync(amount, transactionId);
        
    // note: the destination will write the deposit to persistent storage
    // note: technically we should be logging here too, but it would clutter the example
    await destination.DepositAndGetNewBalanceAsync(amount, transactionId);
}

If this method were to crash at any point, the logging from the bank accounts would make it possible to reconstruct the state of the transfer. Is there no log entry of the withdrawal in the source's account logs? Then the transfer didn't even get started. Is there a log entry in the source account's log, but not in the destination? Then the transfer was incomplete when the system crashed, and needs to be rolled back or completed. Is there a log entry in the source account's log and also in the destination account's log? Then the transfer completed successfully, and nothing needs to be done.
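
The recovery reasoning boils down to a small decision table. As a sketch, assuming the recovery code has already searched both logs for entries with the transfer's transaction id (the TransferState and TransferRecovery names are hypothetical, and the Inconsistent case is an extra I've added for the combination that should never occur):

```csharp
public enum TransferState { NotStarted, Incomplete, Completed, Inconsistent }

public static class TransferRecovery {
    // withdrawalLogged: the source account's log has a successful withdrawal for this id
    // depositLogged:    the destination account's log has a deposit for this id
    public static TransferState Classify(bool withdrawalLogged, bool depositLogged) {
        if (withdrawalLogged && depositLogged) return TransferState.Completed;
        if (withdrawalLogged) return TransferState.Incomplete; // roll back, or finish the deposit
        // a deposit is only sent after the withdrawal succeeds, so this should be impossible
        if (depositLogged) return TransferState.Inconsistent;
        return TransferState.NotStarted;
    }
}
```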

Note that it takes time to get used to "thinking with actors". The best approaches to problems change, especially if your old solution involved multiple locks. For example, because actors act like distributed systems, time and ordering become more complicated (see: vector clocks).

(Unfortunately, I can't start discussing and explaining all of the interesting intricacies of using actors or we'd be here for days!)

Summary

The actor model is a way of approaching concurrency that is highly reliable and distributable. We can emulate actors in C#, in a very concise way, by taking advantage of the new async/await functionality.

Each actor becomes an instance of a class with a message queue (ActorSynchronizationContext). You "send a message" to the instance by invoking one of its public asynchronous methods, and "receive a message" back by awaiting the result. A couple of custom types make this possible.
