Inside F#

Brian's thoughts on F# and .NET

An RSS Dashboard in F#, part two (IObservables)

Posted by Brian on December 19, 2009


Last time I introduced my RSS Dashboard app.  Today I’ll cover the first technology piece of the app: IObservables.  I’ll discuss what IObservables are and show code to create a ‘source’ of observations and hook up clients to listen to that source.

(As an introductory aside, throughout my prose I tend to use a few sets of words synonymously.  I use “event” or “observation” or “message” to refer to an OnXXX call.  I use “source” or “observable” to refer to an originator of a series of these events, which I sometimes call an “event stream”.  And I use “listener” or “observer” or “subscriber” to refer to a consumer of these events.  I use these words interchangeably simply to break up otherwise monotonous prose, not to convey individual/differentiated meanings among these terms.)

Overview of IObservable

At its core, IObservable is just two small new interfaces, IObservable<T> and IObserver<T>, being added to .NET 4.0.  We’ll come to the details of those interfaces shortly.  What is exciting and useful about IObservable is that these interfaces admit very nice programming models, including LINQ (e.g. for C#) and the Observable module (for F#), that provide useful combinators for transforming and using event streams.  The Reactive Extensions for .NET (a.k.a. “Rx”) from DevLabs is a library that realizes this programming model.  From that web site:

Rx is a superset of the standard LINQ sequence operators that exposes asynchronous and event-based computations as push-based, observable collections via the new .NET 4.0 interfaces IObservable<T> and IObserver<T>.  These are the mathematical dual of the familiar IEnumerable<T> and IEnumerator<T> interfaces for pull-based, enumerable collections in the .NET framework.

The IEnumerable<T> and IEnumerator<T> interfaces allow developers to create reusable abstractions to consume and transform values from a wide range of concrete enumerable collections such as arrays, lists, database tables, and XML documents. Similarly, Rx allows programmers to glue together complex event processing and asynchronous computations using LINQ queries over observable collections such as .NET events and APM-based computations, PFx concurrent Task<T>, the Windows 7 Sensor and Location APIs, SQL StreamInsight temporal event streams, F# first-class events, and async workflows.

Just as the IEnumerable interface enables you to easily write a C# LINQ query or an F# pipeline to say, for instance, “for each integer in this collection, where the value is in some range, print that number”, you can similarly use the IObservable interface over an event stream, for instance “for each mouse click in my app, where the click is in this region, pop up this window”.  That is, you can treat asynchronous event streams like collections, and write the same kind of code you would write to iterate over each element of a collection, to react to each event in a stream.
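To make the analogy concrete, here is a minimal sketch that runs the same "where the value is in some range" query twice, once pulling from a list and once reacting to pushed events.  The F# Event<int> used as the source is just a stand-in I picked for illustration (F# events implement IObservable), and the names 'printed' and 'clicks' are made up for this sketch.

```fsharp
// Collect output so the two halves can be compared side by side.
let printed = ResizeArray<string>()

// Pull: filter a collection, then consume each element.
[1; 5; 12; 20; 7]
|> Seq.filter (fun n -> n >= 5 && n <= 12)
|> Seq.iter (fun n -> printed.Add(sprintf "seq: %d" n))

// Push: the same query shape over an event stream.
// Event<int> is a stand-in source chosen for this sketch.
let clicks = Event<int>()

let subscription =
    clicks.Publish
    |> Observable.filter (fun n -> n >= 5 && n <= 12)
    |> Observable.subscribe (fun n -> printed.Add(sprintf "obs: %d" n))

// Nothing happens on the observable side until events are pushed.
for n in [1; 5; 12; 20; 7] do
    clicks.Trigger n

subscription.Dispose()

printed |> Seq.iter (printfn "%s")
```

The two pipelines have the same shape; the difference is who drives.  The sequence is consumed as soon as Seq.iter runs, while the observable pipeline sits idle until the source pushes a value.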

The IObservable<T> and IObserver<T> interfaces

Here is how the interfaces appear in F#:

type IObserver<'T> =
    abstract OnNext : value : 'T -> unit
    abstract OnError : error : exn -> unit
    abstract OnCompleted : unit -> unit

type IObservable<'T> =
    abstract Subscribe : observer : IObserver<'T> -> System.IDisposable

An IObserver<T> is an object that is listening for events of type T.  Each time a new event happens, OnNext() is called with that value.  If the event stream ends, then OnCompleted() is called.  If there is an error, OnError() is called with the exception.  If you think of the array containing the numbers 1 to 10 as an IObservable, then an IObserver subscribed to that array would get the series of calls OnNext(1), OnNext(2), … OnNext(10), OnCompleted().  The “messaging contract” of the IObserver interface is that you’ll get 0 or more OnNext() calls, optionally followed by at most one OnError() or OnCompleted() call.  This mirrors its IEnumerable dual: in a foreach loop, you’ll iterate over 0 or more elements, and either end successfully by finishing the whole collection, or unsuccessfully by throwing an exception (or possibly not end at all, if the underlying enumerator gets stuck in an infinite loop).
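Here is a small sketch of that array-as-observable thought experiment.  The helper 'ofArray' and the 'recorder' observer are hypothetical names invented for this illustration; they are not part of the F# library or of my app.  The helper simply replays the array to whoever subscribes and then completes.

```fsharp
open System

/// Hypothetical helper: expose an array as an IObservable that replays
/// its elements to each subscriber and then signals completion.
let ofArray (items : 'T[]) =
    { new IObservable<'T> with
        member this.Subscribe(observer) =
            for item in items do
                observer.OnNext(item)
            observer.OnCompleted()
            // The replay is already over, so there is nothing to unsubscribe from.
            { new IDisposable with member this.Dispose() = () } }

// An observer that records every call it receives, in order.
let log = ResizeArray<string>()

let recorder =
    { new IObserver<int> with
        member this.OnNext(value) = log.Add(sprintf "OnNext(%d)" value)
        member this.OnError(error) = log.Add(sprintf "OnError(%s)" error.Message)
        member this.OnCompleted() = log.Add("OnCompleted()") }

let subscription = (ofArray [| 1 .. 10 |]).Subscribe(recorder)

log |> Seq.iter (printfn "%s")
```

The recorded log is ten OnNext entries followed by a single OnCompleted(), which is one legal instance of the messaging contract: zero or more OnNext() calls, then at most one terminating call.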

An IObservable<T> is an object that is a source of events of type T, to which listeners can subscribe.  The Subscribe() method returns an IDisposable object that is used to unsubscribe: if an IObserver wants to stop listening, it calls Dispose() on that object to request to be disconnected.

An example

Let’s consider an example that shows how to hook up bits of code to listen to an observable.  For the moment, we’ll assume we have an “ObservableSource” class, which is a source of observable events with a straightforward API.  Shortly, we’ll look at how to implement that class.

Here’s an example that demonstrates wiring up a couple of observers, which I’ll refer to as A and B; I’ll walk through the interesting bits.

// create a source
let source = new ObservableSource<int>()

// get an IObservable from the source
let obs = source.AsObservable 

// add a simple subscriber
let unsubA = obs |> Observable.subscribe (fun x -> printfn "A: %d" x)

// send some messages from the source
source.Next(1)    // A: 1
source.Next(2)    // A: 2

// add a more interesting subscriber
let unsubB =
    obs
    |> Observable.filter (fun x -> x%2=0) // only evens
    |> Observable.subscribe (fun x -> printfn "B: %d" x)

// send more messages from the source
source.Next(3)    // A: 3
source.Next(4)    // A: 4    B: 4

// have subscriber A unsubscribe
unsubA.Dispose()
    
// send more messages from the source
source.Next(5)    // (nothing printed)
source.Next(6)    // B: 6

First we create an ObservableSource<int> and get an IObservable<int> out of it, which I call ‘obs’.  We know from the definition of IObservable I showed before that I could call .Subscribe() on it and pass an IObserver<int>.  Since in practice it is often the case that subscribers will only be interested in OnNext() calls (and just be happy to ignore OnError/OnCompleted calls), there is a method in the F# Observable module that does just this.  Observable.subscribe() takes a function as an argument, and subscribes to an IObservable with an IObserver that runs the passed-in function for each OnNext() call, but does nothing for the OnError() or OnCompleted() calls.  In this instance, I pass a lambda that will print “A: n” for whatever n the observable sends.  Recall that subscribing returns an IDisposable object that can be used to unsubscribe; I call this object ‘unsubA’, since I can use it to unsubscribe listener A.

Now that someone is listening, let’s create some observations.  The API on my ObservableSource class has a method Next() that publishes a value to the observable.  So when Next(1) is called, my listener gets the message and “A: 1” is printed.  Similarly for “A: 2”.  Nothing too exciting yet.

Next I wire up a second listener, B.  This one will print “B: n”, but before we subscribe, we run a filter that screens things out so that B will only see those observations whose values are even integers (x mod 2 = 0).  The analogies with sequences should be clear; just as you can use Seq.filter on an IEnumerable, you can use Observable.filter on an IObservable.  (And the analogy carries to C#, where one could use Enumerable.Where() or a LINQ query with ‘where’ on IEnumerables, or use Observable.Where() or a LINQ query with ‘where’ on IObservables using Rx.)  This particular example is trivial, but it gives a taste of what makes the programming model so powerful; we can use the same kinds of query transforms/combinators we use on IEnumerables on IObservables.
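To give a slightly bigger taste of those combinators, here is a sketch that chains three functions from the F# Observable module (filter, map, and scan) before subscribing.  As an assumption for this sketch, I use an F# Event<int> as the source rather than the ObservableSource class, so that the snippet stands on its own; 'numbers' and 'results' are illustrative names.

```fsharp
// Stand-in source for this sketch; F# events are IObservables.
let numbers = Event<int>()
let results = ResizeArray<int>()

let subscription =
    numbers.Publish
    |> Observable.filter (fun x -> x % 2 = 0)          // only evens
    |> Observable.map (fun x -> x * x)                 // square each one
    |> Observable.scan (fun total x -> total + x) 0    // running sum
    |> Observable.subscribe (fun total -> results.Add total)

// Push 1..6 through the pipeline.
for x in 1 .. 6 do
    numbers.Trigger x

subscription.Dispose()

// results: 4, 20, 56  (2*2, then + 4*4, then + 6*6)
results |> Seq.iter (printfn "running sum: %d")
```

Each stage is itself an IObservable, so stages compose with the pipeline operator the same way Seq functions do.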

So now when I call Next(3), A gets the message, but it’s filtered out before reaching B.  But then when I call Next(4), both A and B get that message.

By calling unsubA.Dispose(), I detach listener A.  Thus Next(5) is not sent to A, and is filtered out for B.  Next(6) reaches B.

So there you go, now you have a sense of what it’s like to write a tiny bit of code using observables.  Next let’s take a look at how to write a source.

An ObservableSource class (first try)

To get a feel for how to author a source, here’s a small class that shows how.  This first example does not deal with issues of thread-safety, exceptions, or the IObserver messaging contract, but serves as a useful skeleton on which to build such a class (we’ll see this shortly).  Here’s the code:

open System

type ObservableSource<'T>() =
    let mutable key = 0
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>
    let next(x) = 
        subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> v.OnNext(x))
    let completed() = 
        subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> v.OnCompleted())
    let error(e) = 
        subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> v.OnError(e))
    let obs = 
        { new IObservable<'T> with
            member this.Subscribe(o) =
                let k = key
                key <- key + 1
                subscriptions <- subscriptions.Add(k, o)
                { new IDisposable with 
                    member this.Dispose() = 
                        subscriptions <- subscriptions.Remove(k) } }
    member this.Next(x) = next x
    member this.Completed() = completed()
    member this.Error(e) = error e
    member this.AsObservable = obs

This is pretty straightforward.  The main job of such a source is to keep a record of all the subscribers, so it knows which listeners to forward each message to.  I keep a Map (recall that an F# Map is the immutable/persistent version of a Dictionary) of all the subscribers, where I give each subscriber a unique integer key.  When someone subscribes, I add that IObserver into the subscriptions Map under the current key ‘k’, and then increment the next key ‘key’ (so that each subscriber gets a different key).  An IDisposable is returned that, when disposed, removes that subscriber with key ‘k’.  When someone calls Next(x), I iterate over all the subscribers and call OnNext(x), and similarly for Completed() and Error().  The IObservable object itself is offered up as a property on the source object.

This little source class is sufficient to use for the example code of the previous subsection.  However, to make it really robust, there is a bit to be fleshed out.  Let’s take a look at things that could go wrong, and how we can better ensure that things go right.

The “contracts” of IObservables

In the ObservableSource class of the previous section, there are three kinds of potential errors that the code does nothing to prevent.  First, the code does nothing to try to enforce the IObserver “messaging contract”.  A user of the class could call Completed() twice, for example, and the class would not say boo.  The class could keep track of whether Completed() or Error() has been called yet, as it is an error on the part of the user to try to call Next/Completed/Error after the observable has ‘finished’.

Second, the code does nothing to handle exceptions from the IObservers that subscribe.  In fact that is proper – there is no reasonable kind of error handling that can be done when an IObserver method throws – the source has no idea what a subscriber may be up to.  It is in fact a programming error for an IObserver method to throw, and in many cases this is likely to lead to the app crashing (dying is awesome).  I could detect this condition as well.

Finally, the class is not thread-safe.  There’s no documentation about how the class is intended to be used, nor information about the threading contracts that subscribing IObservers ought to expect.  Now it turns out that IObservables can serve up a variety of threading contracts to their IObserver subscribers, and the IObservable interface itself is agnostic on this point.  So it is useful for individual IObservable objects to document the threading model they provide to IObservers.  The two most useful contracts in practice are what I’ll label “serialized” and “UI Thread”:

  • “serialized” – though calls to OnNext/OnCompleted/OnError may be made on any thread, each call is serialized in the sense that the next call never arrives before the prior call completes.  That is, you may have OnNext be called on thread 1, and then finish, and then have another OnNext call on thread 2.  But you won’t have OnNext get called on thread 1, and then while still inside the body of OnNext, have OnNext get called again on thread 2 (concurrency).  And you won’t have OnNext get called on thread 1, and then have OnNext re-enter on thread 1 before the first call completes (reentrancy).  Dealing with either concurrency or reentrancy is a burden to IObservers, and so it is useful for IObservables to guarantee these conditions won’t happen.
  • “UI Thread” – all calls to OnNext/OnCompleted/OnError will come on the UI thread and not be reentrant.  This is the “strongest” threading contract, and it is useful in a variety of scenarios involving GUI apps.
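To make the “serialized” contract concrete, here is one possible sketch of a driver that delivers it even when values arrive from several threads at once.  This is not the approach my ObservableSource class takes; it is an alternative illustration, and the 'SerializedSender' type and its Send/Flush members are hypothetical names invented here.  It funnels every call through an F# MailboxProcessor, which handles one message at a time, so the callback never overlaps with itself; a Send made from inside the callback is just queued, which rules out reentrancy too.

```fsharp
open System.Threading

/// Hypothetical wrapper: funnels calls from any thread through a single
/// agent so the callback never runs concurrently or reentrantly.
type SerializedSender<'T>(callback : 'T -> unit) =
    let agent =
        MailboxProcessor<Choice<'T, AsyncReplyChannel<unit>>>.Start(fun inbox ->
            let rec loop () =
                async {
                    let! message = inbox.Receive()
                    match message with
                    | Choice1Of2 value -> callback value
                    | Choice2Of2 reply -> reply.Reply()
                    return! loop ()
                }
            loop ())
    /// Queue a value; returns immediately, even when called from the callback.
    member this.Send(value : 'T) = agent.Post(Choice1Of2 value)
    /// Block until everything queued before this call has been delivered.
    member this.Flush() = agent.PostAndReply(fun reply -> Choice2Of2 reply)

// Counters used to observe how many callbacks are in flight at once.
let active = ref 0
let maxActive = ref 0
let delivered = ref 0

let sender =
    SerializedSender<int>(fun _ ->
        let now = Interlocked.Increment(active)
        if now > maxActive.Value then maxActive.Value <- now
        Thread.Sleep 1   // widen the window in which an overlap could show up
        delivered.Value <- delivered.Value + 1
        Interlocked.Decrement(active) |> ignore)

// Send from four concurrent workers, ten values each.
[ for _ in 1 .. 4 -> async { for i in 1 .. 10 do sender.Send i } ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

sender.Flush()
printfn "delivered=%d maxActive=%d" delivered.Value maxActive.Value
```

All forty values get delivered and the in-flight count never goes above one.  The price is that Send becomes asynchronous: the caller no longer knows when its value has been handled unless it calls Flush.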

In the case of my ObservableSource class, I wish to aim for the “serialized” contract.

In addition to the threading “contract” issues, the code in the previous section also contains simple races.  If two concurrent Subscribe() calls come along, then it’s possible for two different subscribers to get the same key.  It’s also possible to “lose” one of the subscribers when updating the subscribers Map.  And if a Subscribe() call is concurrent with an unsubscribing Dispose() call, it’s also possible to screw up the subscribers Map.  These are the standard problems one encounters when using mutable state.

To sum up then, the potential problems all involve the “contracts” at the interface boundaries: the IObserver messaging contract, the IObserver exception contract, the IObserver expected-threading contract, and the threading contract of the ObservableSource class itself.

A real ObservableSource class

Let’s deal with all the issues identified in the previous section.

To deal with the IObserver “messaging contract” issue, I’ll have the ObservableSource class keep track of whether the IObserver is finished (if Completed or Error has been called), and assert if one of the messaging methods is called after it’s finished.

To deal with the IObserver “exception contract” issue, I’ll have the ObservableSource class assert if any of its subscribers throws.

To deal with the IObserver “expected-threading contract”, I’ll document that the user of the ObservableSource class must call Next/Completed/Error in a serialized fashion, and that subscribers can expect the serialized contract.

To deal with the races due to the mutable state inside the ObservableSource class, I’ll add locking around the critical sections.

The result is this improved class:

open System
open System.Diagnostics

/// Utility class for creating a source of 'serialized' IObserver events.
type ObservableSource<'T>() =
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so 
    // we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>
    let next(x) = 
        subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> 
            protect (fun () -> v.OnNext(x)))
    let completed() = 
        subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> 
            protect (fun () -> v.OnCompleted()))
    let error(e) = 
        subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> 
            protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The source ought to call these methods in serialized fashion (from
    // any thread, but serialized and non-reentrant)
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The IObservable object returned here is threadsafe; you can subscribe 
    // and unsubscribe (Dispose) concurrently
    member this.AsObservable = obs

Now the code deals with all these issues.  Much of the burden still falls to the user/‘driver’ of the ObservableSource object, but now at least the documentation is clear about the contract between the source and the listeners, and there are some asserts to make it clear when things are running afoul.
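The comment in the class about choosing a Map over a Dictionary deserves a quick demonstration.  The sketch below is a standalone illustration with made-up names ('subscribers', 'notified', 'table'), not code from the class.  The first half iterates an immutable Map while the mutable reference to it is replaced mid-loop, simulating a subscriber that unsubscribes from inside its own OnNext().  The second half tries to change a mutable Dictionary mid-loop; I use Add there (think of a subscriber joining mid-iteration) to trigger the failure.

```fsharp
open System
open System.Collections.Generic

// Immutable Map: the loop walks the snapshot it started with, even though
// the mutable reference is replaced part-way through.
let mutable subscribers = Map.ofList [ (0, "A"); (1, "B"); (2, "C") ]
let notified = ResizeArray<string>()

for KeyValue(key, name) in subscribers do
    notified.Add name
    if name = "A" then
        // Simulate A unsubscribing from inside its own OnNext().
        subscribers <- subscribers.Remove key

// Mutable Dictionary: changing it during enumeration invalidates the enumerator.
let table = Dictionary<int, string>()
table.Add(0, "A")
table.Add(1, "B")

let dictionaryThrew =
    try
        for KeyValue(key, _) in table do
            table.Add(key + 100, "late subscriber")
        false
    with :? InvalidOperationException -> true

printfn "notified: %A" (List.ofSeq notified)
printfn "remaining subscribers: %d" subscribers.Count
printfn "dictionary threw: %b" dictionaryThrew
```

With the Map, the current round of notifications still reaches A, B, and C, and the removal takes effect for the next round.  With the Dictionary, the enumeration blows up with an InvalidOperationException.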

This class was a bit of effort for me to write, but in doing so I was forced to learn all about the IObserver/IObservable interfaces and their contracts.  And now I have a useful reusable class to show for it.  The RSSDashboard app is an opportunity for me to learn new technologies, and I wanted to ensure that I got a thorough understanding of these guys.

Next time

Now that we have the basics of observables down, we’re ready to create “observable feeds”, that is, objects that listen to RSS feeds and publish their results as IObservables.  More useful reusable components ahead!
