http://blogs.clariusconsulting.net/kzu

Daniel Cazzulino's Blog


A general purpose event bus leveraging Reactive Extensions

I wrote before about the usefulness of having rich events published throughout your domain. In that blog post, I also showed a very simple implementation.

I realized that there seems to be quite some interest in the event (or message) bus pattern as an application architecture pattern, and not necessarily in the usual enterprise integration style. This means that you leverage the concept of an event bus for internal application decoupling (along the lines of an Event Collaboration), and not for integration with external systems. The difference is that everything happens in-memory: there is no intermediate persistent message queue, and there are no message/event routers, partitioning or scalability requirements, or anything like that. It’s a very simple way to make your components more loosely coupled.

In this post I’ll introduce a minor tweak to the original implementation of an event stream (I wanted to remove the word Bus from it, as it confuses people who think it’s an enterprise-style message bus) to make it more useful and general purpose.

To recap: the event stream is consumed by subscribers by simply subscribing to the desired event type:

var subscription = eventStream.Of<ProductPurchased>().Subscribe(...)

Or by performing a more complicated reactive query over the events:

var query =
    from discharged in events.Of<PatientLeftHospital>()
    from admitted in events.Of<PatientEnteredHospital>()
    where
        admitted.PatientId == discharged.PatientId &&
        (admitted.When - discharged.When).Days < 5
    select admitted;

var subscription = query.Subscribe(...)

My initial implementation would only invoke subscribers that register for the specific event being pushed through the stream:

partial class EventStream : IEventStream
{
    private ConcurrentDictionary<Type, object> subjects = new ConcurrentDictionary<Type, object>();

    public void Push<TEvent>(TEvent @event)
    {
        Guard.NotNull(() => @event, @event);

        var subject = this.subjects.Find(@event.GetType()) as Subject<TEvent>;
        if (subject != null)
            subject.OnNext(@event);
    }

    public IObservable<TEvent> Of<TEvent>()
    {
        return (IObservable<TEvent>)subjects.GetOrAdd(typeof(TEvent), type => new Subject<TEvent>());
    }
}

The Find extension method on ConcurrentDictionary simply does a TryGetValue on it and returns null if not found. We need to be smarter there to make this event stream more useful.
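The post does not show the Find helper itself, so the following is only a minimal sketch of what it might look like based on the description above. The class name and generic signature are assumptions.

```csharp
using System.Collections.Concurrent;

// Hypothetical reconstruction of the Find helper described in the post.
internal static class ConcurrentDictionaryExtensions
{
    // Returns the value stored for the key, or default(TValue)
    // (null for reference types) when the key is not present.
    public static TValue Find<TKey, TValue>(
        this ConcurrentDictionary<TKey, TValue> dictionary, TKey key)
    {
        TValue value;
        return dictionary.TryGetValue(key, out value) ? value : default(TValue);
    }
}
```

Because the lookup is an exact match on the key, an event is only found when a subject was registered for precisely its runtime type, which is the limitation discussed next.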

It’s likely that we’ll want to subscribe to more generic events (say for tracing purposes, for some global statistics, etc.), so you should be able to subscribe to (say) EventArgs and get all events that derive from it. Or do it based on interfaces, etc.

The change to generalize it is pretty simple too: we just make the Push method look for all subjects that have compatible event types:

public void Push<TEvent>(TEvent @event)
{
    Guard.NotNull(() => @event, @event);

    var eventType = @event.GetType();

    var compatible = subjects.Keys
        .Where(subjectEventType => subjectEventType.IsAssignableFrom(eventType))
        .Select(subjectEventType => subjects[subjectEventType]);

    foreach (dynamic subject in compatible)
    {
        subject.OnNext((dynamic)@event);
    }
}

We cast the subjects from object to dynamic so that we can do a dynamic invocation, because there is no common non-generic interface for all of them: they all implement ISubject<T>, but each with a different T. We also need to make the event itself dynamic, so that the runtime will pick the right method signature given both the subject type and the event type (which might involve an implicit conversion from the event’s type to a base type or interface, etc.).
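To see the dynamic invocation in isolation, here is a small sketch that assumes the System.Reactive package is referenced. DomainEvent and the DynamicDispatchSketch class are made-up names for illustration, and making ProductPurchased derive from DomainEvent is an assumption of this sketch rather than something the post states.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

class DomainEvent { }                       // hypothetical base event type
class ProductPurchased : DomainEvent { }    // base class assumed for this sketch

static class DynamicDispatchSketch
{
    public static List<string> Run()
    {
        var received = new List<string>();

        // Two subjects closed over different event types, like the
        // object-typed values held in the subjects dictionary.
        var general = new Subject<DomainEvent>();
        var specific = new Subject<ProductPurchased>();
        general.Subscribe(e => received.Add("DomainEvent subscriber got " + e.GetType().Name));
        specific.Subscribe(e => received.Add("ProductPurchased subscriber got " + e.GetType().Name));

        object @event = new ProductPurchased();
        foreach (dynamic subject in new object[] { general, specific })
        {
            // Bound at run time: OnNext(DomainEvent) for the first subject,
            // OnNext(ProductPurchased) for the second.
            subject.OnNext((dynamic)@event);
        }

        return received;
    }
}
```

Both subscribers receive the same ProductPurchased instance, even though neither the subjects nor the event are statically typed at the call site.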

 

In my opinion, the coolest part of having this type of event bus is that you can now layer the concept of schedulers on top (i.e. introduce async event listeners, or even create your own custom scheduler and do crazy things like out-of-process queue-based events if you wanted) with no effort, while keeping the same simple public interface.
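As a rough illustration of an async listener, the sketch below uses Rx’s ObserveOn operator with TaskPoolScheduler.Default, assuming the System.Reactive package is referenced. A plain Subject<string> stands in for what eventStream.Of<TEvent>() would return, and the AsyncListenerSketch class is a made-up name.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

static class AsyncListenerSketch
{
    // Returns true when the handler ran on a different thread than the publisher.
    public static bool Run()
    {
        // Stand-in for eventStream.Of<string>() (an assumption of this sketch).
        var subject = new Subject<string>();
        var publisherThread = Thread.CurrentThread.ManagedThreadId;
        var handlerThread = -1;
        var handled = new ManualResetEventSlim();

        using (subject
            .ObserveOn(TaskPoolScheduler.Default)   // deliver notifications on the task pool
            .Subscribe(e =>
            {
                handlerThread = Thread.CurrentThread.ManagedThreadId;
                handled.Set();
            }))
        {
            subject.OnNext("ProductPurchased");     // returns without running the handler inline
            handled.Wait(TimeSpan.FromSeconds(5));  // only so the sketch can observe the result
        }

        return handlerThread != -1 && handlerThread != publisherThread;
    }
}
```

The publisher’s code does not change at all; only the subscriber decides where its notifications are delivered.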

Both the IEventStream interface and the EventStream implementation are very straightforward and simple. I have put them together as source code NETFx nugets so you can get any future updates to these classes. They are separate because in your application you may have a separate assembly for all your interfaces and separate ones for the implementations. In that case, you’ll install the interface nuget in the former, and the other in one of the implementation ones. For convenience, I also made an aggregated package that installs both interface and implementation on the same project. To make either the interface or the implementation public (they are both partial internal by default, to avoid increasing your API surface unnecessarily), you just add a partial declaration marking them public. The interface nuget even comes with a Visibility.cs file which you can just uncomment to make the interface public:

// In order to make types introduced by this package public
// uncomment the following:

//namespace System.Reactive
//{
//    public partial interface IEventStream { }
//}

 

Enjoy!

Comments

5 Comments

  1. Sounds much like a message passing system like MPI to allow concurrent processing of massive amounts of data. The initial posts looked more at Event Sourcing for a database backend, which by the way reminds me of SQL Server transaction logs, which databases have used for decades.
    If you combine both you get a system which can store a massive event stream to a persistent store and use a parallel automated approach to resolve conflicts and check consistency of the event stream, with the goal to dump the event stream and create the actual “clean” objects out of the stream and keep some of them for auditing and tracing and some statistics generation…

  2. Yes, you could use it for that. I’ll also show next how you can use it to free you from the rigidness of database schemas without having to throw your ORM/SQL solution to the trash can…

  3. I love the subtle hints of renal failure each flavored coffee offers.

  4. Hi Daniel,

    I’m just starting out with Rx and an in memory message bus was one of the first things I wanted to try. I have a question, how do you typically raise the events from within your domain objects?

    I’m used to doing something like “DomainEvents.Raise(new PatientAdmittedEvent())” from within a domain object. This would internally invoke an event bus (we don’t use standard .NET events).

    I saw on one of your future posts that you read all the events from a domain object after making changes and publish these to the Event Stream. The thing I don’t like about this approach is that I would not want to publish every event that occurs within the domain object.

    Many thanks,
    Ben