Azure Event Hub is a big data streaming platform and event ingestion service. An event hub can receive streaming data from one or more sources (event producers), and that data can be stored and processed by one or more consumers. With the Capture feature, event data can be saved to Azure Blob Storage or Azure Data Lake Storage for long-term retention and later processing. Event Hubs also integrates with Azure Stream Analytics and Azure Functions.
Some of the important terms that you need to understand before starting an Azure Event Hub implementation include:
Event producer
Producers are responsible for generating event data and sending it to the event hub. One or more producers can send data to the same event hub.
- Event receivers/consumers: Consumers listen for event data on a given event hub. One or more consumers can read either a single partition or all of the event data from the event hub.
- Consumer group: A consumer group is an independent view of the whole event hub, and each group tracks its own read position, so consumers can be organized by business function and read the same events at their own pace. Consumer clients must specify the consumer group when reading from the event hub. Every event hub has a default group named $Default, which this demo uses.
Partition
Because data arrives at the event hub as a stream, the event hub divides it into partitions. A consumer can read a single partition by its partition ID, or it can read events from all partitions. The partition count is closely related to the number of consumers: within a consumer group, each partition is typically read by one active consumer, so the partition count caps how many consumers can process events in parallel.
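To make the relationship between partitions and consumers concrete, here is a hypothetical sketch that spreads a hub's partitions round-robin across the readers of one consumer group. It is not the SDK's real load-balancing algorithm (EventProcessorClient uses its own ownership logic); the names PartitionAssignment and Assign are made up for illustration. It shows why adding more readers than partitions does not add parallelism: the extra readers receive nothing to read.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only: real Event Hubs clients (for example
// EventProcessorClient) use their own load-balancing and ownership logic.
public static class PartitionAssignment
{
    // Distributes partition IDs "0".."partitionCount-1" round-robin across readers.
    public static Dictionary<string, List<string>> Assign(int partitionCount, IList<string> readers)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        if (readers == null || readers.Count == 0)
            throw new ArgumentException("At least one reader is required.", nameof(readers));

        // Every reader starts with an empty list so idle readers are visible
        var assignment = new Dictionary<string, List<string>>();
        foreach (var reader in readers)
        {
            assignment[reader] = new List<string>();
        }

        for (int partition = 0; partition < partitionCount; partition++)
        {
            // Partition i goes to reader (i mod readerCount)
            string owner = readers[partition % readers.Count];
            assignment[owner].Add(partition.ToString());
        }
        return assignment;
    }
}
```

For example, Assign(2, new[] { "A", "B", "C" }) gives reader A partition 0, reader B partition 1, and leaves reader C idle.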
Partition Key
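A partition key is hashed by the service to decide which partition an event is stored in, so all events sent with the same partition key land in the same partition.
More details can be found in the official Microsoft Azure documentation.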
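In the SDK, a partition key is set through the PartitionKey property of SendEventOptions or CreateBatchOptions instead of PartitionId. The hash function the service uses is internal and is not reproduced here; the sketch below uses FNV-1a purely as a stand-in to illustrate the property that matters: the same key always maps to the same partition, so events for one key (for example one user) stay in order relative to each other. PartitionKeyMapper and its methods are hypothetical names.

```csharp
using System;
using System.Text;

// Illustration only: FNV-1a is a stand-in, NOT the hash Event Hubs uses internally.
public static class PartitionKeyMapper
{
    // 32-bit FNV-1a over the UTF-8 bytes of the key. Unlike string.GetHashCode(),
    // which is randomized per process in .NET Core, this is stable across runs.
    public static uint Fnv1a(string key)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;
        uint hash = offsetBasis;
        foreach (byte b in Encoding.UTF8.GetBytes(key))
        {
            hash ^= b;
            hash = unchecked(hash * prime);
        }
        return hash;
    }

    // Maps a partition key to a partition ID in the range "0".."partitionCount-1".
    public static string ToPartitionId(string key, int partitionCount)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        return (Fnv1a(key) % (uint)partitionCount).ToString();
    }
}
```

Calling ToPartitionId("user-42", 2) twice always returns the same ID, which is the guarantee a partition key provides.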
Where to use
- Application monitoring.
- Anomaly detection.
- Capturing and analyzing streaming data.
Some real-world applications of Azure Event Hub
Reading sensor data
Suppose an automatic traffic system has been installed in a city. It records traffic data in real time with its cameras and sensors. Normally the system works on a schedule, but in an abnormal situation such as a traffic jam or a natural disaster, it should automatically redirect road traffic to other routes and alert the traffic police. Because sensors all over the city (multiple event sources) generate huge volumes of data at the same time (perhaps 10 million events or more per second), a conventional system would not be able to capture all the events.
Here Azure Event Hub can ingest all the streaming data coming from the traffic signal centers (city squares) and pass it on for storage and analysis, and the automatic traffic system makes its decisions based on the results. In other words, the sensors capture the data in real time, send it to the Azure event hub, and the downstream analysis tells the system what to do.
E-commerce application that captures user data to boost the business
An e-commerce application captures user data such as user ID, mobile number, device identification, location, the items the user has searched for, and user preferences. Based on this data, product promotions can be shown in the user's notification area and in advertisement slots on social media sites. Because users access the application from many devices (mobile, desktop, laptop, tablet, and so on), it must capture all of this user data in real time. Again, Azure Event Hub can capture all the event data and store it or pass it to other systems for analysis, whose results decide which product promotions are sent to each user's devices.
Nowadays, applications are expected not only to execute business logic correctly (like purchase requests) but also to help grow the business, for example by providing information from which the application can predict and suggest the items a user is likely to buy on the e-commerce platform.
Here is what we are going to learn:
Implementing Azure Event Hub in a .NET Core console application.
We will use an e-commerce application as the example in this demonstration.
How to create an Event Hub in Azure
Before creating an event hub, you first need an Event Hubs namespace.
Log in to the Azure portal and navigate to Event Hubs.
Click the Add button to create a namespace.
Select the Basic pricing tier used in this demo; the rest of the configuration can be left at the default settings. Then review and create the namespace.
Once the namespace has been created, we can create the event hub. Navigate to the newly created Event Hubs namespace.
Click + Event Hub and fill in the mandatory fields. Because this demo uses the Basic tier for the namespace, some options are disabled.
The Capture option cannot be configured because it is not available in the Basic tier. Partition count is an important property because it determines how many consumers in a consumer group can usefully read in parallel. In this demo, two consumers listen to the event hub, so the partition count is set to 2.
After creating the namespace and the event hub, we need shared access policies so that our console application can access the event hub instance. A policy's keys are used to generate Shared Access Signature (SAS) tokens.
A shared access policy can grant three claims: Manage, Send, and Listen. One policy can grant several claims (selecting Manage also grants Send and Listen), but I suggest creating policies based on responsibilities so that each application receives only the rights it needs.
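The SDK derives SAS tokens from the connection string automatically, so the demo never builds one by hand. For readers curious about what a policy's key is used for, the sketch below follows the token format described in Microsoft's SAS documentation for Event Hubs and Service Bus: an HMAC-SHA256 signature over the URL-encoded resource URI and an expiry time. SasTokenBuilder is a hypothetical name, and the namespace, policy name, and key used with it should be treated as placeholders.

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
using System.Web;

public static class SasTokenBuilder
{
    // Builds "SharedAccessSignature sr=...&sig=...&se=...&skn=..." for a resource URI,
    // for example "https://<namespace>.servicebus.windows.net/<event-hub>".
    public static string Create(string resourceUri, string keyName, string key, DateTimeOffset expiresAt)
    {
        // Expiry is expressed in seconds since the Unix epoch
        string expiry = expiresAt.ToUnixTimeSeconds().ToString(CultureInfo.InvariantCulture);
        string encodedUri = HttpUtility.UrlEncode(resourceUri);

        // The string to sign is the encoded URI and the expiry separated by a newline
        string stringToSign = encodedUri + "\n" + expiry;
        using (var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key)))
        {
            string signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
            return string.Format(CultureInfo.InvariantCulture,
                "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
                encodedUri, HttpUtility.UrlEncode(signature), expiry, keyName);
        }
    }
}
```

The policy name goes into skn in clear text, while the key itself never leaves the client; only the signature derived from it does.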
SAS claims
For this demo, two policies were created: one for producers with the Send claim and one for consumers with the Listen claim. Policies can be defined at the event hub (instance) level or at the namespace level; in this demo, they are defined at the namespace level.
In this example, I cover only the event producer and the event consumers. I used .NET Core 3.1 with Visual Studio 2019, although it is always good practice to use the latest versions.
We are going to implement an e-commerce application that collects user data from user devices and sends it to an event hub. The user data contains user info, device info, and the item the user is searching for in the e-commerce application and may buy in the near future. Two consumers listen to the event hub and consume the event data, which can then be passed to an analytics application to generate business leads.
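Each policy exposes a connection string in the portal. A namespace-level policy's connection string has the form Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy>;SharedAccessKey=<key>, while a policy created on the event hub itself also carries ;EntityPath=<event-hub-name>. The SDK parses this for you (and the EventHubProducerClient(connectionString, eventHubName) constructor used in this demo supplies the hub name separately); the hypothetical ConnectionStringInfo helper below only shows how the pieces fit together.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; the Event Hubs SDK does its own parsing.
public static class ConnectionStringInfo
{
    // Splits "Key1=Value1;Key2=Value2;..." into a case-insensitive dictionary.
    public static Dictionary<string, string> Parse(string connectionString)
    {
        var parts = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        foreach (string segment in connectionString.Split(';', StringSplitOptions.RemoveEmptyEntries))
        {
            // Split on the first '=' only, because base64 keys may end with '=' padding
            int separator = segment.IndexOf('=');
            if (separator <= 0) throw new FormatException($"Invalid segment: '{segment}'");
            parts[segment.Substring(0, separator).Trim()] = segment.Substring(separator + 1).Trim();
        }
        return parts;
    }

    // True when the policy was defined on the event hub (EntityPath present)
    // rather than on the namespace.
    public static bool IsHubLevel(string connectionString) =>
        Parse(connectionString).ContainsKey("EntityPath");
}
```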
There are 3 parts to the console application.
Entry point or main method
Code from Program.cs
using System;
using System.Threading;
using System.Threading.Tasks;

namespace AzureEventHubMutliProducerConsumer {
    class Program {
        static void Main(string[] args) {
            Program program = new Program();
            Task[] tasks = new Task[3];
            tasks[0] = Task.Run(() => {
                Thread.Sleep(1000);
                // Run the producer
                program.RunProducer();
            });
            tasks[1] = Task.Run(() => {
                Thread.Sleep(1000);
                // Run the consumer that reads all partitions
                program.RunEventHubConsumerReadEvent();
            });
            tasks[2] = Task.Run(() => {
                Thread.Sleep(1000);
                // Run the consumer that reads partition "1" only
                program.RunEventHubConsumerReadEventPartitionEvent();
            });
            Task.WaitAll(tasks);
            Console.WriteLine("Press any key to end the program");
            Console.ReadKey();
        }

        public void RunProducer() {
            // Create the producer client and send one event per device
            EventProducer eventProducer = new EventProducer();
            eventProducer.Init();
            eventProducer.GenerateEvent().Wait();
        }

        public void RunEventHubConsumerReadEvent() {
            // Read events from all partitions using the default consumer group
            EventHubConsumerClientDemo eventHubConsumer = new EventHubConsumerClientDemo();
            eventHubConsumer.ConsumerReadEvent("$Default").Wait();
        }

        public void RunEventHubConsumerReadEventPartitionEvent() {
            // Read events from partition "1" using the default consumer group
            EventHubConsumerClientDemo eventHubConsumer = new EventHubConsumerClientDemo();
            eventHubConsumer.ConsumerReadEventPartitionEvent("$Default", "1").Wait();
        }
    }
}
The console program starts three tasks so that the event producer and the two consumers run in parallel.
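Because each task calls .Wait() on asynchronous work, three thread-pool threads stay blocked for the lifetime of the demo. A possible alternative, assuming the project uses C# 7.1 or later (the .NET Core 3.1 default, C# 8, qualifies), is an async Task Main that starts the three operations and awaits them together with Task.WhenAll. ParallelRunner and ParallelRunnerDemo are hypothetical names, and the delayed jobs are stand-ins for GenerateEvent(), ConsumerReadEvent("$Default"), and ConsumerReadEventPartitionEvent("$Default", "1").

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelRunner
{
    // Starts every job without blocking a thread and completes when all have finished.
    // If any job fails, awaiting the returned task rethrows its exception.
    public static Task RunAllAsync(params Func<Task>[] jobs)
    {
        return Task.WhenAll(jobs.Select(job => job()));
    }
}

public static class ParallelRunnerDemo
{
    // Hypothetical stand-ins: each job just waits and records its name.
    public static async Task<string[]> RunStandInsAsync()
    {
        var finished = new ConcurrentQueue<string>();

        async Task Job(string name, int delayMs)
        {
            await Task.Delay(delayMs); // stands in for sending or reading events
            finished.Enqueue(name);
        }

        await ParallelRunner.RunAllAsync(
            () => Job("producer", 50),
            () => Job("consumer-all", 10),
            () => Job("consumer-partition-1", 30));
        return finished.ToArray();
    }
}
```

In Program.cs, the same idea would become static async Task Main() awaiting ParallelRunner.RunAllAsync(() => eventProducer.GenerateEvent(), () => consumer.ConsumerReadEvent("$Default"), () => consumer.ConsumerReadEventPartitionEvent("$Default", "1")), with no Thread.Sleep or .Wait() calls.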
Event Producer
Code for EventProducer.cs
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

namespace AzureEventHubMutliProducerConsumer {
    public class EventProducer {
        // Connection string of the Send policy (Event Hubs namespace -> Shared access policies)
        string connectionString = "----- Connection string of the Send policy -----";
        string eventHubName = "-----Event hub name--";
        List<string> device = new List<string>();
        EventHubProducerClient producerClient;

        public void Init() {
            producerClient = new EventHubProducerClient(connectionString, eventHubName);
            device.Add("Mobile");
            device.Add("Laptop");
            device.Add("Desktop");
            device.Add("Tablet");
        }

        public async Task GenerateEvent() {
            try {
                // Send one batch per device, alternating between partitions 0 and 1
                int partitionId = 0;
                foreach (var eachDevice in device) {
                    var batchOptions = new CreateBatchOptions() {
                        PartitionId = partitionId.ToString()
                    };
                    // The batch is bound to the partition given in batchOptions
                    using (EventDataBatch generateData = await producerClient.CreateBatchAsync(batchOptions)) {
                        string body = string.Format("Search triggered for iPhone 21 from device {0} ", eachDevice);
                        var eveData = new EventData(Encoding.UTF8.GetBytes(body));
                        // All values should be dynamic in a real application
                        eveData.Properties.Add("UserId", "UserId");
                        eveData.Properties.Add("Location", "North India");
                        eveData.Properties.Add("DeviceType", eachDevice);
                        // TryAdd returns false if the event does not fit in the batch
                        if (!generateData.TryAdd(eveData)) {
                            throw new Exception("Event is too large for the batch");
                        }
                        await producerClient.SendAsync(generateData);
                    }
                    // The event hub has 2 partitions, so the ID alternates between 0 and 1
                    partitionId++;
                    if (partitionId > 1) partitionId = 0;
                }
            } catch (Exception exp) {
                Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
            } finally {
                // Release the connection held by the producer client
                if (producerClient != null) await producerClient.CloseAsync();
            }
        }
    }
}
The Init() method initializes the EventHubProducerClient with the connection string and the event hub name. It also adds four devices to the device list. In the real world, there might be millions of devices and users on an e-commerce platform at the same time; the device list is a simple way to simulate that scenario when generating events.
The GenerateEvent() method creates event data with some user data for each device. The partition ID alternates between 0 and 1, and consumers can use it to read events from a specific partition. Events can be sent individually or in batches; here each batch holds a single event and is bound to a partition ID.
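The demo sends one event per batch. With many devices, it is usually more efficient to pack several events into one EventDataBatch, send it when TryAdd returns false (the batch is full), and start a new batch. The sketch below models that loop without the SDK: the hypothetical SimulatedBatch class stands in for EventDataBatch, and the byte limit passed to it is an arbitrary number for the example, not the service's real maximum batch size.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for EventDataBatch: accepts events until a size limit is reached.
public sealed class SimulatedBatch
{
    private readonly int maxBytes;
    private int sizeInBytes;
    public List<string> Events { get; } = new List<string>();

    public SimulatedBatch(int maxBytes) { this.maxBytes = maxBytes; }

    // Mirrors the TryAdd pattern: returns false instead of throwing when the event does not fit.
    public bool TryAdd(string body)
    {
        int size = Encoding.UTF8.GetByteCount(body);
        if (sizeInBytes + size > maxBytes) return false;
        Events.Add(body);
        sizeInBytes += size;
        return true;
    }
}

public static class BatchPacker
{
    // Packs bodies into as few batches as possible, preserving their order.
    public static List<SimulatedBatch> Pack(IEnumerable<string> bodies, int maxBytes)
    {
        var sent = new List<SimulatedBatch>();
        var current = new SimulatedBatch(maxBytes);
        foreach (string body in bodies)
        {
            if (current.TryAdd(body)) continue;

            // An empty batch that rejects the event means the event can never fit
            if (current.Events.Count == 0)
                throw new InvalidOperationException("A single event exceeds the batch size limit.");

            // The current batch is full: "send" it and retry in a fresh batch
            sent.Add(current);
            current = new SimulatedBatch(maxBytes);
            if (!current.TryAdd(body))
                throw new InvalidOperationException("A single event exceeds the batch size limit.");
        }
        if (current.Events.Count > 0) sent.Add(current);
        return sent;
    }
}
```

With the real SDK, "send it" corresponds to await producerClient.SendAsync(batch), and starting a fresh batch to await producerClient.CreateBatchAsync(batchOptions).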
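GenerateEvent() hard-codes the wrap-around with if (partitionId > 1). Here is a small sketch of the same round-robin idea driven by the list of partition IDs instead, so it keeps working if the hub is created with more partitions. The helper AssignRoundRobin is hypothetical; in a real application the IDs could be read at runtime with EventHubProducerClient.GetPartitionIdsAsync().

```csharp
using System;
using System.Collections.Generic;

public static class RoundRobin
{
    // Pairs each device with a partition ID, cycling through the IDs in order.
    // partitionIds could come from EventHubProducerClient.GetPartitionIdsAsync().
    public static List<KeyValuePair<string, string>> AssignRoundRobin(
        IList<string> devices, IList<string> partitionIds)
    {
        if (partitionIds == null || partitionIds.Count == 0)
            throw new ArgumentException("At least one partition ID is required.", nameof(partitionIds));

        var result = new List<KeyValuePair<string, string>>();
        for (int i = 0; i < devices.Count; i++)
        {
            // i % Count wraps around automatically, replacing "if (partitionId > 1) partitionId = 0"
            result.Add(new KeyValuePair<string, string>(devices[i], partitionIds[i % partitionIds.Count]));
        }
        return result;
    }
}
```

With the demo's four devices and partitions "0" and "1", this yields Mobile → 0, Laptop → 1, Desktop → 0, Tablet → 1, the same sequence the original loop produces.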
Event Consumer
Code for EventHubConsumerClientDemo.cs
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AzureEventHubMutliProducerConsumer {
    public class EventHubConsumerClientDemo {
        // Connection string of the Listen policy (Event Hubs namespace -> Shared access policies)
        string connectionString = "----- Connection string of the Listen policy -----";
        string eventHubName = "-----Event hub name--";

        // Read events from all partitions. No blob container is needed; the consumer processes events directly
        public async Task ConsumerReadEvent(string consumerGroup) {
            try {
                // Stop reading automatically after 30 seconds
                using var cancellationSource = new CancellationTokenSource();
                cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
                await using var eventConsumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
                await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsAsync(cancellationSource.Token)) {
                    Console.WriteLine("---Execution from ConsumerReadEvent method---");
                    Console.WriteLine("------");
                    PrintEvent(partitionEvent);
                }
            } catch (OperationCanceledException) {
                // Expected: the 30-second cancellation token fired
                Console.WriteLine("ConsumerReadEvent end");
            } catch (Exception exp) {
                Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
            }
        }

        // Read events from a single partition, starting with events enqueued after the reader connects
        public async Task ConsumerReadEventPartitionEvent(string consumerGroup, string partitionId) {
            try {
                using var cancellationSource = new CancellationTokenSource();
                cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
                await using var eventConsumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
                // Emit an empty event (Data == null) if nothing arrives within 30 seconds
                ReadEventOptions readEventOptions = new ReadEventOptions() {
                    MaximumWaitTime = TimeSpan.FromSeconds(30)
                };
                await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest, readEventOptions, cancellationSource.Token)) {
                    Console.WriteLine("---Execution from ConsumerReadEventPartitionEvent method---");
                    Console.WriteLine("------");
                    PrintEvent(partitionEvent);
                }
            } catch (OperationCanceledException) {
                // Expected: the 30-second cancellation token fired
                Console.WriteLine("ConsumerReadEventPartitionEvent end");
            } catch (Exception exp) {
                Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
            }
        }

        // Print the event body and its application properties; skip empty events
        static void PrintEvent(PartitionEvent partitionEvent) {
            if (partitionEvent.Data == null) return;
            Console.WriteLine("Event Data received from partition {0}: {1}",
                partitionEvent.Partition.PartitionId,
                Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray()));
            foreach (var keyValue in partitionEvent.Data.Properties) {
                Console.WriteLine("Event data key = {0}, Event data value = {1}", keyValue.Key, keyValue.Value);
            }
        }
    }
}
Event consumer with ReadEventsAsync method
The ConsumerReadEvent() method takes the consumer group name as input and connects to the event hub using the connection string and the event hub name. A CancellationTokenSource stops the read automatically once the given time period (30 seconds) has elapsed. EventHubConsumerClient is used to read from the event hub, and its ReadEventsAsync method reads events from all partitions, starting with the earliest event retained in each partition.
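The timeout comes from CancellationTokenSource.CancelAfter combined with await foreach. The SDK-free sketch below reproduces that behavior: the hypothetical ReadFakeEventsAsync iterator yields a few events and then waits indefinitely, much as a live stream does when no new events arrive, until the token is cancelled. The resulting OperationCanceledException (TaskCanceledException derives from it) is the expected way out of the loop, so it is caught separately from real errors.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical event source: yields the given events, then waits "forever"
    // for more, the way a live stream does when nothing new arrives.
    public static async IAsyncEnumerable<string> ReadFakeEventsAsync(
        IEnumerable<string> events,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        foreach (string e in events)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return e;
        }
        // Throws TaskCanceledException once the token is cancelled
        await Task.Delay(Timeout.Infinite, cancellationToken);
    }

    // Reads until the timeout fires; returns how many events were processed.
    public static async Task<int> ConsumeWithTimeoutAsync(IEnumerable<string> events, TimeSpan timeout)
    {
        int count = 0;
        using (var cancellationSource = new CancellationTokenSource())
        {
            cancellationSource.CancelAfter(timeout);
            try
            {
                await foreach (string e in ReadFakeEventsAsync(events, cancellationSource.Token))
                {
                    Console.WriteLine("Event Data received {0}", e);
                    count++;
                }
            }
            catch (OperationCanceledException)
            {
                // Expected: the timeout elapsed, so stop reading quietly
                Console.WriteLine("Timeout reached, stopped reading");
            }
        }
        return count;
    }
}
```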
await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsAsync(cancellationSource.Token))
The rest of the method processes the event data and its properties and displays them in the console. After the available events have been processed, the loop keeps waiting for new ones until the cancellation token fires, 30 seconds after it was created. ReadEventsAsync then throws a TaskCanceledException, which the method's catch block handles, so the method ends without terminating the application.
Event consumer with ReadEventsFromPartitionAsync method
This method is similar to ConsumerReadEvent; the main differences are its parameters and the SDK method it calls, which reads event data from a single partition. The event hub was created with a partition count of 2, so the partition ID can only be 0 or 1. Only events from the partition whose ID is passed in are read; events in the other partition are ignored by this consumer. This method also passes ReadEventOptions with a MaximumWaitTime of 30 seconds: if no event arrives within that time, the loop receives an empty PartitionEvent whose Data is null, which is why the code checks for null before printing.
await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest, readEventOptions, cancellationSource.Token))
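EventPosition can be set to Latest (only events enqueued after the reader connects), Earliest (the first event still retained in the partition), or a specific point with FromOffset, FromSequenceNumber, or FromEnqueuedTime.
In summary, we run two consumer clients in parallel: one reads event data from all partitions, and the other reads only the event data in the partition whose ID it is given.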
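To illustrate what each starting position selects, here is a hypothetical in-memory model of one partition as an ordered log of sequence numbers. It mirrors the documented meaning of the SDK's EventPosition options (Earliest, Latest, and FromSequenceNumber with its isInclusive flag) but does not call the SDK; StartPosition, StartKind, and PartitionLogModel are names made up for this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of EventPosition semantics for a single partition.
public enum StartKind { Earliest, Latest, FromSequenceNumber }

public readonly struct StartPosition
{
    public StartKind Kind { get; }
    public long SequenceNumber { get; }
    public bool IsInclusive { get; }

    private StartPosition(StartKind kind, long sequenceNumber, bool isInclusive)
    {
        Kind = kind;
        SequenceNumber = sequenceNumber;
        IsInclusive = isInclusive;
    }

    public static StartPosition Earliest => new StartPosition(StartKind.Earliest, 0, true);
    public static StartPosition Latest => new StartPosition(StartKind.Latest, 0, false);
    public static StartPosition FromSequenceNumber(long sequenceNumber, bool isInclusive = true) =>
        new StartPosition(StartKind.FromSequenceNumber, sequenceNumber, isInclusive);
}

public static class PartitionLogModel
{
    // Returns the already-retained events a reader starting at 'position' would see.
    // A Latest reader only sees events enqueued after it connects, so none of these.
    public static List<long> ReadFrom(IReadOnlyList<long> retainedSequenceNumbers, StartPosition position)
    {
        switch (position.Kind)
        {
            case StartKind.Earliest:
                return retainedSequenceNumbers.ToList();
            case StartKind.Latest:
                return new List<long>();
            case StartKind.FromSequenceNumber:
                return retainedSequenceNumbers
                    .Where(n => position.IsInclusive ? n >= position.SequenceNumber : n > position.SequenceNumber)
                    .ToList();
            default:
                throw new ArgumentOutOfRangeException(nameof(position));
        }
    }
}
```

The model also shows why the partition consumer in this demo, which starts at EventPosition.Latest, can miss events that the producer sends before the consumer has connected; starting at EventPosition.Earliest would replay the retained events instead.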
How to run applications with this code
Option 1. Prerequisites: Visual Studio 2019 and .NET Core 3.1. Update the connection strings (from your shared access policies) and the event hub name in the producer and consumer classes.
- Open Visual Studio and click New -> Project.
- Select .NET Core in the left pane and choose the Console App (.NET Core) template.
- Specify the path where the solution will be created and click OK.
- A new console app project will be created with the default Program.cs file.
- In the NuGet Package Manager, search for Azure.Messaging.EventHubs.Processor and install it in the project (it brings in Azure.Messaging.EventHubs, the package the demo code actually uses).
- Add a new .cs file to the project and name it EventProducer.cs.
- Copy the content of the section “Code for EventProducer.cs” and paste it into the EventProducer.cs file.
- Add a new .cs file to the project and name it EventHubConsumerClientDemo.cs.
- Copy the content of the section “Code for EventHubConsumerClientDemo.cs” and paste it into the EventHubConsumerClientDemo.cs file.
- Copy the content of the main entry point (only the C# code) and paste it into the Program.cs file.
Build the solution and run the application.
Option 2. Download the zip file from here and unzip it in your local system. Open the solution file, build the project, and run the application.
Thanks, and happy coding!