Learn about Asynchronous Events in C#

Abstract

This is a tutorial article on asynchronous Event invocation in C#. We discuss threading issues related to the usage of Events/EventHandlers in C#. The intended audience is intermediate C# programmers and above.

Introduction

Modern languages like C# have integrated the Event mechanism, which effectively builds the Observer pattern into the language itself.

The fact that the Event mechanism actually provides synchronous calls is often overlooked and not emphasized enough. Programmers often have the illusion of parallelism, which does not match reality, and that matters in today’s multi-core world. Below, we analyze the multithreading problems and provide solutions to them.

The code presented is at a tutorial, proof-of-concept level and, for brevity, does not handle or show all variants and problematic cases.

The event mechanism provides synchronous calls on a single thread

What needs to be emphasized is that in a call such as

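// Classic pattern. Note that it is not thread-safe: another thread could
// unsubscribe the last handler between the null check and the call.
if (SubjectEvent != null)
{
    SubjectEvent(this, args);
}
// or (C# 6 and later): the delegate is read once, then invoked if non-null
SubjectEvent?.Invoke(this, args);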

the subscribed EventHandlers are invoked synchronously on a single thread. That has some not-so-obvious consequences:

  • EventHandlers are executed in sequence, one after another, in the order in which they were subscribed to the event.
  • That means that objects/values in an earlier-subscribed EventHandler are updated before those in later ones, which might have consequences for program logic.
  • A call to an EventHandler blocks the thread until all work in that EventHandler is completed.
  • If an Exception is thrown in an EventHandler, none of the EventHandlers subscribed after that one will be executed.
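The ordering and exception behavior listed above can be checked with a small experiment. The following is a minimal sketch, not part of the article's demo project; the names SequentialInvocationDemo and DemoEvent are made up for this illustration. Three handlers are subscribed, the second one throws, and a log records which handlers actually ran.

```csharp
using System;
using System.Collections.Generic;

public static class SequentialInvocationDemo
{
    // Hypothetical event, used only for this illustration.
    public static event EventHandler DemoEvent;

    // Subscribes three handlers (the second one throws), raises the event,
    // and returns a log of what actually happened, in order.
    public static List<string> Run()
    {
        var log = new List<string>();
        DemoEvent = null; // reset so that Run() can be called repeatedly

        DemoEvent += (s, e) => log.Add("A");
        DemoEvent += (s, e) =>
        {
            log.Add("B");
            throw new InvalidOperationException("B failed");
        };
        DemoEvent += (s, e) => log.Add("C"); // never reached

        try
        {
            // Handlers run one after another on the calling thread.
            DemoEvent?.Invoke(null, EventArgs.Empty);
        }
        catch (InvalidOperationException)
        {
            // The exception from handler B propagates to the raiser.
            log.Add("caught");
        }
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "A, B, caught"
    }
}
```

Handler C is skipped because the exception thrown by B unwinds the whole invocation. If every handler must run regardless, one option is to iterate over GetInvocationList() and wrap each call in its own try/catch.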

We will demo that in an example. The plan is to create 3 EventHandlers, each taking 10 seconds to finish, and to monitor threads on which each EventHandler is running, and the total time taken. We will output each ThreadId that is relevant for this example to see how many threads are being used.

using System;
using System.Diagnostics;
using System.Threading;

public class EventArgsW : EventArgs
{
    public string StateW = null;
}
public class EventWrapper
{
    public event EventHandler<EventArgsW> EventW;
    public string StateW;
    public void Notify()
    {
        Console.WriteLine("Notify is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);
        EventArgsW args = new EventArgsW();
        args.StateW = this.StateW;
        EventW?.Invoke(this, args);
    }
}
public class HandlerWrapper
{
    private string name;
    private string StateW;
    private ManualResetEvent mrs;
    public HandlerWrapper(string name, ManualResetEvent mrs)
    {
        this.name = name;
        this.mrs = mrs;
    }
    public void Handler(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0} is running on ThreadId:{1}",
            name, Thread.CurrentThread.ManagedThreadId);
        Worker(subject, args);
    }
    private void Worker(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:0",
            name, Thread.CurrentThread.ManagedThreadId);
        StateW = args.StateW;
        for (int i = 1; i <= 2; ++i)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:{2}",
                name, Thread.CurrentThread.ManagedThreadId, i);
        }
        mrs.Set();
    }
}
internal class Client
{
    public static void Main(string[] args)
    {
        Console.WriteLine("Client is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);

        ManualResetEvent[] mres = new ManualResetEvent[3];
        for (int i = 0; i < mres.Length; i++) mres[i] = new ManualResetEvent(false);

        EventWrapper s = new EventWrapper();
        s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
        s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
        s.EventW += (new HandlerWrapper("3", mres[2])).Handler;
        // Change subject state and notify observers
        s.StateW = "ABC123";
        var timer = new Stopwatch();
        timer.Start();
        s.Notify();
        ManualResetEvent.WaitAll(mres);
        timer.Stop();
        TimeSpan timeTaken = timer.Elapsed;
        string tmp1 = "Client time taken: " + timeTaken.ToString(@"m\:ss\.fff");
        Console.WriteLine(tmp1);
        Console.ReadLine();
    }
}

The execution result is shown below.

[Image: Execution result]

As can be seen from the execution result, EventHandlers run one after another, all on thread Id=1, the same thread as the Client is running on. It took 30.059 seconds to finish all work.

Asynchronous Events using TPL

Using the Task Parallel Library (TPL), we can make our EventHandlers run asynchronously on separate threads. Moreover, if we want to free the Client thread from any work (let’s say our Client is the UI thread), we can raise the Event (dispatch the EventHandler invocations) on a thread separate from the Client thread. Here is the new implementation.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public class EventArgsW : EventArgs
{
    public string StateW = null;
}
public class EventWrapper
{
    public event EventHandler<EventArgsW> EventW;
    public string StateW;
    public void Notify()
    {
        Task.Factory.StartNew(
            () => {
                Console.WriteLine("Notify is running on ThreadId:{0}",
                Thread.CurrentThread.ManagedThreadId);
                EventArgsW args = new EventArgsW();
                args.StateW = this.StateW;
                EventW?.Invoke(this, args);
            });
    }
}
public class HandlerWrapper
{
    private string name;
    private string StateW;
    private ManualResetEvent mrs;
    public HandlerWrapper(string name, ManualResetEvent mrs)
    {
        this.name = name;
        this.mrs = mrs;
    }
    public void Handler(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0} is running on ThreadId:{1}",
            name, Thread.CurrentThread.ManagedThreadId);
        Task.Factory.StartNew(
            () => Worker(subject, args));
    }
    private void Worker(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:0",
            name, Thread.CurrentThread.ManagedThreadId);
        StateW = args.StateW;
        for (int i = 1; i <= 2; ++i)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:{2}",
                name, Thread.CurrentThread.ManagedThreadId, i);
        }
        mrs.Set();
    }
}
internal class Client
{
    public static void Main(string[] args)
    {
        Console.WriteLine("Client is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);
        ManualResetEvent[] mres = new ManualResetEvent[3];
        for (int i = 0; i < mres.Length; i++) mres[i] = new ManualResetEvent(false);
        EventWrapper s = new EventWrapper();
        s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
        s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
        s.EventW += (new HandlerWrapper("3", mres[2])).Handler;
        // Change subject state and notify observers
        s.StateW = "ABC123";
        var timer = new Stopwatch();
        timer.Start();
        s.Notify();
        ManualResetEvent.WaitAll(mres);
        timer.Stop();
        TimeSpan timeTaken = timer.Elapsed;
        string tmp1 = "Client time taken: " + timeTaken.ToString(@"m\:ss\.fff");
        Console.WriteLine(tmp1);
        Console.ReadLine();
    }
}

The execution result is shown below.

[Image: Client time taken]

As can be seen from the execution result, the EventHandlers run on separate threads, the execution log shows them running concurrently, and the total time taken is 10.020 seconds.

Asynchronous Events using TPL – Extension method

Since using TPL required changing existing code and hurt its readability, I created an extension method to simplify the usage of TPL. Instead of writing

EventW?.Invoke(this, args);

One would write

EventW?.InvokeAsync<EventArgsW>(this, args);

And all TPL magic would happen behind the scenes. Here is all the source code for the new solution.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public class EventArgsW : EventArgs
{
    public string StateW = null;
}
public class EventWrapper
{
    public event EventHandler<EventArgsW> EventW;
    public string StateW;
    public void Notify()
    {
        Console.WriteLine("Notify is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);
        EventArgsW args = new EventArgsW();
        args.StateW = this.StateW;
        EventW?.InvokeAsync<EventArgsW>(this, args);  //(1)
    }
}
public class HandlerWrapper
{
    private string name;
    private string StateW;
    private ManualResetEvent mrs;
    public HandlerWrapper(string name, ManualResetEvent mrs)
    {
        this.name = name;
        this.mrs = mrs;
    }
    public void Handler(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0} is running on ThreadId:{1}",
            name, Thread.CurrentThread.ManagedThreadId);
        Worker(subject, args);
    }
    private void Worker(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:0",
            name, Thread.CurrentThread.ManagedThreadId);
        StateW = args.StateW;
        for (int i = 1; i <= 2; ++i)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Handler{0}.Worker is running on ThreadId:{1}, i:{2}",
                name, Thread.CurrentThread.ManagedThreadId, i);
        }
        mrs.Set();
    }
}
public static class AsyncEventsUsingTplExtension
{
    public static void InvokeAsync<TEventArgs>   //(2)
        (this EventHandler<TEventArgs> handler, object sender, TEventArgs args)
    {
        Task.Factory.StartNew(() =>
        {
            Console.WriteLine("InvokeAsync<TEventArgs> is running on ThreadId:{0}",
                Thread.CurrentThread.ManagedThreadId);

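            // handler is non-null when called via EventW?.InvokeAsync(...),
            // but guard anyway so that a direct call with null does not throw.
            var delegates = handler?.GetInvocationList();
            if (delegates == null) return;

            foreach (var delegated in delegates)
            {
                var myEventHandler = delegated as EventHandler<TEventArgs>;
                if (myEventHandler != null)
                {
                    // Each handler gets its own task, so handlers run concurrently.
                    Task.Factory.StartNew(() => myEventHandler(sender, args));
                }
            }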
        });
    }
}
internal class Client
{
    public static void Main(string[] args)
    {
        Console.WriteLine("Client is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);
        ManualResetEvent[] mres = new ManualResetEvent[3];
        for (int i = 0; i < mres.Length; i++) mres[i] = new ManualResetEvent(false);
        EventWrapper s = new EventWrapper();
        s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
        s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
        s.EventW += (new HandlerWrapper("3", mres[2])).Handler;
        // Change subject state and notify observers
        s.StateW = "ABC123";
        var timer = new Stopwatch();
        timer.Start();
        s.Notify();
        ManualResetEvent.WaitAll(mres);
        timer.Stop();
        TimeSpan timeTaken = timer.Elapsed;
        string tmp1 = "Client time taken: " + timeTaken.ToString(@"m\:ss\.fff");
        Console.WriteLine(tmp1);
        Console.ReadLine();
    }
}

And here is the execution result.

[Image: Execution result]

As can be seen from the execution result, the EventHandlers run on separate threads, the execution log shows them running concurrently, and the total time taken is 10.039 seconds. TPL dispatches work to threads in the ThreadPool, and thread Id=4 has been used twice; probably it finished its work early and became available again.
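The InvokeAsync extension method above is fire-and-forget, which is why the demo needs ManualResetEvents to learn when the handlers are done. As a possible variation, the extension method could return a Task that completes when all handlers have finished. The following is only a sketch under that assumption; InvokeAllAsync, AwaitableInvokeSketch and AwaitableInvokeDemo are hypothetical names, not part of the article's code.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AwaitableInvokeSketch
{
    // Hypothetical variant of InvokeAsync: runs every subscribed handler on
    // the thread pool and returns a Task that completes when all are done.
    public static Task InvokeAllAsync<TEventArgs>(
        this EventHandler<TEventArgs> handler, object sender, TEventArgs args)
    {
        if (handler == null) return Task.CompletedTask; // no subscribers

        Task[] tasks = handler.GetInvocationList()
            .Cast<EventHandler<TEventArgs>>()
            .Select(h => Task.Run(() => h(sender, args)))
            .ToArray(); // materialize so that each task is started exactly once

        return Task.WhenAll(tasks);
    }
}

public static class AwaitableInvokeDemo
{
    // Hypothetical event, used only for this illustration.
    public static event EventHandler<EventArgs> DemoEvent;

    // Raises the event with three slow handlers and returns how many of
    // them had completed by the time the returned Task finished.
    public static int Run()
    {
        int completed = 0;
        DemoEvent = null; // reset so that Run() can be called repeatedly
        for (int i = 0; i < 3; i++)
        {
            DemoEvent += (s, e) =>
            {
                Thread.Sleep(200); // simulated work
                Interlocked.Increment(ref completed);
            };
        }

        // Block only for the demo; async code would await the Task instead.
        DemoEvent.InvokeAllAsync(null, EventArgs.Empty).Wait();
        return completed;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 3
    }
}
```

With this shape, an exception thrown by a handler is stored in the returned Task and surfaces when the Task is awaited or waited on, instead of going unobserved.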

Asynchronous Events using TAP

By the nature of how they are defined in C#, EventHandlers are synchronous functions in the context of the Task-based Asynchronous Pattern (TAP). If you want EventHandlers to be async in the context of TAP, so that you can await in them, you practically need to roll your own event notification mechanism that supports your custom version of async EventHandlers. A nice example of such work can be seen in [1]. I modified that code for my examples, and here is the new version of the solution:

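using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class EventArgsW : EventArgs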
{
    public string StateW = null;
}
public class EventWrapper
{
    public event AsyncEventHandler<EventArgsW> EventW;
    public string StateW;
    public async Task Notify(CancellationToken token)
    {
        Console.WriteLine("Notify is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);
        EventArgsW args = new EventArgsW();
        args.StateW = this.StateW;
        await this.EventW.InvokeAsync(this, args, token);
    }
}
public class HandlerWrapper
{
    private string name;
    private string StateW;
    private ManualResetEvent mrs;
    public HandlerWrapper(string name, ManualResetEvent mrs)
    {
        this.name = name;
        this.mrs = mrs;
    }
    public async Task Handler(object subject, EventArgsW args,
        CancellationToken token)
    {
        Console.WriteLine("Handler{0} is running on ThreadId:{1}",
            name, Thread.CurrentThread.ManagedThreadId);
        await Worker(subject, args);
    }
    private async Task Worker(object subject, EventArgsW args)
    {
        Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
            "{1}, i:0",
            name, Thread.CurrentThread.ManagedThreadId);
        StateW = args.StateW;
        for (int i = 1; i <= 2; ++i)
        {
            Thread.Sleep(5000);
            Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
                "{1}, i:{2}",
                name, Thread.CurrentThread.ManagedThreadId, i);
        }
        await Task.Delay(0);
        mrs.Set();
    }
}
public delegate Task AsyncEventHandler<TEventArgs>(
        object sender, TEventArgs e, CancellationToken token);
public static class AsyncEventHandlerExtensions
{
    // Invoke an async event (with null-checking)
    public static async Task InvokeAsync<TEventArgs>(
        this AsyncEventHandler<TEventArgs> handler,
        object sender, TEventArgs args, CancellationToken token)
    {
        await Task.Run(async () =>
        {
            Console.WriteLine("InvokeAsync<TEventArgs> is running on ThreadId:{0}",
                Thread.CurrentThread.ManagedThreadId);
            var delegates = handler?.GetInvocationList();
            if (delegates?.Length > 0)
            {
                var tasks =
                delegates
                .Cast<AsyncEventHandler<TEventArgs>>()
                .Select(e => Task.Run(
                    async () => await e.Invoke(sender, args, token)));
                await Task.WhenAll(tasks);
            }
        }).ConfigureAwait(false);
    }
}
internal class Client
{
    public static async Task Main(string[] args)
    {
        Console.WriteLine("Client is running on ThreadId:{0}",
            Thread.CurrentThread.ManagedThreadId);
        ManualResetEvent[] mres = new ManualResetEvent[3];
        for (int i = 0; i < mres.Length; i++)
            mres[i] = new ManualResetEvent(false);
        EventWrapper s = new EventWrapper();
        s.EventW += (new HandlerWrapper("1", mres[0])).Handler;
        s.EventW += (new HandlerWrapper("2", mres[1])).Handler;
        s.EventW += (new HandlerWrapper("3", mres[2])).Handler;
        // Change subject state and notify observers
        s.StateW = "ABC123";
        var timer = new Stopwatch();
        timer.Start();
        await s.Notify(CancellationToken.None);
        ManualResetEvent.WaitAll(mres);
        timer.Stop();
        TimeSpan timeTaken = timer.Elapsed;
        string tmp1 = "Client time taken: " +
            timeTaken.ToString(@"m\:ss\.fff");
        Console.WriteLine(tmp1);
        Console.ReadLine();
    }
}

And here is the execution result.

[Image: Client time]

As can be seen from the execution result, the EventHandlers, now async, run on separate threads, the execution log shows them running concurrently, and the total time taken is 10.063 seconds.
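A custom delegate type returning Task is needed because a standard EventHandler returns void. A small experiment shows the consequence: an async lambda subscribed to a standard event becomes an async void method, and Invoke returns at the handler's first await, so the raiser has nothing to await. The following is a minimal sketch, assuming a console application without a SynchronizationContext; AsyncVoidHandlerDemo and DemoEvent are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidHandlerDemo
{
    // Hypothetical event, used only for this illustration.
    public static event EventHandler DemoEvent;

    // Returns two flags: whether the handler had finished right after
    // Invoke returned, and whether it had finished after waiting for it.
    public static Tuple<bool, bool> Run()
    {
        using (var finished = new ManualResetEventSlim(false))
        {
            DemoEvent = null; // reset so that Run() can be called repeatedly

            // An async lambda assigned to EventHandler is an async void method.
            DemoEvent += async (s, e) =>
            {
                await Task.Delay(500); // control returns to the raiser here
                finished.Set();
            };

            DemoEvent?.Invoke(null, EventArgs.Empty);
            bool doneAfterInvoke = finished.IsSet; // handler is still pending

            // Assumes no SynchronizationContext (console app); blocking like
            // this on a UI thread could deadlock.
            finished.Wait();
            bool doneAfterWait = finished.IsSet;

            return Tuple.Create(doneAfterInvoke, doneAfterWait);
        }
    }

    public static void Main()
    {
        var result = Run();
        // Typically "False, True": Invoke returned before the handler finished.
        Console.WriteLine("{0}, {1}", result.Item1, result.Item2);
    }
}
```

With the AsyncEventHandler delegate used in this section, each handler returns a Task, so the raiser can combine the handlers with Task.WhenAll and await their completion.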

Asynchronous Events using TAP – Ver2

While it was not the primary purpose of this article, we can change the code to better demonstrate TAP. We make one small change to the project code above, replacing a single method; everything else stays the same.

private async Task Worker(object subject, EventArgsW args)
{
    Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
        "{1}, i:0",
        name, Thread.CurrentThread.ManagedThreadId);
    StateW = args.StateW;
    for (int i = 1; i <= 2; ++i)
    {
        await Task.Delay(5000);
        Console.WriteLine("Handler{0}.Worker is running on ThreadId:" +
            "{1}, i:{2}",
            name, Thread.CurrentThread.ManagedThreadId, i);
    }
    mrs.Set();
}

Now, we get the following execution result.

[Image: Client is running on thread]

If we focus our attention on, for example, Handler1.Worker, we can see that this async method has run on 3 different threads from the ThreadPool, the threads with Id 5, 8 and 6. That is expected under TAP: after each await, the rest of the method's work was picked up by the next available thread in the ThreadPool. Concurrency is, again, obvious. The total time is 10.101 seconds.
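The thread switching described above can be observed in isolation with a few lines of code. The following is a minimal sketch, assuming a console application without a SynchronizationContext; ThreadHopDemo is a hypothetical name. It records the managed thread Id before the first await and after each await.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThreadHopDemo
{
    // Records the managed thread Id at the start and after each await.
    public static async Task<List<int>> RunAsync()
    {
        var ids = new List<int> { Environment.CurrentManagedThreadId };
        for (int i = 0; i < 2; i++)
        {
            // Without a SynchronizationContext, the continuation after the
            // await runs on whichever ThreadPool thread is available.
            await Task.Delay(100);
            ids.Add(Environment.CurrentManagedThreadId);
        }
        return ids;
    }

    public static async Task Main()
    {
        List<int> ids = await RunAsync();
        // The Ids vary from run to run and are not guaranteed to differ.
        Console.WriteLine(string.Join(", ", ids));
    }
}
```

The recorded Ids may or may not differ on a given run. The point is only that an async method is not tied to one thread for its whole lifetime.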

Conclusion

The event mechanism actually provides synchronous calls to event handlers. The examples above showed how the invocation of event handlers can be made asynchronous. Two reusable extension methods that simplify the implementation of asynchronous invocation were presented in the code. The benefit is the parallel invocation of EventHandlers, which is important in today’s multi-core systems.

