Problem
How to use Azure Service Bus in .NET Core.
Solution
Create a class library and add the NuGet package Microsoft.Azure.ServiceBus.
Add a class to encapsulate settings.
- /// <summary>
- /// Read-only pair of values identifying a queue: the connection string
- /// and the queue name.
- /// </summary>
- public class AzureQueueSettings
- {
-     /// <summary>
-     /// Validates both values and stores them.
-     /// </summary>
-     /// <param name="connectionString">Service Bus connection string.</param>
-     /// <param name="queueName">Name of the queue.</param>
-     /// <exception cref="ArgumentNullException">
-     /// Thrown when either argument is null or an empty string.
-     /// </exception>
-     public AzureQueueSettings(string connectionString, string queueName)
-     {
-         ConnectionString = Require(connectionString, nameof(connectionString));
-         QueueName = Require(queueName, nameof(queueName));
-     }
-
-     /// <summary>Connection string supplied at construction.</summary>
-     public string ConnectionString { get; }
-
-     /// <summary>Queue name supplied at construction.</summary>
-     public string QueueName { get; }
-
-     // Returns the value unchanged, or throws when it is null or empty.
-     private static string Require(string value, string parameterName)
-     {
-         if (string.IsNullOrEmpty(value))
-         {
-             throw new ArgumentNullException(parameterName);
-         }
-
-         return value;
-     }
- }
Add a class to wrap the functionality of sending messages to the queue.
- /// <summary>
- /// Sends items of type <typeparamref name="T"/> to a Service Bus queue.
- /// Each item is serialized to JSON and sent as a UTF-8 encoded message body.
- /// </summary>
- /// <typeparam name="T">Type of the item serialized into the message body.</typeparam>
- public class AzureQueueSender<T> : IAzureQueueSender<T> where T : class
- {
-     /// <summary>
-     /// Stores the settings and creates the queue client.
-     /// </summary>
-     /// <param name="settings">Connection string and queue name of the target queue.</param>
-     /// <exception cref="ArgumentNullException">
-     /// Thrown when <paramref name="settings"/> is null.
-     /// </exception>
-     public AzureQueueSender(AzureQueueSettings settings)
-     {
-         // Fail with a descriptive exception instead of a NullReferenceException in Init().
-         if (settings == null)
-         {
-             throw new ArgumentNullException(nameof(settings));
-         }
-
-         this.settings = settings;
-         Init();
-     }
-
-     /// <summary>
-     /// Serializes <paramref name="item"/> to JSON and sends it to the queue.
-     /// </summary>
-     /// <param name="item">Item to serialize into the message body.</param>
-     /// <param name="properties">
-     /// Optional key/value pairs copied into the message's user properties;
-     /// may be null.
-     /// </param>
-     /// <returns>A task that completes when the client has sent the message.</returns>
-     public async Task SendAsync(T item, Dictionary<string, object> properties)
-     {
-         var json = JsonConvert.SerializeObject(item);
-         var message = new Message(Encoding.UTF8.GetBytes(json));
-
-         if (properties != null)
-         {
-             foreach (var prop in properties)
-             {
-                 message.UserProperties.Add(prop.Key, prop.Value);
-             }
-         }
-
-         // Nothing runs after the await, so the captured context is not needed.
-         await client.SendAsync(message).ConfigureAwait(false);
-     }
-
-     private readonly AzureQueueSettings settings;
-     private QueueClient client;
-
-     // Creates the queue client from the stored settings.
-     private void Init()
-     {
-         client = new QueueClient(
-             this.settings.ConnectionString, this.settings.QueueName);
-     }
- }
Add a class to wrap the functionality of receiving messages from the queue.
- /// <summary>
- /// Registers a message handler on the queue client. Each message body is
- /// decoded as UTF-8 JSON, deserialized to <c>T</c>, passed to
- /// <paramref name="onProcess"/>, and then completed, abandoned or
- /// dead-lettered according to the returned value.
- /// </summary>
- /// <param name="onProcess">Processes an item and returns how its message should be settled.</param>
- /// <param name="onError">
- /// Receives exceptions reported by the client's exception handler and
- /// exceptions thrown while handling a message.
- /// </param>
- /// <param name="onWait">Invoked after a message has been handled without error.</param>
- /// <exception cref="ArgumentNullException">Thrown when any callback is null.</exception>
- public void Receive(
-     Func<T, MessageProcessResponse> onProcess,
-     Action<Exception> onError,
-     Action onWait)
- {
-     // Validate up front: a null callback would otherwise only surface
-     // inside the handler, after a message has already been received.
-     if (onProcess == null)
-     {
-         throw new ArgumentNullException(nameof(onProcess));
-     }
-
-     if (onError == null)
-     {
-         throw new ArgumentNullException(nameof(onError));
-     }
-
-     if (onWait == null)
-     {
-         throw new ArgumentNullException(nameof(onWait));
-     }
-
-     var options = new MessageHandlerOptions(e =>
-     {
-         onError(e.Exception);
-         return Task.CompletedTask;
-     })
-     {
-         // Messages are settled explicitly below, based on onProcess' result.
-         AutoComplete = false,
-         MaxAutoRenewDuration = TimeSpan.FromMinutes(1)
-     };
-
-     client.RegisterMessageHandler(
-         async (message, token) =>
-         {
-             // Tracks whether this handler already settled the message, so the
-             // error path does not try to settle it a second time.
-             var settled = false;
-
-             try
-             {
-                 var data = Encoding.UTF8.GetString(message.Body);
-                 T item = JsonConvert.DeserializeObject<T>(data);
-
-                 var result = onProcess(item);
-
-                 if (result == MessageProcessResponse.Complete)
-                 {
-                     await client.CompleteAsync(
-                         message.SystemProperties.LockToken);
-                     settled = true;
-                 }
-                 else if (result == MessageProcessResponse.Abandon)
-                 {
-                     await client.AbandonAsync(
-                         message.SystemProperties.LockToken);
-                     settled = true;
-                 }
-                 else if (result == MessageProcessResponse.Dead)
-                 {
-                     await client.DeadLetterAsync(
-                         message.SystemProperties.LockToken);
-                     settled = true;
-                 }
-
-                 onWait();
-             }
-             catch (Exception ex)
-             {
-                 try
-                 {
-                     if (!settled)
-                     {
-                         await client.DeadLetterAsync(
-                             message.SystemProperties.LockToken);
-                     }
-                 }
-                 finally
-                 {
-                     // Report the original failure even when dead-lettering
-                     // itself throws; that second exception still propagates.
-                     onError(ex);
-                 }
-             }
-         }, options);
- }
Now, you can use these wrapper classes to send messages.
- // Read the queue connection details from configuration.
- var settings = new AzureQueueSettings(
-     connectionString: config["ServiceBus_ConnectionString"],
-     queueName: config["ServiceBus_QueueName"]);
-
- // Application-defined payload type with a Text property.
- var message = new Message { Text = "Hello Queue" };
-
- IAzureQueueSender<Message> sender =
-     new AzureQueueSender<Message>(settings);
- await sender.SendAsync(message);
And receive messages.
- IAzureQueueReceiver<Message> receiver =
-     new AzureQueueReceiver<Message>(settings);
- receiver.Receive(
-     // Process the item and report how its message should be settled.
-     message =>
-     {
-         Console.WriteLine(message.Text);
-         return MessageProcessResponse.Complete;
-     },
-     // Error callback.
-     ex => Console.WriteLine(ex.Message),
-     // Wait callback.
-     () => Console.WriteLine("Waiting..."));
NOTE
The sample code also includes wrappers for topics and subscriptions.
Discussion
The sample code requires you to set up an Azure account and Service Bus. Instructions for these can be found here.
Source Code
GitHub