Useful/Interesting Methods #1: Observable.WhenEach

posted by Craig Gidney on October 23, 2012

This series of posts is inspired by a post by Jon Skeet, where he explains a method I suggested and called “OrderByCompletion”. Each post will be about some conceptually simple and interesting function that I implemented as part of my day-to-day programming.

Code and tests for each method in the series will be published to the GitHub repository Strilanc/Methods.

Reactive Extensions for .NET is great, and I’ve been using it more and more in the UI code I write. It’s just downright better than events, especially when it comes to keeping the code for a feature isolated to a single place. Recently, I found myself with an observable sequence of observables representing mouse drags, and I wanted to observe the drop points. Basically this situation:

static IObservable<Point> GetDragDropPoints(IObservable<IObservable<Point>> drags) {
    IObservable<Task<Point>> eventualDropPoints = drags.Select(pts => pts.LastAsync().ToTask());
    IObservable<Point> result = ???
    return result;
}

Keep in mind that drags can overlap (e.g. consider a multi-touch screen) and can be cancelled (e.g. another app steals focus).

I’m reasonably sure the method I want doesn’t exist, although there are a lot of methods in the Rx library that almost do what I need. The closest is Observable.Concat, which takes an observable sequence of tasks and observes their results. However, the results are forced to be in the same order as the tasks are received, meaning overlapping drags would work incorrectly. Also, Observable.Concat (like almost all of the other candidates) combines awaiting completion with unwrapping the result, forcing exceptions and cancellation to be propagated into the observable (killing it). Actually, even if it does already exist, I want to implement something fun, so let’s dive in.

The method I want to implement is a lot like the Enumerable.OrderByCompletion method from Jon Skeet’s post. Both conceptually take a stream of tasks and “re-order” the stream to make consuming the tasks easier. However, an Observable.OrderByCompletion method is basically a waste of time to write because its users end up in the same situation as before: they still have to await the tasks before using the results. The fact that the tasks will happen to complete in order accomplishes basically nothing in terms of code differences. I want a bit more: a method that takes advantage of the asynchronous nature of observables to delay forwarding observed tasks until they’re actually completed.

Implementation

First, the hardest part: naming. Existing task methods that operate on collections are called “WhenAll” and “WhenAny”. The method I want is conceptually similar, but it forwards observed tasks when each is ready. To make it sound similar to the other task methods and also somewhat self-descriptive, I will call it “WhenEach”. With name in hand, we can move on to the actual implementation.

When implementing or consuming an observable there are two important considerations to be conscious of. First, observables may be concurrent (in Rx 2.0). Multiple results may be arriving concurrently (and in our case those results are tasks that may also be completing concurrently) and we should try to forward them concurrently as well. Second, observables must never send more items after sending completion/faulted. In the context of WhenEach, these two considerations are what force the most code. We must delay completing/faulting until all the tasks we’ve seen have completed and been forwarded, and ideally we should do it without synchronizing the forwarding of results.

Alright, that’s enough summarizing and preparing. Code:

///<summary>Forwards tasks from the underlying observable, after they've completed, potentially out of order.</summary>
public static IObservable<T> WhenEach<T>(this IObservable<T> observable) where T : Task {
    if (observable == null) throw new ArgumentNullException("observable");
    return new AnonymousObservable<T>(observer => {
        if (observer == null) throw new ArgumentNullException("observer");

        // what to do when last task finishes
        Action sendDone = observer.OnCompleted;
        // number of 'pendings', including the unfinished observable and unfinished tasks
        var pendingCount = 1;
        Action markOnePendingCompleted = () => {
            if (Interlocked.Decrement(ref pendingCount) == 0) 
                sendDone();
        };

        return observable.Subscribe(
            task => {
                if (task == null) {
                    observer.OnNext(null);
                    return;
                }
                Interlocked.Increment(ref pendingCount);
                task.ContinueWith(x => observer.OnNext(task),
                         TaskContinuationOptions.ExecuteSynchronously)
                    .ContinueWith(x => markOnePendingCompleted(),
                         TaskContinuationOptions.ExecuteSynchronously);
            },
            ex => {
                sendDone = () => observer.OnError(ex);
                markOnePendingCompleted();
            },
            markOnePendingCompleted);
    });
}

As the code hints at but does not really explain, completion is tracked and delayed by using a ‘pending count’ modified with atomic operations. The underlying observable contributes 1 to the pending count, which is taken away when it completes or faults. Each observed task also contributes 1, and takes it away after the task completes and has been forwarded. When the pending count hits 0, we forward the underlying observable’s completion (or fault). Otherwise the code is boring null precondition checks, a special case to forward null tasks, a re-assign-action trick to avoid duplicate code in the OnError and OnCompleted callbacks, and a second continuation that ensures the pending count is still decremented even if observer.OnNext incorrectly throws an exception.
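To see the pending count on its own, here is a minimal sketch of the same bookkeeping with Rx stripped away. The class PendingCountSketch and the method ForwardWhenEach are hypothetical names I made up for illustration; they are not part of Rx or of the method above. The sketch assumes a plain enumerable of tasks stands in for the underlying observable, and it leaves out the error path.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PendingCountSketch {
    // Hypothetical helper (an assumption for illustration, not an Rx API):
    // calls onNext for each task once it has finished, then calls onDone once
    // every task has been forwarded and the source has been exhausted.
    public static void ForwardWhenEach<T>(
            IEnumerable<Task<T>> tasks,
            Action<Task<T>> onNext,
            Action onDone) {
        // Starts at 1 because the source sequence itself counts as one pending item.
        var pendingCount = 1;
        Action markOnePendingCompleted = () => {
            if (Interlocked.Decrement(ref pendingCount) == 0)
                onDone();
        };

        foreach (var task in tasks) {
            var t = task; // local copy, so the lambda captures this iteration's task
            Interlocked.Increment(ref pendingCount);
            // The second continuation runs even if onNext throws,
            // so the count is always given back.
            t.ContinueWith(_ => onNext(t),
                           TaskContinuationOptions.ExecuteSynchronously)
             .ContinueWith(_ => markOnePendingCompleted(),
                           TaskContinuationOptions.ExecuteSynchronously);
        }

        // The source is exhausted, so it gives back its own count.
        markOnePendingCompleted();
    }
}
```

For example, handing it the tasks of two TaskCompletionSource instances and completing the second one first should call onNext for the second task, then for the first, and only then call onDone. Because the source holds its own count until the loop ends, onDone cannot fire early just because every task seen so far happens to be finished.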

Example

To demonstrate how the method works, consider this example code:

var r = new Random();
Enumerable.Range(0, 100)
    .Select(e => Task.Delay(TimeSpan.FromSeconds(e * 0.1 + r.NextDouble()))
                        .ContinueWith(x => e))
    .ToObservable()
    .WhenEach()
    .Where(e => e.IsCompleted)
    .Select(e => e.Result)
    .Subscribe(Console.WriteLine);

What do we expect this to do? Well, it generates 100 tasks with delayed results. The base delay increases linearly, but a random offset is included to cause some of the tasks to be out of order with respect to the duration of their delay. Those tasks are observed by WhenEach, which forwards them as they complete to a filter that verifies they’ve completed, and then their result is forwarded to the console. We expect the output to be the numbers from 0 to 99 in a globally increasing but locally random order. I ran this code and…

1
4
0
2
9
3
7
5
6
8
11
14
10
13
15
12
18
17
23
*snip*
... 

Cool! It seems to work.

Summary

Observable.WhenEach doesn’t have quite the same “that’s impossible!” vibe as OrderByCompletion. The implementation is straightforward, without any conceptual leaps. But… I like it all the same. I hope you find it useful. I certainly did:

return drags
    .Select(pts => pts.LastAsync().ToTask())
    .WhenEach()
    .Where(e => !e.IsCanceled) // note: should never be faulted, fail in that case
    .Select(e => e.Result);
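
The filter on IsCanceled only works because WhenEach forwards the finished task itself instead of its result, so the consumer gets to decide what each outcome means. As a rough sketch of that decision in isolation (CompletedTaskSketch and Describe are hypothetical names for illustration, not part of Rx), assuming the task handed in has already finished:

```csharp
using System;
using System.Threading.Tasks;

public static class CompletedTaskSketch {
    // Hypothetical helper for illustration; not part of Rx or of WhenEach.
    // Assumes the task has already finished, as tasks forwarded by WhenEach have.
    public static string Describe<T>(Task<T> task) {
        switch (task.Status) {
            case TaskStatus.RanToCompletion:
                return "result: " + task.Result;
            case TaskStatus.Canceled:
                return "canceled";
            case TaskStatus.Faulted:
                // Task.Exception is an AggregateException wrapping the original failure.
                return "faulted: " + task.Exception.InnerException.Message;
            default:
                return "not finished";
        }
    }
}
```

Reading Result directly on a canceled or faulted task would throw instead, which is why the Where comes before the Select in the drag code.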

If this method or a better variant already exists, please tell me. Rx is a gigantic library and I often find myself reinventing the wheel when I’m not careful. For a while I even assumed Rx didn’t work in Windows Store projects, because Windows Store projects can’t reference normal class libraries, but naturally the Rx devs are good at their job so it’s a portable library.
