This article is a quickstart demo of how to send events to and receive events from Azure Event Hubs using Python. We will use two Python scripts, ‘send.py’ and ‘recv.py’, to send and receive text messages in this exercise.
I created an Azure VM with an Ubuntu image to test the send/receive Python scripts. Creating an Azure VM is inexpensive, takes only a few steps, and you can connect to the new machine within minutes. Don’t forget the steps required for connectivity, such as opening a port in the firewall. If you want to connect to the VM over a remote desktop connection, certain commands have to be run on the VM before you try to RDP; I lost a few minutes before finding this out.
Make sure the following prerequisites are in place on the VM before you start:
- Python 3.7 or above (the scripts below use asyncio with async/await, which Python 2.7 does not support)
- Python package for Event Hubs
- The blob checkpoint store package, which uses ‘Azure Blob Storage’ to store checkpoints while processing events from Azure Event Hubs
I installed pip and the Event Hubs package with the following commands from PuTTY; you can also run them in a terminal directly on the VM after connecting via RDP.
sudo apt install python3-pip
pip install azure-eventhub
After that, install the blob checkpoint store package with the below command:
pip install azure-eventhub-checkpointstoreblob-aio
Event Hub Namespace
The first step is to create an Event Hubs namespace, followed by an event hub, and get the credentials that your application needs to communicate with Event Hubs. If you want to learn how to create a namespace and event hub, please refer to my other post.
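Before pasting the credentials into the scripts, it can help to sanity-check the connection string copied from the portal. The following is a minimal sketch and not part of the Event Hubs SDK: the helper name `parse_connection_string` and the sample values are made up for illustration, and it assumes the usual `Key=Value;Key=Value` layout of a namespace-level connection string.

```python
def parse_connection_string(conn_str):
    """Split an Event Hubs style connection string into a dict of its parts.

    Hypothetical helper for illustration; it is not part of azure-eventhub.
    """
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the first '=' only: base64 keys often end in '=' padding.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError("malformed segment: {!r}".format(segment))
        parts[key] = value
    required = {"Endpoint", "SharedAccessKeyName", "SharedAccessKey"}
    missing = required - parts.keys()
    if missing:
        raise ValueError("connection string is missing: {}".format(sorted(missing)))
    return parts


# Placeholder values only; substitute the string copied from the portal.
sample = (
    "Endpoint=sb://my-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=abc123=="
)
info = parse_connection_string(sample)
print(info["Endpoint"])
```

A check like this catches a truncated copy-paste early, before the client fails with a less obvious authentication error.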
Sending the events using send.py
Open your favorite script editor for the Python scripts we are going to use. I used the ‘Atom’ editor, which was readily available to me. If you don’t have an editor and do not want to spend time downloading one, or if you have installation restrictions on your laptop, you can simply use ‘Notepad++’, which in my opinion serves our purpose here.
Save the below code as send.py:
import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(
        conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING",
        eventhub_name="EVENT HUB NAME")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()
        # Add events to the batch.
        event_data_batch.add(EventData('First event '))
        event_data_batch.add(EventData('Second event'))
        event_data_batch.add(EventData('Third event'))
        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

# asyncio.run creates the event loop, runs the coroutine, and closes the loop.
asyncio.run(run())
Figure: getting the Event Hubs namespace connection string and the event hub name
More detailed source code is available at GitHub at the following link.
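The batch in send.py holds only three small events, but a batch has a maximum size, and in the azure-eventhub package `add` raises `ValueError` when an event no longer fits. The sketch below shows one way to handle that by sending the full batch and starting a new one. The function name `send_in_batches` is hypothetical, and the `FakeProducer` and `FakeBatch` classes are stand-ins invented here so the example runs without an Azure connection; with the real SDK you would pass an `EventHubProducerClient` and `EventData` objects instead.

```python
import asyncio


async def send_in_batches(producer, events):
    """Send events, starting a new batch whenever the current one fills up.

    `producer` is expected to behave like EventHubProducerClient
    (create_batch / send_batch). Returns the number of batches sent.
    """
    batches_sent = 0
    batch = await producer.create_batch()
    for event in events:
        try:
            batch.add(event)
        except ValueError:
            # The batch is full: send it, then retry the event in a fresh batch.
            await producer.send_batch(batch)
            batches_sent += 1
            batch = await producer.create_batch()
            batch.add(event)
    if len(batch) > 0:
        # Send whatever is left in the last, partially filled batch.
        await producer.send_batch(batch)
        batches_sent += 1
    return batches_sent


class FakeBatch:
    """Stand-in for a batch that holds at most `capacity` events."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.items = []

    def add(self, event):
        if len(self.items) >= self.capacity:
            raise ValueError("batch is full")
        self.items.append(event)

    def __len__(self):
        return len(self.items)


class FakeProducer:
    """Stand-in for the producer client; records what would be sent."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.sent = []

    async def create_batch(self):
        return FakeBatch(self.capacity)

    async def send_batch(self, batch):
        self.sent.append(list(batch.items))


fake = FakeProducer(capacity=2)
count = asyncio.run(send_in_batches(fake, ["e1", "e2", "e3", "e4", "e5"]))
print(count, fake.sent)
```

Note that an event too large to fit even in an empty batch still raises `ValueError` on the retry, which is probably what you want to see rather than silently dropping it.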
Before creating the script to receive events, you have to create an ‘Azure Blob Storage’ blob container, which will be used as the checkpoint store. The checkpoint store keeps track of checkpoints, that is, the last position read in each partition.
Create blob container
To create a blob container, first create an Azure Storage account, then create the container inside it.
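Both names you pick here are subject to Azure naming rules, and a rejected name is a common stumbling block at this step. As a rough sketch, the checks below encode the documented rules: storage account names are 3 to 24 lowercase letters and digits, and container names are 3 to 63 characters of lowercase letters, digits, and single hyphens that neither start nor end the name. The function names are hypothetical and only serve the illustration.

```python
import re

# Lowercase letters and digits, with single hyphens allowed only between them.
_CONTAINER_PATTERN = re.compile(r"[a-z0-9]+(-[a-z0-9]+)*")
# Lowercase letters and digits only, 3 to 24 characters.
_ACCOUNT_PATTERN = re.compile(r"[a-z0-9]{3,24}")


def is_valid_container_name(name):
    """Return True if `name` satisfies the blob container naming rules."""
    return 3 <= len(name) <= 63 and _CONTAINER_PATTERN.fullmatch(name) is not None


def is_valid_storage_account_name(name):
    """Return True if `name` satisfies the storage account naming rules."""
    return _ACCOUNT_PATTERN.fullmatch(name) is not None


print(is_valid_container_name("eventhub-checkpoints"))
print(is_valid_container_name("Checkpoints"))
print(is_valid_storage_account_name("my-storage"))
```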
Receiving the events using recv.py
As with send.py, copy and paste the below script and save it as recv.py in your preferred location.
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(
        event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        "AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        "EVENT HUBS NAMESPACE CONNECTION STRING",
        consumer_group="$Default",
        eventhub_name="EVENT HUB NAME",
        checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")

if __name__ == '__main__':
    # Run the main method.
    asyncio.run(main())
Figure: getting the storage account connection string
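recv.py updates the checkpoint after every single event, which means one write to blob storage per event. A common variation is to checkpoint only every N events per partition. The sketch below illustrates that idea under stated assumptions: `make_on_event` and `checkpoint_every` are names invented here, and `FakeEvent` and `FakePartitionContext` are stand-ins so the example runs without Azure. The trade-off is that after a restart, events received since the last checkpoint are read again.

```python
import asyncio
from collections import defaultdict


def make_on_event(checkpoint_every=10):
    """Build an on_event callback that checkpoints once per N events per partition."""
    seen = defaultdict(int)  # partition_id -> events seen since the last checkpoint

    async def on_event(partition_context, event):
        print('Received the event: "{}" from the partition with ID: "{}"'.format(
            event.body_as_str(encoding="UTF-8"), partition_context.partition_id))
        seen[partition_context.partition_id] += 1
        if seen[partition_context.partition_id] >= checkpoint_every:
            await partition_context.update_checkpoint(event)
            seen[partition_context.partition_id] = 0

    return on_event


class FakeEvent:
    """Stand-in for a received event."""

    def __init__(self, text):
        self.text = text

    def body_as_str(self, encoding="UTF-8"):
        return self.text


class FakePartitionContext:
    """Stand-in for the partition context; records checkpointed events."""

    def __init__(self, partition_id):
        self.partition_id = partition_id
        self.checkpoints = []

    async def update_checkpoint(self, event):
        self.checkpoints.append(event.text)


async def dry_run():
    context = FakePartitionContext("0")
    handler = make_on_event(checkpoint_every=3)
    for i in range(7):
        await handler(context, FakeEvent("event {}".format(i)))
    return context.checkpoints


checkpoints = asyncio.run(dry_run())
print(checkpoints)
```

With the real client, the callback would be passed as `on_event=make_on_event(checkpoint_every=10)` in the `client.receive(...)` call.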
Run the receiving script
Open a terminal that has Python in its path and run recv.py. You won't see any output at first, because we have not run send.py yet.
python3 recv.py
Run the send script
In another terminal window, run the below script to start the message flow:
python3 send.py
Once the send script started sending messages, the terminal running recv.py printed an acknowledgment for each message received, along with the ID of the partition it was stored in.
Figure: output of recv.py
The Overview page, available in the left navigation pane of the Azure portal, shows graphs of the number of messages, the number of requests, and the throughput.
If you need deeper insights, use the ‘Monitoring’ tab in the same pane, which offers many parameters you can select as metrics. This is one of the best options if you want a complete picture of how your events are doing, and the dashboard can be customized to show hours or days of data, depending on your need.
Azure Monitor metrics data is available for 90 days. However, when creating charts only 30 days can be visualized. For example, if you want to visualize a 90 day period, you must break it into three charts of 30 days within the 90 day period.
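The date arithmetic for breaking a longer period into chart-sized windows can be sketched as follows. This is only an illustration of the splitting described above: the function name `split_into_windows` and the example dates are made up, and nothing here calls the Azure Monitor API.

```python
from datetime import datetime, timedelta


def split_into_windows(start, end, max_days=30):
    """Split [start, end) into consecutive windows of at most `max_days` days."""
    if end <= start:
        raise ValueError("end must be after start")
    step = timedelta(days=max_days)
    windows = []
    cursor = start
    while cursor < end:
        # The last window is cut short if the period is not a multiple of max_days.
        window_end = min(cursor + step, end)
        windows.append((cursor, window_end))
        cursor = window_end
    return windows


# Example dates chosen arbitrarily: a 90 day period ending on 1 April 2021.
end = datetime(2021, 4, 1)
start = end - timedelta(days=90)
windows = split_into_windows(start, end)
for window_start, window_end in windows:
    print(window_start.date(), "to", window_end.date())
```

Each resulting pair can then be used as the time range of one chart.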
Figure: 1k messages logged
We can see the checkpoints for the events being written to the container inside the storage account.
Summary
In this article, we saw how to use Python to send and receive messages with Azure Event Hubs. There are other ways to send and receive events with Azure Event Hubs; I chose Python because it is simple and easy to understand.
References
The Python code has been taken from the Microsoft documentation at https://docs.microsoft.com/en-us/azure/event-hubs. You can also refer to the following Microsoft GitHub code repo, which has all the samples in case you are short on time and just want to try them before implementing your own.