In this article, we will cover the following topics:
- Understanding Producer Application
- Understanding Consumer Application
- Running Kafka Locally
- Producing and Consuming the message
So, let’s get started.
Understanding Producer Application
To demonstrate the producer application, we created a .NET Core Web API project and kept it simple: a Users controller exposes a single endpoint that produces a message to the RegisterUser topic in Kafka.
Also, as mentioned in the previous article, we created a .NET Core class library project that centralizes the Kafka-related code. This project can be packaged independently as a NuGet package and installed in every service that interacts with the Kafka brokers.
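The controller and consumer shown in this article depend on a few abstractions from that shared library. Their real definitions are in the previous article and the attached source, so the following is only a minimal sketch of the shapes implied by how they are called here. The member signatures, the `IDisposable` base, and the `InMemoryProducer` class are assumptions made for illustration, not the library's actual code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Assumed shape: produces one keyed message to a named topic.
public interface IKafkaProducer<TKey, TValue>
{
    Task ProduceAsync(string topic, TKey key, TValue value);
}

// Assumed shape: processes one consumed message.
public interface IKafkaHandler<TKey, TValue>
{
    Task HandleAsync(TKey key, TValue value);
}

// Assumed shape: consumes a topic until cancellation is requested.
public interface IKafkaConsumer<TKey, TValue> : IDisposable
{
    Task Consume(string topic, CancellationToken stoppingToken);
    void Close();
}

// Hypothetical in-memory producer that records messages instead of
// sending them to a broker. Useful only for tests and local experiments.
public sealed class InMemoryProducer<TKey, TValue> : IKafkaProducer<TKey, TValue>
{
    public List<(string Topic, TKey Key, TValue Value)> Sent { get; } =
        new List<(string Topic, TKey Key, TValue Value)>();

    public Task ProduceAsync(string topic, TKey key, TValue value)
    {
        Sent.Add((topic, key, value));
        return Task.CompletedTask;
    }
}
```

A fake such as `InMemoryProducer` can be passed to a constructor that expects `IKafkaProducer` in a unit test, so the calling code can be exercised without a running broker.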
Now, let’s look at the UsersController code. On receiving a request to register a user, we pass the request data as a message to the RegisterUser topic, where some other service can consume it and do the actual work.
    using System.Threading.Tasks;
    using Kafka.Constants;
    using Kafka.Interfaces;
    using Kafka.Messages.UserRegistration;
    using Microsoft.AspNetCore.Http;
    using Microsoft.AspNetCore.Mvc;
    using Swashbuckle.AspNetCore.Annotations;

    namespace Kafka_ProducerApplication.Controllers
    {
        [ApiController]
        [Route("api/[controller]")]
        public class UsersController : ControllerBase
        {
            // Producer from the shared Kafka library, supplied through the constructor.
            private readonly IKafkaProducer<string, RegisterUser> _kafkaProducer;

            public UsersController(IKafkaProducer<string, RegisterUser> kafkaProducer)
            {
                _kafkaProducer = kafkaProducer;
            }

            [HttpPost]
            [Route("Register")]
            [ProducesResponseType(StatusCodes.Status200OK)]
            [SwaggerOperation("Register User", "This endpoint can be used to register a user, but for the demo it produces a dummy message in a Kafka topic")]
            public async Task<IActionResult> ProduceMessage(RegisterUser request)
            {
                // Publish the request to the RegisterUser topic; no message key is supplied.
                await _kafkaProducer.ProduceAsync(KafkaTopics.RegisterUser, null, request);

                return Ok("User Registration In Progress");
            }
        }
    }
Similarly, in the real world, different services each perform their own local transaction and, once it is done, produce one or more messages to Kafka topics. Other services eventually consume those messages to complete the distributed transaction.
Understanding Consumer Application
Our consumer app needs to consume messages in real time, so we created a .NET Core class library project containing a background service that pulls any new messages arriving on the Kafka topic.
Here we have created a Consumer class that is bound to a Kafka topic, and a Handler that contains the actual code for processing each message the consumer receives.
As with the producer app, we have also created a .NET Core class library project for Kafka.
This is the background service code:
    using System;
    using System.Net;
    using System.Threading;
    using System.Threading.Tasks;
    using Kafka.Constants;
    using Kafka.Interfaces;
    using Kafka.Messages.UserRegistration;
    using Microsoft.Extensions.Hosting;

    namespace Kafka_ConsumerApplication.Core.kafkaEvents.UserRegistration.Consumers
    {
        public class RegisterUserConsumer : BackgroundService
        {
            private readonly IKafkaConsumer<string, RegisterUser> _consumer;

            public RegisterUserConsumer(IKafkaConsumer<string, RegisterUser> kafkaConsumer)
            {
                _consumer = kafkaConsumer;
            }

            // Runs for the lifetime of the host and consumes the RegisterUser topic.
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                try
                {
                    await _consumer.Consume(KafkaTopics.RegisterUser, stoppingToken);
                }
                catch (Exception ex)
                {
                    // Log the failure together with the topic name.
                    Console.WriteLine($"{(int)HttpStatusCode.InternalServerError} ConsumeFailedOnTopic - {KafkaTopics.RegisterUser}, {ex}");
                }
            }

            // Close the consumer and release its resources when the service is disposed.
            public override void Dispose()
            {
                _consumer.Close();
                _consumer.Dispose();

                base.Dispose();
            }
        }
    }
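The `Consume` call above hides the loop that reads messages and passes them to the handler, because that loop lives in the shared Kafka library. As a rough illustration of the consumer-plus-handler idea, here is a minimal sketch that uses an in-memory channel as a stand-in for the Kafka topic. `ChannelConsumer`, `CollectingHandler`, and the interface shape are hypothetical names assumed for this example; the real implementation reads from a broker and may differ considerably.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Assumed handler contract, mirroring how IKafkaHandler is used in this article.
public interface IKafkaHandler<TKey, TValue>
{
    Task HandleAsync(TKey key, TValue value);
}

// Hypothetical consumer that reads from an in-memory channel instead of a Kafka topic.
public sealed class ChannelConsumer<TKey, TValue>
{
    private readonly ChannelReader<KeyValuePair<TKey, TValue>> _source;
    private readonly IKafkaHandler<TKey, TValue> _handler;

    public ChannelConsumer(
        ChannelReader<KeyValuePair<TKey, TValue>> source,
        IKafkaHandler<TKey, TValue> handler)
    {
        _source = source;
        _handler = handler;
    }

    // Reads messages until the channel completes or cancellation is requested,
    // handing each one to the handler in arrival order.
    public async Task Consume(CancellationToken stoppingToken)
    {
        await foreach (var message in _source.ReadAllAsync(stoppingToken))
        {
            await _handler.HandleAsync(message.Key, message.Value);
        }
    }
}

// Hypothetical handler that simply records the values it receives.
public sealed class CollectingHandler : IKafkaHandler<string, string>
{
    public List<string> Seen { get; } = new List<string>();

    public Task HandleAsync(string key, string value)
    {
        Seen.Add(value);
        return Task.CompletedTask;
    }
}
```

The point of the split is that the consumer knows only where messages come from, while the handler knows only what to do with one message, so either side can be swapped or tested on its own.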
And the handler code:
    using System;
    using System.Threading.Tasks;
    using Kafka.Constants;
    using Kafka.Interfaces;
    using Kafka.Messages.UserRegistration;

    namespace Kafka_ConsumerApplication.Core.kafkaEvents.UserRegistration.Handlers
    {
        public class RegisterUserHandler : IKafkaHandler<string, RegisterUser>
        {
            private readonly IKafkaProducer<string, UserRegistered> _producer;

            public RegisterUserHandler(IKafkaProducer<string, UserRegistered> producer)
            {
                _producer = producer;
            }

            public async Task HandleAsync(string key, RegisterUser value)
            {
                // Print the message that was consumed from the RegisterUser topic.
                Console.WriteLine($"Consuming RegisterUser topic message with the below data\n FirstName: {value.FirstName}\n LastName: {value.LastName}\n UserName: {value.UserName}\n EmailId: {value.EmailId}");

                // Announce the completed registration on the UserRegistered topic.
                // Awaiting the call lets any produce failure surface here instead of being lost.
                await _producer.ProduceAsync(KafkaTopics.UserRegistered, "", new UserRegistered { UserId = 1 });
            }
        }
    }
Here you can see that after reading the message and printing it to the console, we produce another message to a different topic named UserRegistered to signal that a user has been registered.
This topic can in turn be consumed by a service that, for example, notifies the user that registration is complete. In this way, we can define our own workflows on top of Kafka according to our requirements.
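To make that follow-on step concrete, here is a minimal sketch of what a downstream handler for the UserRegistered topic might look like. It is not part of the article's source code: `UserRegisteredNotificationHandler`, the `Action<string>` notification callback, and the `int` type of `UserId` are all assumptions for illustration, and the interface is redeclared only so the snippet stands alone.

```csharp
using System;
using System.Threading.Tasks;

// Assumed handler contract, mirroring how IKafkaHandler is used in this article.
public interface IKafkaHandler<TKey, TValue>
{
    Task HandleAsync(TKey key, TValue value);
}

// Message type as produced by RegisterUserHandler; only UserId is known
// from the article, and its type is assumed to be int.
public class UserRegistered
{
    public int UserId { get; set; }
}

// Hypothetical handler for a notification service. The callback stands in
// for whatever channel (email, SMS, push) a real service would use.
public class UserRegisteredNotificationHandler : IKafkaHandler<string, UserRegistered>
{
    private readonly Action<string> _notify;

    public UserRegisteredNotificationHandler(Action<string> notify)
    {
        _notify = notify;
    }

    public Task HandleAsync(string key, UserRegistered value)
    {
        _notify($"Registration completed for user {value.UserId}");
        return Task.CompletedTask;
    }
}
```

Passing `Console.WriteLine` as the callback would print the notification, which is enough to watch the second hop of the workflow during a local run.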
Running Kafka Locally
Now that we understand what our producer and consumer apps do, let's quickly start the Kafka broker and ZooKeeper locally and watch this transaction happen. The prerequisite is that Docker is installed on your system.
To run Kafka locally, go to the location where you have kept the docker-compose.yml file (shared and described in the previous article) and execute the command below.
If you already have these containers, simply run them again.
And we can see that both are running.
Producing and Consuming the message
With the groundwork done, let’s run both our producer and consumer apps and see this in action.
Below is the interface of our Producer App.
Here, using Swagger, we submit a request to register a user with the above-mentioned details.
We can see that the topic RegisterUser has been created successfully and we have a message produced in the topic as well.
As soon as the message is produced, the background service in the consumer app picks it up and prints it to the console.
After printing the message, the consumer app produces another message to the UserRegistered topic, announcing that a user has been registered successfully with UserId 1.
Now, let's check whether the new UserRegistered topic has been created.
Yes, we can see the topic is created successfully and the message is also produced by the consumer app.
So, we have successfully established communication between our two apps to complete the user registration process.
NOTE
All the commands which we have used have already been explained in the previous article. Also, the entire source code is attached to this article.
SUMMARY
To sum up, we have built two applications using .NET Core that communicate with each other through Kafka, which sits between them as a message streaming platform.
I hope you find this article helpful. Stay tuned for more … Cheers!!