Asynchronous Message Router in C#

Introduction


This article introduces a technique for building applications that route data or messages asynchronously to more than one recipient. It uses the following well-known GoF (Gang of Four) design patterns:


1.       Observer Pattern

2.       Singleton Pattern


Technique Defined

 

Clients do not send messages directly to the recipients. Instead, they send them to a Controller class, which notifies the recipients when a message arrives. Controllers are therefore message routers. A message is the object that the recipients are interested in. Recipients are called "observers"; they wait for message signals from the controller. To receive messages, an observer must first register itself with the controller. The entire architecture follows a "publish-subscribe" model.

 

The diagram below illustrates the architecture.

 

 

Building the Code

 

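1.       Abstract the behavior common to all observers into a base class.

// Base type for every observer. Despite the "I" prefix, it is an abstract class, not an interface.
public abstract class IObserver
{
    // Called by the router for each incoming message.
    public abstract void ProcessMessage(string strMessage);
}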

 

2.       Subclass the observer base class for each concrete recipient. Here I have defined an audit class that logs every incoming message to a file. It will be one of the observers registered with the router class.

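using System;
using System.IO;

public class MessageLogger : IObserver
{
    // Private lock object; locking on "this" would let outside code contend for the same lock.
    private readonly object objLock = new object();

    // Full path of the log file, fixed when the logger is constructed.
    private readonly string strFilePath;

    public MessageLogger()
    {
        // Assumption: logs go under the application's base directory. The original read the
        // root folder from base.FilePath, which the IObserver class shown above does not define.
        string strLogDirectory = Path.Combine(AppDomain.CurrentDomain.BaseDirectory,
            DateTime.Now.ToString("MMddyyyy"));

        // CreateDirectory does nothing if the directory already exists.
        Directory.CreateDirectory(strLogDirectory);
        strFilePath = Path.Combine(strLogDirectory, "MessageLog.txt");
    }

    public override void ProcessMessage(string strMessage)
    {
        lock (objLock)
        {
            // Open in append mode; "using" flushes and closes the writer even if WriteLine throws.
            using (StreamWriter sw = new StreamWriter(strFilePath, true))
            {
                sw.WriteLine("[" + DateTime.Now.ToString("MM/dd/yyyy") + " " + DateTime.Now.ToLongTimeString() + "] " +
                    "Message        : " + strMessage.Trim());
            }
        }
    }
}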


3.       Now let's construct the Router class.

It should expose a method that observers call to register themselves, so that they are notified when a message arrives.

The Router class should keep a collection that holds all the observer instances.

private static List<IObserver> lstObservers;

public void RegisterObserver(IObserver objObserverImpl)
{
    // Ignore observers that are already registered.
    if (!lstObservers.Contains(objObserverImpl))
    {
        lstObservers.Add(objObserverImpl);
    }
}

 

Next, the router exposes a method, ProcessMessage(string), that clients call to hand over incoming messages for routing.

delegate void RouterDelegate(string strMessage);

public void ProcessMessage(string strMessage)
{
    // BeginInvoke queues NotifyMessage on a thread-pool thread and returns immediately.
    // Note: BeginInvoke on a delegate is supported on .NET Framework only.
    RouterDelegate dlgtRouter = new RouterDelegate(this.NotifyMessage);
    dlgtRouter.BeginInvoke(strMessage, null, null);
}

 

 

The above delegate refers to the NotifyMessage() method, which is invoked asynchronously. NotifyMessage() then notifies the registered observers one after another.

 

private void NotifyMessage(string strMessage)
{
    foreach (IObserver observer in lstObservers)
    {
        observer.ProcessMessage(strMessage);
    }
}
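The BeginInvoke call used by the router is available on .NET Framework only; on .NET Core and .NET 5 or later, calling BeginInvoke on a delegate throws PlatformNotSupportedException. The sketch below is one possible alternative, not the router built in this article: it dispatches with Task.Run (available from .NET Framework 4.5) and iterates over a copy of the observer list, so that an observer registering during a notification cannot invalidate the loop. The names TaskRouter and ConsoleObserver are hypothetical, and IObserver is repeated only so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Same contract as the article's observer base class.
public abstract class IObserver
{
    public abstract void ProcessMessage(string strMessage);
}

// Hypothetical observer used for illustration: prints each message.
public class ConsoleObserver : IObserver
{
    public override void ProcessMessage(string strMessage)
    {
        Console.WriteLine(strMessage);
    }
}

// Hypothetical variant of the dispatch logic; it is not the article's Router class.
public class TaskRouter
{
    private readonly object sync = new object();
    private readonly List<IObserver> observers = new List<IObserver>();

    public void RegisterObserver(IObserver observer)
    {
        lock (sync)
        {
            // Ignore observers that are already registered.
            if (!observers.Contains(observer))
            {
                observers.Add(observer);
            }
        }
    }

    // Returns the Task so that a caller may wait for delivery if it needs to.
    public Task ProcessMessage(string message)
    {
        IObserver[] snapshot;
        lock (sync)
        {
            // Copy the list so that a concurrent RegisterObserver cannot break the loop below.
            snapshot = observers.ToArray();
        }

        // Run the notification loop on a thread-pool thread.
        return Task.Run(() =>
        {
            foreach (IObserver observer in snapshot)
            {
                observer.ProcessMessage(message);
            }
        });
    }
}
```

A caller that does not care about completion can simply ignore the returned Task, which gives the same fire-and-forget behavior as the delegate version.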

               

 

4.       We should prevent the creation of multiple instances of the Router class, for the following reason.

The Router maintains the observers' state: when a new message arrives, it must be able to find the registered observers and send them the notification. A container stores the observers' references, and every caller must see the same container, so it is declared static.

We will use the Singleton pattern to guarantee a single instance.

 

private static readonly object objLock = new object();

// volatile: the field is read outside the lock in the Instance getter.
private static volatile Router objRouter = null;

// Private constructor, so that no one outside the class can create an instance.
private Router()
{
    lstObservers = new List<IObserver>();
}

// The public Instance property everyone uses to access the Router.
public static Router Instance
{
    get
    {
        // The first time the singleton is requested, the private variable is still null.
        if (objRouter == null)
        {
            // For thread safety, take a lock before instantiating the Router.
            lock (objLock)
            {
                // Check again inside the lock: another thread may have created the
                // instance between the first check and the moment we acquired the lock.
                if (objRouter == null)
                {
                    objRouter = new Router();
                }
            }
        }
        return objRouter;
    }
}
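If you are on .NET Framework 4.0 or later, the same single-instance guarantee can be had without writing the locking by hand. The sketch below uses Lazy<T>, whose default mode runs the factory at most once even when several threads request the value at the same time. LazyRouter is a hypothetical name that stands in for the Router class and shows only the singleton part.

```csharp
using System;

// Hypothetical class name; only the singleton plumbing is shown.
public sealed class LazyRouter
{
    // The factory lambda runs once, on first access to Value.
    // The default Lazy<T> mode is thread-safe (ExecutionAndPublication).
    private static readonly Lazy<LazyRouter> instance =
        new Lazy<LazyRouter>(() => new LazyRouter());

    // Private constructor, so that no one outside the class can create an instance.
    private LazyRouter()
    {
    }

    public static LazyRouter Instance
    {
        get { return instance.Value; }
    }
}
```

Every read of LazyRouter.Instance returns the same object, which is the property the hand-written lock is meant to provide.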

 

 

Putting it all together:

 

using System;
using System.Collections.Generic;

public class Router
{
    private static List<IObserver> lstObservers;
    private static readonly object objLock = new object();

    // volatile: the field is read outside the lock in the Instance getter.
    private static volatile Router objRouter = null;

    delegate void RouterDelegate(string strMessage);

    // Private constructor, so that no one outside the class can create an instance.
    private Router()
    {
        lstObservers = new List<IObserver>();
    }

    // The public Instance property everyone uses to access the Router.
    public static Router Instance
    {
        get
        {
            // The first time the singleton is requested, the private variable is still null.
            if (objRouter == null)
            {
                // For thread safety, take a lock before instantiating the Router.
                lock (objLock)
                {
                    // Check again inside the lock: another thread may have created the
                    // instance between the first check and the moment we acquired the lock.
                    if (objRouter == null)
                    {
                        objRouter = new Router();
                    }
                }
            }
            return objRouter;
        }
    }

    public void RegisterObserver(IObserver objObserverImpl)
    {
        // Ignore observers that are already registered.
        if (!lstObservers.Contains(objObserverImpl))
        {
            lstObservers.Add(objObserverImpl);
        }
    }

    public void ProcessMessage(string strMessage)
    {
        // BeginInvoke queues NotifyMessage on a thread-pool thread and returns immediately.
        // Note: BeginInvoke on a delegate is supported on .NET Framework only.
        RouterDelegate dlgtRouter = new RouterDelegate(this.NotifyMessage);
        dlgtRouter.BeginInvoke(strMessage, null, null);
    }

    private void NotifyMessage(string strMessage)
    {
        // Notify the registered observers one after another.
        foreach (IObserver observer in lstObservers)
        {
            observer.ProcessMessage(strMessage);
        }
    }
}

 

 

Let's write the client code now.

 

Router objRouter = Router.Instance;
objRouter.RegisterObserver(new MessageLogger());

// To test the asynchronous operation, send the messages in a loop.
for (int i = 0; i <= 10000; i++)
{
    objRouter.ProcessMessage("Message" + i.ToString());
}
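Because ProcessMessage returns before the observers have run, a short-lived client such as a console program can reach the end of Main while messages are still in flight, and thread-pool threads do not keep the process alive. One way to handle this, sketched below under my own assumptions, is a second observer that counts deliveries and lets the client block until the expected number has arrived. CompletionObserver is a hypothetical name, and IObserver is repeated only so that the sketch compiles on its own.

```csharp
using System;
using System.Threading;

// Same contract as the article's observer base class.
public abstract class IObserver
{
    public abstract void ProcessMessage(string strMessage);
}

// Hypothetical observer: lets a client wait until a known number of messages has been delivered.
public class CompletionObserver : IObserver, IDisposable
{
    private int remaining;
    private readonly ManualResetEventSlim done = new ManualResetEventSlim(false);

    public CompletionObserver(int expectedMessages)
    {
        remaining = expectedMessages;
        if (expectedMessages <= 0)
        {
            // Nothing to wait for.
            done.Set();
        }
    }

    public override void ProcessMessage(string strMessage)
    {
        // Interlocked keeps the count correct when messages arrive on several threads.
        // The counter reaches exactly zero only once, so Set is called a single time.
        if (Interlocked.Decrement(ref remaining) == 0)
        {
            done.Set();
        }
    }

    // Returns true if all expected messages arrived within the timeout.
    public bool Wait(TimeSpan timeout)
    {
        return done.Wait(timeout);
    }

    public void Dispose()
    {
        done.Dispose();
    }
}
```

In the client above, you would register a CompletionObserver constructed with 10001 (the loop runs from 0 to 10000 inclusive) after the MessageLogger, and call Wait once the loop finishes. Since the router notifies observers in registration order, each message has already been logged by the time this observer counts it.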