Introducing RX For Collections

Dynamic Data

Dynamic Data is a portable class library that brings the power of Reactive Extensions (Rx) to collections.

A collection that mutates can have adds, updates, and removes. Out of the box, Rx does nothing to manage changes in a collection, which is why Dynamic Data exists. In Dynamic Data, collection changes are notified via an observable changeset, which is the heart of the system. An operator receives these notifications, applies some logic, and then provides its own notifications. In this way, operators can be chained together to apply powerful and often very complicated operations with some very simple fluent code.

Dynamic Data has at least 40 operators, all born of pragmatic experience. Together they make managing in-memory data easy, and it is no exaggeration to say they can save thousands of lines of code by abstracting complicated and often repetitive operations.

Getting started

As stated in the blurb, Dynamic Data is based on the concept of an observable changeset. The easiest way to create one is directly from an observable.

// The source can be a stream of single items or a stream of batches.
IObservable<T> myObservable;
// or: IObservable<IEnumerable<T>> myObservable;

// Option 1: Create an observable changeset where items are identified using the hash code.
var myDynamicDataSource = myObservable.ToObservableChangeSet();

// Option 2: Specify a key like this (t.Key is a placeholder for whatever uniquely identifies an item).
var myDynamicDataSource = myObservable.ToObservableChangeSet(t => t.Key);

The problem with the preceding is that the collection will grow forever, so there are overloads to specify size limitations or expiry times (not shown).

For much more control over the root collection, we need a local data store that has the requisite CRUD methods. As before, the cache can be created with or without specifying a key, as in the following.

// 1. Create a cache where items are identified using the hash code.
var mycache = new SourceCache<TObject>();

// 2. Or specify a key like this (t.Key is a placeholder for whatever uniquely identifies an item).
var mycache = new SourceCache<TObject, TKey>(t => t.Key);
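To make the "requisite CRUD methods" a little more concrete, here is a minimal sketch of a keyed cache being edited while a subscriber watches the changeset. It assumes a recent DynamicData release from NuGet (the API has evolved since this article was written), and the `Trade` record and `SourceCacheSketch` class are hypothetical names invented for the illustration.

```csharp
using System;
using System.Collections.Generic;
using DynamicData;

// Hypothetical item type used only for this illustration.
public sealed record Trade(int Id, string Status);

public static class SourceCacheSketch
{
    // Returns the change reasons seen by a subscriber, in order.
    public static List<ChangeReason> Run()
    {
        var reasons = new List<ChangeReason>();

        // Keyed cache: Trade.Id identifies each item.
        using var cache = new SourceCache<Trade, int>(t => t.Id);

        // Connect() exposes the cache as an observable changeset.
        using var subscription = cache.Connect()
            .Subscribe(changes =>
            {
                foreach (var change in changes)
                {
                    reasons.Add(change.Reason);
                }
            });

        cache.AddOrUpdate(new Trade(1, "Live"));    // new key, so an Add
        cache.AddOrUpdate(new Trade(1, "Closed"));  // existing key, so an Update
        cache.Remove(1);                            // remove by key

        return reasons;
    }
}
```

Calling `SourceCacheSketch.Run()` should report one add, one update, and one remove, which is the same vocabulary every downstream operator consumes.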

One final out-of-the-box way to create an observable changeset applies if you are doing UI work and already have an observable collection. In that case you can do this.

var myObservableCollection = new ObservableCollection<T>();

// Option 1: Create a dynamic data source using hash code for item identification
var myDynamicDataSource = myObservableCollection.ToObservableChangeSet();

// Option 2: Specify a key selector to identify items in the dynamic data source (t.Key is a placeholder)
var myDynamicDataSource = myObservableCollection.ToObservableChangeSet(t => t.Key);

One other point worth making here is that any stream can be converted to a cache.

var myCache = someDynamicDataSource.AsObservableCache();

This cache has the same connection methods as a source cache but is read-only.
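As a rough illustration of the read-only cache, the sketch below derives a filtered cache from a source cache and queries it. It assumes a recent DynamicData release from NuGet; `Trade` and `ReadOnlyCacheSketch` are hypothetical names used only here.

```csharp
using System;
using DynamicData;

// Hypothetical item type used only for this illustration.
public sealed record Trade(int Id, string Status);

public static class ReadOnlyCacheSketch
{
    // Returns how many live trades the derived cache holds.
    public static int CountLive()
    {
        using var source = new SourceCache<Trade, int>(t => t.Id);

        // A derived cache that only ever contains live trades.
        using var liveTrades = source.Connect()
            .Filter(t => t.Status == "Live")
            .AsObservableCache();

        // Edits go through the source cache only.
        source.AddOrUpdate(new Trade(1, "Live"));
        source.AddOrUpdate(new Trade(2, "Closed"));

        // The derived cache can be connected to and queried
        // (Connect, Items, Count, Lookup) but has no edit methods.
        return liveTrades.Count;
    }
}
```

The design point is that consumers can be handed `liveTrades` without being given any way to mutate the underlying data.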

Examples

Now that you know how to create a Dynamic Data stream, here are a few quick-fire examples based on the assumption that we already have an observable changeset. In all of these examples the resulting sequences always exactly reflect the items in the cache. In other words, adds, updates, and removes are always propagated.

Example 1. filters a stream of live trades, creates a proxy for each trade, and orders the result by the most recent first. As the source is modified the observable collection will automatically reflect changes.

// Dynamic data has its own take on an observable collection (optimized for populating)
var list = new ObservableCollectionExtended<TradeProxy>();

var myOperation = someDynamicDataSource
    .Filter(trade => trade.Status == TradeStatus.Live)
    .Transform(trade => new TradeProxy(trade))
    .Sort(SortExpressionComparer<TradeProxy>.Descending(t => t.Timestamp))
    .ObserveOnDispatcher()
    .Bind(list)
    .DisposeMany()
    .Subscribe();

Note that TradeProxy is disposable, and DisposeMany() ensures items are disposed of when they are no longer part of the stream.

Example 2. produces a stream that is grouped by status. If an item changes status it will be moved to the new group and when a group has no items the group will automatically be removed.

var myOperation = someDynamicDataSource
    .Group(trade => trade.Status) // This is NOT Rx's GroupBy
    .Subscribe(changeSet => // do something with the groups
    {
        // Handle the changeSet or perform operations on grouped data
    });
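For a feel of how the grouping behaves, here is a small sketch that moves a trade between groups and then empties one group. It assumes a recent DynamicData release from NuGet; `Trade` and `GroupSketch` are hypothetical names, and the status values are plain strings rather than the `TradeStatus` type used above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using DynamicData;

// Hypothetical item type used only for this illustration.
public sealed record Trade(int Id, string Status);

public static class GroupSketch
{
    // Returns the keys of the groups that remain at the end.
    public static List<string> Run()
    {
        using var source = new SourceCache<Trade, int>(t => t.Id);

        // Keep the groups in a read-only cache, keyed by status.
        using var groups = source.Connect()
            .Group(t => t.Status)
            .AsObservableCache();

        source.AddOrUpdate(new Trade(1, "Live"));
        source.AddOrUpdate(new Trade(2, "Live"));

        // Trade 1 changes status, so it moves to the "Closed" group.
        source.AddOrUpdate(new Trade(1, "Closed"));

        // Removing trade 2 leaves the "Live" group with no items.
        source.Remove(2);

        return groups.Keys.OrderBy(key => key).ToList();
    }
}
```

After these edits only the "Closed" group should remain, since the emptied "Live" group is dropped as described above.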

Example 3. Suppose I am editing some trades and each trade exposes an observable that reports whether it is valid. If I want to know when all the items are valid, this will do the job.

IObservable<bool> allValid = someDynamicDataSource
    .TrueForAll(trade => trade.IsValidObservable, (trade, isValid) => isValid);

This operator flattens the observables and returns the combined state in one line of code.

Example 4. will wire and un-wire items from the observable when they are added, updated, or removed from the source.

var myOperation = someDynamicDataSource.Connect()
    .MergeMany(trade => trade.ObservePropertyChanged(t => t.Amount))
    .Subscribe(observableOfAmountChangedForAllItems =>
    {
        // Do something with IObservable<PropertyChangedEventArgs>
    });
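The wiring and un-wiring can be seen in a small sketch. It assumes a recent DynamicData release from NuGet, and it swaps the property-changed observable for a hand-rolled `AmountChanged` observable so that it needs nothing beyond Rx itself; `Trade`, `SetAmount`, and `MergeManySketch` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using DynamicData;

// Hypothetical item: AmountChanged stands in for a property-changed observable.
public sealed class Trade
{
    private readonly Subject<decimal> _amountChanged = new Subject<decimal>();

    public Trade(int id) => Id = id;

    public int Id { get; }

    public IObservable<decimal> AmountChanged => _amountChanged;

    public void SetAmount(decimal amount) => _amountChanged.OnNext(amount);
}

public static class MergeManySketch
{
    // Returns the amounts observed through the merged stream.
    public static List<decimal> Run()
    {
        var seen = new List<decimal>();
        using var source = new SourceCache<Trade, int>(t => t.Id);

        // One merged stream of amount changes across all items in the cache.
        using var subscription = source.Connect()
            .MergeMany(t => t.AmountChanged)
            .Subscribe(amount => seen.Add(amount));

        var first = new Trade(1);
        var second = new Trade(2);
        source.AddOrUpdate(first);
        source.AddOrUpdate(second);

        first.SetAmount(10m);   // wired, so it is observed
        second.SetAmount(20m);  // wired, so it is observed

        source.Remove(first);   // un-wires the first trade
        first.SetAmount(30m);   // no longer observed

        return seen;
    }
}
```

Only the first two amounts should come through, because the removed trade is unsubscribed as soon as it leaves the cache.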

More Information

I could go on endlessly, but this is not the place for the full documentation. I promise this will come, but for now I suggest downloading my WPF sample app (links above), since I intend it to be a "living document" and I promise it will be continually maintained.

