Azure Service Bus Queues - The Microservices Powerhouse

Azure Service Bus is a fully managed enterprise integration message broker that provides reliable and secure asynchronous message communication between different applications and services. It supports both queue-based and publish-subscribe messaging patterns. In this article, we'll delve into the concepts of Azure Service Bus queues with and without sessions, highlighting their differences, use cases, and providing Python code examples from the azure-service-bus-python repository.

Azure Service Bus Queues Without Sessions

In a standard Azure Service Bus queue (without sessions), messages are enqueued and dequeued in a First-In-First-Out (FIFO) manner. Each message is independent, and the receiver processes them without any grouping context. This approach is suitable for scenarios where each message is a standalone unit of work.

Key Characteristics

  • Independent Messages: Each message is processed individually without any relation to other messages.
  • FIFO Ordering: Messages are received in the order they were sent.
  • Scalability: Multiple receivers can process messages concurrently, but each message is delivered to only one receiver for processing.

Python Example: Sending and Receiving Messages Without Sessions

Below are Python code snippets demonstrating how to send and receive messages using Azure Service Bus queues without sessions.

Sending Messages

from azure.servicebus import ServiceBusMessage, ServiceBusSender, ServiceBusClient
from dotenv import load_dotenv
import os

load_dotenv()

connection_string = os.environ["SERVICE_BUS_CONNECTION_STRING_NORMAL_QUEUE"]
queue_name = os.environ["QUEUE_NAME"]

client = ServiceBusClient.from_connection_string(
    conn_str=connection_string,
    logging_enable=True
)

def get_sender_client():
    return client.get_queue_sender(queue_name=queue_name)

def send_messages(message):
    sender_client = get_sender_client()
    with sender_client:
        message = ServiceBusMessage(message)
        sender_client.send_messages(message)
        print("Message sent successfully!")

for i in range(1, 11):
    # Creating 10 messages for sampling
    message = f"{i} - This is a message from me!"
    send_messages(message)

In this example, 10 messages are sent to the specified queue. The ServiceBusClient is used to create a sender for the queue, and the message is sent using the send_messages method.

ServiceBusClient

Receiving Messages

import time
from azure.servicebus import (
    ServiceBusClient,
    ServiceBusMessage,
    ServiceBusReceiver,
    ServiceBusMessageBatch
)
from dotenv import load_dotenv
import os

load_dotenv()

connection_string = os.environ["SERVICE_BUS_CONNECTION_STRING_NORMAL_QUEUE"]
queue_name = os.environ["QUEUE_NAME"]

client = ServiceBusClient.from_connection_string(
    conn_str=connection_string,
    logging_enable=True
)

receiver = client.get_queue_receiver(
    queue_name=queue_name,
    max_wait_time=5
)

while True:
    messages = receiver.receive_messages(
        max_message_count=1,
        max_wait_time=5
    )

    for msg in messages:
        print("Consumer 1")
        print(f"MessageId   : {msg.message_id}")
        print(f"SequenceNo  : {msg.sequence_number}")
        print("Received    : " + str(msg))
        receiver.complete_message(msg)
        print("Completed   : " + str(msg))

    time.sleep(5)

I added two consumers with same code but to see how randomly they are picking messages from the queue.

The result was indeed random and is in the screenshot below.

Result

This randomness works fine in the grand scheme of microservices, but there are instances when, maybe due to microservices, the expectation is that all the messages related to a particular unit of work should be picked and processed as together as possible so that the processing time is minimal. This is where the Azure Service Bus session comes into play. more about it in the next section of this article.

Azure Service Bus Queues With Sessions

Message sessions in Azure Service Bus enable joint and ordered handling of unbounded sequences of related messages. They provide a way to group related messages and ensure they are processed in a specific order. This is particularly useful for scenarios that require message grouping, such as workflows or transactions.

Key Characteristics

  • Related Messages: Messages with the same session ID are grouped together and processed sequentially.
  • FIFO per Session: Within a session, messages are received in the order they were sent.
  • Session State: Sessions can maintain a state, allowing receivers to store and retrieve session-specific information.

Python Example: Sending and Receiving Messages With Sessions

The following examples demonstrate how to send and receive messages using Azure Service Bus queues with sessions. These snippets are based on the azure-service-bus-python repository.

Sending Messages with Session ID

from dotenv import load_dotenv
from azure.servicebus import ServiceBusMessage, ServiceBusSender, ServiceBusClient
import os

load_dotenv()

connection_string = os.environ["SERVICE_BUS_CONNECTION_STRING_SESSION_QUEUE"]
queue_name = os.environ["QUEUE_NAME_WITH_SESSION_ENABLED"]

client = ServiceBusClient.from_connection_string(conn_str=connection_string)
sender = client.get_queue_sender(queue_name=queue_name)

customers = ["CustA", "CustB", "CustC", "CustD", "CustE", "CustF"]

# Session feature is only available in Azure starting from Standard Pricing Tier
# and above. It is not available in Basic Pricing Tier.
with sender:
    for customer in customers:
        for i in range(1, 4):
            message = ServiceBusMessage(
                body=f"Step {i} - This is a message from {customer}!",
                session_id=customer
            )
            sender.send_messages(message)
            print(f"Message sent successfully! {message}")

In this snippet, a message is sent with a session_id property. Messages with the same session ID will be grouped together in the queue.

Terminal

Receiving Messages from a Session

from dotenv import load_dotenv
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import NEXT_AVAILABLE_SESSION
from azure.servicebus.exceptions import ServiceBusError
import os
import asyncio

load_dotenv()

connection_string = os.environ["SERVICE_BUS_CONNECTION_STRING_SESSION_QUEUE"]
queue_name = os.environ["QUEUE_NAME_WITH_SESSION_ENABLED"]

async def process_chat(session):
    async with session:
        session_id = session.session_id
        print(f"📥 Started session: {session_id}")
        try:
            async for msg in session:
                print(f"[{session_id}] Received: {msg.body.decode()}")
                await session.complete_message(msg)
        except ServiceBusError as e:
            print(f"❌ Error in session {session_id}: {e}")
        finally:
            print(f"✅ Finished session: {session_id}")

async def main():
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=connection_string)

    async with servicebus_client:
        receiver = servicebus_client.get_queue_receiver(
            queue_name=queue_name,
            session_id=NEXT_AVAILABLE_SESSION
        )

        # Process multiple sessions concurrently
        async with receiver:
            await receiver.session.set_state("OPEN")
            messages = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
            for msg in messages:
                print(f"Messages: {msg}")
                await receiver.complete_message(msg)

            await receiver.session.set_state("CLOSED")

while True:
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting...")
        break
    except Exception as e:
        print(f"Error: {e}")

Here, the receiver specifies a session_id to receive messages from that particular session. This ensures that related messages are processed in the order they were sent.

Particular session

As you can see, the messages for each customer in this case are received in the same order for each customer.

Enabling Sessions on a Queue

To use sessions, the queue must have sessions enabled. This can be done through the Azure portal, Azure CLI, or Azure Resource Manager templates. It's important to note that sessions must be enabled at the time of queue creation and cannot be modified later.

Conclusion

Azure Service Bus queues offer flexible messaging patterns to accommodate various application needs. Standard queues without sessions are ideal for independent, unordered message processing, while session-enabled queues provide ordered, related message processing capabilities. By understanding these options and utilizing the appropriate pattern, developers can design robust and efficient messaging solutions. The azure-service-bus-python repository provides practical examples to help you get started with both approaches.

Up Next
    Ebook Download
    View all
    Learn
    View all