Introduction
Azure Service Bus is a fully managed multi-tenant cloud messaging service. It is an enterprise integration message broker. It is used to decouple the application and service from each other. The service bus topic and subscriptions support a publish/subscribe messaging communication model. The topic may have multiple independent subscribers and topic subscribers can receive the copy of each message sent to the topic. In this model, the components of a distributed application communicate with each other via topics. The topic acts as an intermediary.
In the service bus queue, each message is processed by a single consumer. It means that the service bus queue provides one-to-one (point-to-point) communication. The topic/subscriptions provide one-to-many communication using a publish/subscribe messaging model. When a message is sent to the topic, it is available to each subscriber to handle or process independently. We can also define filter rules for a topic on a per subscription basis that allow us to restrict the message received by topic subscription.
Namespace is the scoping container for all message components. The topic and subscriptions are also created under the namespace. In my
previous article, I have explained about "what Azure service bus is and how to create namespace". Please take note that topics and subscriptions are only available when we choose either the Standard or Premium pricing tier while creating the namespace.
Create a topic using the Azure portal
To create a topic, click on the "Topic" button. We can view all the topics under the namespace in "Entities >> topics" tab. On the "Create new topic" screen, there are multiple fields that need to be filled up, such as name, max topic size, message time to live, and other option.
Under topic, we can create one or more subscriptions. To create subscriptions, select the topic that you want to create a subscription for and click on "Subscriptions" either from the left menu or from the toolbar.
On the "Create new topic" screen, there are multiple fields that need to be filled up, such as name, message time to live, lock duration, max delivery count, and other options. We can create multiple subscriptions as we need.
In the following example, I have created a sender that sends a message to the topic and one receiver application (subscription) that receives the message from the topic and processes it. Here, I am using .NET Core console application to send and receive the message.
Code to send a message to the topic
At the initial part of the code, I have created an instance of TopicClient using connection string and topic name. The TopicClient is available in Microsoft.Azure.ServiceBus NuGet package. It can be downloaded using the NuGet Package Manager.
Using the SendAsync method of the topic client, we can send the message to the topic. In the following example, I have sent multiple messages (currently 5) to the topic.
- static ITopicClient topicClient;
- static void Main(string[] args)
- {
- string connectionStringServiceBus = "<<connection String Service Bus>>";
- string topicName = "<< Topic Name >>";
-
- SendMessage(connectionStringServiceBus, topicName).GetAwaiter().GetResult();
- }
-
- static async Task SendMessage(string connectionStringServiceBus, string topicName)
- {
- const int numberOfMessages = 5;
- topicClient = new TopicClient(connectionStringServiceBus, topicName);
-
- Console.WriteLine("======================================================");
- Console.WriteLine("Press any key to exit after sending all the messages.");
- Console.WriteLine("======================================================");
-
-
- await SendMessagesToQueueAsync(numberOfMessages);
-
- Console.ReadKey();
-
- await topicClient.CloseAsync();
- }
-
- static async Task SendMessagesToQueueAsync(int numberOfMessages)
- {
- try
- {
- for (var i = 0; i < numberOfMessages; i++)
- {
-
- string messageBody = $"GNR-MSG dataid:{i}";
- var message = new Message(Encoding.UTF8.GetBytes(messageBody));
-
- Console.WriteLine($"Sending message to queue: {messageBody}");
-
-
- await topicClient.SendAsync(message);
- }
- }
- catch (Exception exception)
- {
- Console.WriteLine($"Exception: {exception.Message}");
- }
- }
When we run the sender application and check the Azure portal (select topic under the namespace then goto Overview window), all the subscriptions are listed in the bottom of this window and the "Active Message Count" field contains the count of messages in the queue.
Receive messages from the subscription
In the same way as sender application, I have created an instance of TopicClient using connection string and topic name. Also, I have created the instance of SubscriptionClient using connection string, topic name, and subscription name. To read the message from subscription, we need to register a MessageHandler (using RegisterMessageHandler method of subscription client). The RegisterMessageHandler contains two parameter async task for process messages and message handler option. The first parameter of this function is defined as a function to process the received messages. The CompleteAsync method of subscription client completed a message and then delete from the subscription. Following is receiver code.
- static ITopicClient topicClient;
- static ISubscriptionClient subscriptionClient;
- static void Main(string[] args)
- {
- string connectionStringServiceBus = "<<connection String Service Bus>>";
- string topicName = "<<Topic Name>>";
- string SubscriptionName = "<<Subscription Name>>";
-
- subscriptionClient = new SubscriptionClient(connectionStringServiceBus, topicName, SubscriptionName);
-
- Console.WriteLine("======================================================");
- Console.WriteLine("Press any key to exit after receiving all the messages.");
- Console.WriteLine("======================================================");
-
- topicClient = new TopicClient(connectionStringServiceBus, topicName);
-
- RegisterMessageHandlerAndReceiveMessages();
-
- Console.ReadKey();
-
- topicClient.CloseAsync().Wait();
- }
- static void RegisterMessageHandlerAndReceiveMessages()
- {
-
- var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
- {
- MaxConcurrentCalls = 1,
- AutoComplete = false
- };
-
-
- subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
- }
-
- static async Task ProcessMessagesAsync(Message message, CancellationToken token)
- {
-
- Console.WriteLine($"Received message: Sequence Number:{message.SystemProperties.SequenceNumber} \t Body:{Encoding.UTF8.GetString(message.Body)}");
-
- await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
- }
-
- static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
- {
- Console.WriteLine($"Exception:: {exceptionReceivedEventArgs.Exception}.");
- return Task.CompletedTask;
- }
Summary
The topic/subscriptions provides one-to-many communication using a publish/subscribe messaging model. This is more useful for scaling to a large number of recipients (each message sent to the topic is available to each subscription registered with the topic). We can also set filter rules per subscription to restrict the message processed by the subscriptions.
You can view and download the source code from
GitHub.