Everything You Need to Know About Azure Service Bus: Part 2

This is Part 2 of Azure Service Bus Brokered Messaging, and you can find Part 1 here. In the first part we concentrated strictly on how to use Service Bus Queues. Because of the sheer breadth of information, Part 2 focuses on the features and strengths of Service Bus Topics and Subscriptions, which, as you should know by now, are different from Queues.
  • Topics
  • Sending Messages to Topics
  • Subscriptions
  • Retrieving Messages on a Subscription
  • Rules, Filters and Actions Oh My!

Topics

[Figure: topics and subscriptions diagram]

Truly, it is hard to talk about Topics without Subscriptions because they are so closely related, and nearly every example you will see uses the two together. But, despite their differences, Topics share a number of similarities with Service Bus Queues. So, since you are already familiar with Service Bus Queues, we're going to talk about Topics first and then introduce Subscriptions.

Topics just like Queues provide a destination to send messages. But Topics in combination with Subscriptions provide a publish and subscribe pattern for distributing messages. This is unlike the island view of Queues where a queue is both the receiver and retainer of messages it receives.

Simply put, Topics provide the publisher side of a publish/subscribe pattern. From a 30,000 foot view they don't seem all that different from Queues since they provide a destination to send messages. And, just like queues, topics must still be created and managed, and must have messages sent to them.

Many of the management operations for Topics are utilized and executed in the same fashion as Queues and we can use the same NamespaceManager that we saw in part 1 to facilitate our CRUD operations.

Creating Topics

Creating a topic can be as straightforward as giving it a name:

    NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(AccountInfo.ConnectionString);
    TopicDescription newTopicDescription = await namespaceManager.CreateTopicAsync(topicName);

Or a topic can be created from a TopicDescription where we define characteristics of a topic.

    TopicDescription description = new TopicDescription(topicPath)
    {
        SupportOrdering = true,
        AutoDeleteOnIdle = TimeSpan.FromHours(8),
        DefaultMessageTimeToLive = new TimeSpan(0, 4, 0, 0)
    };

    TopicDescription newTopicDescription = null;

    //If not exists, we'll create it
    if (!await namespaceManager.TopicExistsAsync(description.Path))
    {
        newTopicDescription = await namespaceManager.CreateTopicAsync(description);
    }

    //Or we'll retrieve it if it already exists
    TopicDescription topicDescription = newTopicDescription ?? await namespaceManager.GetTopicAsync(description.Path);

This more contrived example also shows some of the utility methods available for checking whether a topic exists and for retrieving the details of an existing topic.

Updating Topics

As with Queues, we can also update properties of an existing Topic by providing a TopicDescription. However, there are specific properties that can't be changed after creation such as RequiresDuplicateDetection.

    TopicDescription description = new TopicDescription(topicPath)
    {
        SupportOrdering = true,
        AutoDeleteOnIdle = TimeSpan.FromHours(8),
        DefaultMessageTimeToLive = new TimeSpan(0, 4, 0, 0)
    };

    await namespaceManager.UpdateTopicAsync(description);

Deleting Topics

When we delete a topic that doesn't exist, it will not throw an exception:

    await namespaceManager.DeleteTopicAsync(topicName);

Sending Messages to Topics

Now, anyone remotely familiar with how Topics and Subscriptions work will be wondering why on Earth I want to talk about sending messages to topics before I have even introduced Subscriptions. It's simple: up to this point, including sending messages to topics, not much has to change, and it is easy to get a feel for Service Bus Topics without being bogged down by what makes them different. Stick with me, we'll be there momentarily.

You may recall that I stated back in Part 1 that the MessageSender is an abstraction for the QueueClient and that using the abstraction would have a payoff. Well, it's also an abstraction for the TopicClient, which is the direct client for sending messages to a Topic. Therefore, if you can use a MessageSender instead of the relevant client, your application doesn't need to care whether it is sending messages to a Queue or a Topic, and therein lies the payoff of using the MessageSender.

You can refer back to our Queue example to see how a MessageSender is created. Sending a message to a Topic is as easy as it was for sending to a Queue:

    await messageSender.SendAsync(new BrokeredMessage("SimpleMessage"));

However, if you're testing this, you're in for a big surprise, so I'll save you the loss of hair (something I am good at losing!). When a topic doesn't have a registered subscription, a message is automatically discarded. So, it appears this would be the opportune time to introduce Subscriptions.

Subscriptions

As a reminder, Topics and Subscriptions together represent a publish and subscribe pattern (see the previous topics/subscription diagram). Whereas the topic represents the publisher, a subscription represents a subscriber. When a subscription is created, it is given the topic it subscribes to in order to receive the messages it is interested in. In the simplest form, messages arriving at a topic are copied and forwarded to each subscription whose subscription policy (which we call a rule) is satisfied.

I like to equate this relationship between topics and subscriptions to an airport arrival scenario. Imagine, if you will, a topic represented by an airport. You, as the message, arrive at the airport, make your way through baggage claim and arrive where the limousine drivers are standing holding signs with various people's names. These drivers represent the subscription policies (rules). Assuming you find a limousine driver who is holding a sign with your name, s/he escorts you to their limousine (the subscription), where you then reside as you are whisked off to some great, all-inclusive vacation, no longer at the airport. Well, what if you didn't find a driver holding a sign with your name on it?

Have you ever seen the movie The Terminal with Tom Hanks? Where Viktor Navorski (Tom Hanks) is trapped at the JFK International airport because he is denied entry to the U.S. and at the same time can't return to his native country due to a revolution? Well, unlike Viktor who has a place to stay at (The Terminal), when a message arrives at a topic that does not satisfy any associated subscription's rule, the message is discarded.

So let's take a closer look at the characteristics of subscriptions and how they tie into a Topic.

Creating a Subscription

When we create a subscription, at a bare minimum we associate it to a topic:

    SubscriptionDescription swiftSubscription = await namespaceManager.CreateSubscriptionAsync("singingtelegram", "TaylorSwift");

In addition, we can set some properties of the subscription by passing in a SubscriptionDescription:

    SubscriptionDescription subscription = new SubscriptionDescription("singingtelegram", "JustinBieber")
    {
        EnableDeadLetteringOnFilterEvaluationExceptions = true,
        EnableDeadLetteringOnMessageExpiration = true,
        EnableBatchedOperations = true
    };

    SubscriptionDescription bieberSubscription = await namespaceManager.CreateSubscriptionAsync(subscription);

In the preceding examples we provide the path of the existing topic as well as the name for the subscription we are creating. Created in this fashion, these subscriptions have no explicit rule defining what messages they should receive. Therefore, they have subscribed to receive all the messages sent to the "singingtelegram" topic. When any message is sent to the "singingtelegram" topic, both the TaylorSwift and JustinBieber subscriptions will receive a copy in their virtual queue.
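To make the copy-per-subscription behaviour concrete, here is a minimal in-memory sketch. It is not the Service Bus SDK: SimulatedTopic, AddSubscription, Send and PendingCount are hypothetical names I made up for illustration, and the sketch assumes every subscription uses the default "receive everything" rule.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a topic; NOT the Service Bus SDK (all names are hypothetical).
public sealed class SimulatedTopic
{
    // Each subscription is modelled as its own queue of message bodies.
    private readonly Dictionary<string, Queue<string>> _subscriptions =
        new Dictionary<string, Queue<string>>();

    public void AddSubscription(string name) => _subscriptions[name] = new Queue<string>();

    // Copies the body into every subscription's queue and returns the number
    // of copies made; 0 means nobody subscribed and the message is discarded.
    public int Send(string body)
    {
        foreach (Queue<string> queue in _subscriptions.Values)
        {
            queue.Enqueue(body);
        }

        return _subscriptions.Count;
    }

    public int PendingCount(string subscriptionName) => _subscriptions[subscriptionName].Count;
}
```

Sending to a SimulatedTopic with no subscriptions returns 0 (the message is gone), while sending after adding "TaylorSwift" and "JustinBieber" leaves one copy waiting in each queue.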

Currently, a Topic is limited to 2,000 subscriptions.

Deleting a Subscription

The only slight difference between deleting a Queue and deleting a Subscription is that we must provide the Topic Path that the subscription is associated with as well as the name of the subscription.

    await namespaceManager.DeleteSubscriptionAsync(topicPath, subscriptionName);

Updating a Subscription

Again, just as with Queues and Topics, there are certain properties that must be set at creation time, such as RequiresSession, but we can provide a SubscriptionDescription instance to update the other properties of our Subscription:

    SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName)
    {
        DefaultMessageTimeToLive = TimeSpan.FromHours(8),
        LockDuration = TimeSpan.FromSeconds(90)
    };

    SubscriptionDescription updatedSubscription = await namespaceManager.UpdateSubscriptionAsync(subscription);

Recall that back in Part 1, I had stated that in the end we really are still dealing with queues. Well, Microsoft does define a Subscription as a virtual queue. Copies of messages that are sent to the subscription reside there and are retrieved as they are with a Service Bus Queue.

Retrieving Messages on a Subscription

As in the case of using the abstract MessageSender, if we use the abstract MessageReceiver we don't need to worry about whether it is a Service Bus Queue or a Subscription that we are pulling messages from. The following is the exact code from receiving messages from queues in Part 1 with the exception of a single parameter passed in for the CreateMessageReceiverAsync method:

    MessageReceiver messageReceiver = await _messagingFactory.CreateMessageReceiverAsync("singingtelegram/subscriptions/TaylorSwift");

    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var msg = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);

            //ReceiveAsync returns null when no message arrives within the timeout
            if (msg == null) continue;

            //Renew the lock if it is about to expire (the 10 second margin is an arbitrary choice)
            if (msg.LockedUntilUtc <= DateTime.UtcNow.AddSeconds(10)) await msg.RenewLockAsync();

            await ProcessAndReleaseMessageAsync(msg);
            await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
        }
    }
    catch (Exception ex)
    {
        //log error;
        throw;
    }

    private async Task ProcessAndReleaseMessageAsync(BrokeredMessage message)
    {
        MessageProcessingAction action = MessageProcessingAction.Abandon;

        try
        {
            //Process message
            action = MessageProcessingAction.Complete;
        }
        catch (Exception ex)
        {
            //log
        }
        finally
        {
            //if something fails update with abandon
            //C# 6.0 allows await calls in finally blocks
            await UpdateMessageState(message, action);
        }
    }

    private async Task UpdateMessageState(BrokeredMessage message, MessageProcessingAction action)
    {
        switch (action)
        {
            case MessageProcessingAction.Complete:
                await message.CompleteAsync();
                break;
            case MessageProcessingAction.Abandon:
                await message.AbandonAsync();
                break;
            case MessageProcessingAction.Deadletter:
                await message.DeadLetterAsync();
                break;
            default:
                await message.AbandonAsync();
                break;
        }
    }

Note: What we do need to care about is the string we provide to the MessagingFactory.CreateMessageReceiverAsync(entityPath) method. In our Queue example, we just provided the queue path. But in this case we need our subscription path in the form of <Topic Path>/subscriptions/<subscription name>, as shown by "singingtelegram/subscriptions/TaylorSwift".
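If you'd rather not hand-type that path everywhere, a tiny helper keeps the format in one place. This is only a sketch: EntityPaths and ForSubscription are hypothetical names of my own, not part of the Service Bus SDK, and the only thing it relies on is the <Topic Path>/subscriptions/<subscription name> format described in the note.

```csharp
using System;

// Hypothetical helper (not part of the Service Bus SDK) that builds the
// entity path expected for a subscription receiver.
public static class EntityPaths
{
    // Returns "<topic path>/subscriptions/<subscription name>".
    public static string ForSubscription(string topicPath, string subscriptionName)
    {
        if (string.IsNullOrWhiteSpace(topicPath))
            throw new ArgumentException("A topic path is required.", nameof(topicPath));
        if (string.IsNullOrWhiteSpace(subscriptionName))
            throw new ArgumentException("A subscription name is required.", nameof(subscriptionName));

        return topicPath + "/subscriptions/" + subscriptionName;
    }
}
```

The result of EntityPaths.ForSubscription("singingtelegram", "TaylorSwift") can then be passed as the entityPath argument when creating the MessageReceiver.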

Upon receiving any messages, we can follow the exact same process we did in our Queue example for updating a message by calling Abandon, Complete or DeadLetter. I'll let you review the message receiving details back in Part 1.

Until now, the only compelling reason to use Topics and Subscriptions over Queues has been the ability to provide copies of a single message to multiple interested parties (subscriptions). But that is only one compelling feature of Topics and Subscriptions. Another compelling feature is message routing and the ability to control what messages are routed to what subscriptions.

We can provide message routing using Rules. Now, just because we didn't define a rule for the subscriptions we already created, doesn't mean a rule doesn't exist. Indeed all subscriptions at a minimum have one rule.

Rules, Filters and Actions Oh My!

It is here that we have reached the zenith of what makes Topics and Subscriptions so powerful. A rule in the world of subscriptions defines which messages should be copied to a specific Subscription. Each subscription has access to all the messages received by a Topic and can individually filter out which messages belong to it. This filtering is done using the Rule's Filter, which we'll talk about in depth.

In addition to filtering, when a message is filtered out by a Subscription because the message met the rule's condition, we have the opportunity to carry out actions against the message. These actions include adding, removing and updating custom properties and changing system property values. This is all defined by the rule's action. Therefore, it really is the filter and actions that make up a rule.

Tip: On a subjective side note, a rule resembles more of a policy than a rule. Where a rule defines not only the condition(s) (filters) but also what should take place when a condition is met smells more of a policy. I only state that in case someone has an easier time conceptualizing a rule as a policy.

Another way to state the preceding verbose explanation is that a subscription filters out messages that meet a rule's conditions and optionally applies actions against the properties of a message when the filtered message is copied to the subscription. So let's see examples of all these.

Rule Filter

In the .NET landscape, a subscription rule takes the form of a RuleDescription, which defines the rule's Filter and Action. When we created our subscriptions, we could also have used an overload that accepts a RuleDescription:

    RuleDescription rule = new RuleDescription
    {
        Name = "ForwardToTaylorSwift",
        Filter = new SqlFilter("RequestedSinger = 'Justin Bieber' OR RequestedSinger = 'Taylor Swift'")
    };

    SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName);
    SubscriptionDescription description = await namespaceManager.CreateSubscriptionAsync(subscription, rule);

So here is our first introduction to a Rule and its filter. As the filter's type implies, it allows for defining a SQL92 standard expression in its constructor that will govern what messages are filtered out. We can read this as, all messages received at the "singingtelegram" topic that have a custom property "RequestedSinger" with the value "Justin Bieber" or "Taylor Swift" will be filtered out to the "TaylorSwift" subscription.

So if we send a BrokeredMessage that matches the preceding rule to our "singingtelegram" topic, the "TaylorSwift" subscription will receive a copy:

    BrokeredMessage message = new BrokeredMessage();
    message.Properties.Add("RequestedSinger", "Justin Bieber");

    await messageSender.SendAsync(message);

But, this doesn't exclude other subscriptions from also receiving this message as well. Recall, that I specified earlier that all subscriptions have a rule. When we don't explicitly define a rule as the case was in our original subscription creation examples, a default rule is provided for us that has a default filter. Since the rule condition is defined in the rule filter, let's have a look at what out-of-the-box filters are available:

  • SqlFilter: The filter that a number of other filters derive from, such as TrueFilter and FalseFilter.

  • TrueFilter: This is the default filter provided using the default rule that is generated for us when a rule or filter is not explicitly provided when creating a subscription. Ultimately, this generates the SQL92 expression 1=1 and subscribes to receive all messages of the associated topic.

  • FalseFilter: The antithesis of a TrueFilter that generates a SQL92 expression of 1=0; a subscription with this filter will never subscribe to any messages of the associated topic.

  • CorrelationFilter: This filter subscribes the subscription to all messages with a specific value in the message's CorrelationId property.

Tip: Be aware that the comparison values in your SQL expression are case sensitive, whereas the property names are not (for example "RequestedSinger = 'Taylor Swift'" is not the same as "RequestedSinger = 'taylor swift'").

Adding Rules

Subscriptions are not limited to one rule however. We can add additional rules to an existing subscription. However, this is a perfect example of where the MessageReceiver abstraction class can't fulfill our needs and we'll need to turn to the SubscriptionClient.

    SubscriptionDescription subscription = await _operations.CreateOrRetrieveSubscriptionAsync(topicPath, subscriptionName);

    //MessagingFactory as demo'd in part 1
    SubscriptionClient subscriptionClient = messagingFactory.CreateSubscriptionClient(subscription.TopicPath, subscription.Name);

    RuleDescription ruleDescription = new RuleDescription("NeverGrowUpSongRule", new SqlFilter("RequestedSong = 'Never Grow Up'"));

    await subscriptionClient.AddRuleAsync(ruleDescription);

The first operation is simply a personal wrapper that returns a verified SubscriptionDescription. With it verified, we can acquire a MessagingFactory (see Sending Messages in Part 1) that will allow us to create a SubscriptionClient that will in turn allow us to add a rule to the existing subscription.

Tip: Understand that every satisfied rule will generate a copy of a message. That means if a subscription has 3 separate rules and a single message satisfies all 3 rule conditions, the subscription will receive 3 separate copies of the message. If you want a single copy based on various conditions, those conditions need to be specified in a single rule filter (for example "RequestedSinger = 'Justin Bieber' OR RequestedSinger = 'Taylor Swift'").
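The one-copy-per-satisfied-rule behaviour is easy to trip over, so here is a small in-memory sketch of it. To be clear, this is not the Service Bus SDK: SimulatedSubscription, Filters and PropertyEquals are hypothetical names, and plain C# delegates stand in for SQL filters.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-in for a subscription and its rules; NOT the Service Bus SDK.
public sealed class SimulatedSubscription
{
    private readonly List<Func<IDictionary<string, object>, bool>> _rules =
        new List<Func<IDictionary<string, object>, bool>>();

    // Copies that have landed in this subscription's virtual queue.
    public List<IDictionary<string, object>> Delivered { get; } =
        new List<IDictionary<string, object>>();

    public void AddRule(Func<IDictionary<string, object>, bool> filter) => _rules.Add(filter);

    // Stores one copy of the message properties per satisfied rule.
    public void Offer(IDictionary<string, object> properties)
    {
        int satisfiedRules = _rules.Count(rule => rule(properties));
        for (int i = 0; i < satisfiedRules; i++)
        {
            Delivered.Add(new Dictionary<string, object>(properties));
        }
    }
}

public static class Filters
{
    // Case-sensitive comparison of a custom property against an expected value.
    public static Func<IDictionary<string, object>, bool> PropertyEquals(string name, string expected) =>
        properties => properties.TryGetValue(name, out var actual) && object.Equals(actual, expected);
}
```

Adding three rules that all match a message and calling Offer once leaves three entries in Delivered; folding the conditions into a single delegate with || leaves just one, which mirrors the advice of putting the conditions into a single rule filter.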

Quick Note on CorrelationFilter

The CorrelationFilter is a special filter for a specific use. It is one of the available ways to implement a correlation pattern in which a subscription can retrieve all messages bound to the same CorrelationId. We'll look at this in Part 3 when we examine messaging patterns.

Rule Actions

Every rule must have a filter; this is what defines the rule's condition. But, a rule does not need to have an action. As said earlier, a rule's action allows us to manipulate the properties (both system and custom) on a message that meets the rule's condition.

As we stated before, the rule is the container for both the filter and the action and we can define an action in our Action property of our RuleDescription:

    RuleDescription ruleDescription = new RuleDescription("ChangeRequestedSinger")
    {
        Filter = new SqlFilter("RequestedSinger = 'Justin Bieber'"),
        Action = new SqlRuleAction("SET RequestedSinger = 'Taylor Swift'")
    };

Here we are creating a RuleDescription that will filter out any messages that have a RequestedSinger with the value of "Justin Bieber" and will change the value of that same property to "Taylor Swift", as defined by the "SET RequestedSinger = 'Taylor Swift'" action.

If we were to add this rule, or as shown below, create a subscription with this rule:

    SubscriptionDescription newSubscriptionDescription = await namespaceManager.CreateSubscriptionAsync(topicPath, subscriptionName, ruleDescription);

Then send a message that satisfies the subscription's condition (in other words, Filter = new SqlFilter("RequestedSinger = 'Justin Bieber'")):

    BrokeredMessage message = new BrokeredMessage();
    message.Properties.Add("RequestedSinger", "Justin Bieber");

    await messageSender.SendAsync(message);

When we receive the message, we will find that the RequestedSinger custom property value is no longer "Justin Bieber" but "Taylor Swift".

    BrokeredMessage msg = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
    var requestedSinger = msg.Properties["RequestedSinger"]; // "Taylor Swift"

Manipulating custom properties is only one of the available features of the Action. Adding custom properties or changing system properties is also possible. Imagine you wanted a catch-all subscription that caught any messages without a defined RequestedSinger and created the RequestedSinger property and updated the system property ContentType.

    var ruleDescription = new RuleDescription("ChangeRequestedSinger")
    {
        Filter = new SqlFilter("RequestedSinger IS NULL"),
        Action = new SqlRuleAction("SET sys.ContentType = 'audio/mpeg'; SET RequestedSinger = 'Taylor Swift'")
    };

For a Subscription created with the preceding RuleDescription, any message that is filtered out will have its system property ContentType updated to audio/mpeg and a new custom property RequestedSinger created.

Tip: Notice that the prefix "sys." in the SqlRuleAction is required to scope the property to a system property. By default, the scope is the user properties (custom properties), which can also be written explicitly as user.<property_name>.
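If it helps to see filter and action working together, here is a rough in-memory sketch of the catch-all rule. None of this is the Service Bus SDK: SimulatedMessage, SimulatedRule, OnMatch and CatchAllExample are hypothetical names, a delegate stands in for the SQL filter and another for the SQL action, and a missing property is treated as the equivalent of IS NULL.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-ins; NOT the Service Bus SDK (all names are hypothetical).
public sealed class SimulatedMessage
{
    // Plays the role of the sys.ContentType system property.
    public string ContentType { get; set; }

    // Plays the role of the custom (user) properties.
    public Dictionary<string, object> Properties { get; } = new Dictionary<string, object>();

    public SimulatedMessage Copy()
    {
        var copy = new SimulatedMessage { ContentType = ContentType };
        foreach (var pair in Properties) copy.Properties[pair.Key] = pair.Value;
        return copy;
    }
}

public sealed class SimulatedRule
{
    // The condition; defaults to matching everything.
    public Func<SimulatedMessage, bool> Filter { get; set; } = message => true;

    // The optional action applied to the subscription's copy.
    public Action<SimulatedMessage> OnMatch { get; set; }

    // Returns the copy the subscription would store, or null when the filter rejects the message.
    public SimulatedMessage Apply(SimulatedMessage original)
    {
        if (!Filter(original)) return null;

        SimulatedMessage copy = original.Copy();
        OnMatch?.Invoke(copy);
        return copy;
    }
}

public static class CatchAllExample
{
    // Mirrors: Filter "RequestedSinger IS NULL", Action sets ContentType and RequestedSinger.
    public static SimulatedRule Build() => new SimulatedRule
    {
        Filter = message => !message.Properties.ContainsKey("RequestedSinger"),
        OnMatch = message =>
        {
            message.ContentType = "audio/mpeg";
            message.Properties["RequestedSinger"] = "Taylor Swift";
        }
    };
}
```

The design point worth noticing is that the action runs against the subscription's copy, so the message as it was sent stays untouched and any other subscription still sees the original values.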

So as you can see there is a lot of power in rule actions. You can learn more about the syntax and available options for actions at Microsoft's MSDN page.

Conclusion

Well, as you can clearly see, Topics is a big "topic" and is inseparable from Subscriptions. We've learned that two of the most powerful features of Topics and Subscriptions are the ability to distribute messages to multiple interested parties (subscriptions) and the ability of those parties to filter out which messages they are specifically interested in. There is still a good bit to cover on the topic of Azure Service Bus. So in the final part, we'll be diving into some of the more advanced features, patterns, security and best practices.