Introduction
Here are the simple steps to write a Kafka consumer listener in C# with .NET Core. I also explain how to integrate it with an existing Web API.
Simple steps to create a Kafka consumer
- Create a .NET Core console application in an existing or new solution and add a class named “MyKafkaConsumer”
- Install the NuGet package “Confluent.Kafka” (Confluent's .NET Client for Apache Kafka), which provides the ConsumerConfig class
- Add the following lines to the Startup.cs file, where the services are registered
// Start the Kafka consumer
var consumerConfig = new ConsumerConfig();
Configuration.Bind("consumer", consumerConfig);
services.AddSingleton<ConsumerConfig>(consumerConfig);
// Note: register every other service that your business logic depends on here,
// for example:
services.AddTransient<IMyBusinessServices, MyBusinessServices>();
services.AddHostedService<MyKafkaConsumer>(); // important: runs the consumer with the host
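The call Configuration.Bind("consumer", consumerConfig) reads a section named "consumer" from the application's configuration. A minimal sketch of that section in appsettings.json might look like the following. The broker address and the group id are assumed values that do not come from this article, so replace them with your own. A group id is needed because the consumer subscribes to a topic as part of a consumer group.

```json
{
  "consumer": {
    "BootstrapServers": "localhost:9092",
    "GroupId": "my-kafka-consumer-group",
    "AutoOffsetReset": "Earliest"
  }
}
```

With AutoOffsetReset set to Earliest, a consumer group that has no committed offset starts reading from the beginning of the topic.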
Modify the class “MyKafkaConsumer” to derive from the base class Microsoft.Extensions.Hosting.BackgroundService and override the following base class methods. ExecuteAsync is abstract and must be overridden; StartAsync is virtual, so overriding it is optional.
Task StartAsync(CancellationToken cancellationToken);
Task ExecuteAsync(CancellationToken stoppingToken);
Write a constructor that uses dependency injection to receive the ConsumerConfig, a Microsoft.Extensions.Options.IOptions<AppSettings>, and any additional business logic services.
Example
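private readonly ConsumerConfig _consumerConfig;
private readonly IMyBusinessServices _mybusinessServices;
private readonly IOptions<AppSettings> _appSettings;

public MyKafkaConsumer(ConsumerConfig consumerConfig, IMyBusinessServices mybusinessServices, IOptions<AppSettings> appSettings)
{
    _consumerConfig = consumerConfig;
    _mybusinessServices = mybusinessServices;
    _appSettings = appSettings;
}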
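Add the following StartConsumer method. The consumer is built and subscribed once, and the loop then reads one message at a time until the host requests shutdown.
private void StartConsumer(CancellationToken stoppingToken)
{
    // Build the consumer and subscribe once, not once per message
    using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig).Build())
    {
        consumer.Subscribe("kafkaListenTopic_From_Producer");
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Blocks until a message arrives or the token is cancelled
                var message = consumer.Consume(stoppingToken).Message.Value;
                if (!string.IsNullOrEmpty(message))
                {
                    // Write your business logic to invoke IMyBusinessServices here
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Shutdown was requested; fall through and close the consumer
        }
        finally
        {
            consumer.Close(); // leave the consumer group cleanly
        }
    }
}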
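Write the following code in the ExecuteAsync method. Returning the task created by Task.Run, instead of discarding it, lets the host observe when the consumer stops or fails.
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Run the blocking consume loop on a thread-pool thread so that host startup is not blocked
    return Task.Run(() => StartConsumer(stoppingToken), stoppingToken);
}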
Now the Kafka service is ready to test with a sample message. Your console application can be run independently.
Follow the steps below if you want to test the application:
Download Kafka from the kafka.apache.org website, follow steps 1 to 4 of https://kafka.apache.org/quickstart, and then start the MyKafkaConsumer application that you wrote.
Set a breakpoint inside the StartConsumer method. When a producer sends a message to the topic, you should see it arrive in the consumer, and your business logic should run as normal.
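If you prefer to send the test message from C# instead of the console producer in the quickstart, a small producer like the one below could be used. This is only a sketch: the class name TestProducer and the message text are made up for illustration, and it assumes the quickstart broker is listening on localhost:9092.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper program, not part of the consumer application
public static class TestProducer
{
    public static async Task Main()
    {
        // Assumption: a local broker from the quickstart on localhost:9092
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        {
            // Send one message to the topic that the consumer subscribes to
            var result = await producer.ProduceAsync(
                "kafkaListenTopic_From_Producer",
                new Message<Null, string> { Value = "hello from the test producer" });

            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
        }
    }
}
```

Run it as a separate console application while MyKafkaConsumer is running, and the breakpoint in StartConsumer should be hit.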
Now your Kafka code is ready to be integrated with a Web API. Follow the steps below to integrate it.
You cannot inject an EF Core context directly into a background service, because a hosted service is registered as a singleton while the context is normally registered as a scoped service. Instead, inject IServiceScopeFactory into your database repository and resolve the context from a scope that you create yourself, as in the constructor below.
public class MyClassRepository : IDisposable
{
    private readonly IServiceScope _scope;
    private readonly MyDatabaseContext _myDatabaseContext;

    public MyClassRepository(IServiceScopeFactory serviceScopeFactory)
    {
        // Keep the scope alive for as long as the repository lives; disposing it
        // in the constructor would also dispose the context resolved from it
        _scope = serviceScopeFactory.CreateScope();
        _myDatabaseContext = _scope.ServiceProvider.GetRequiredService<MyDatabaseContext>();
    }

    // _myDatabaseContext is now ready to use in the repository methods

    public void Dispose() => _scope.Dispose();
}
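Another option is to create a short-lived scope for each message, so that each unit of work gets its own context. The sketch below illustrates that idea. The class MessageHandler and its Handle method are hypothetical names used only for illustration, and MyDatabaseContext is the context class from this article, which must already be registered with the service collection.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical helper: resolves a fresh MyDatabaseContext for every message
public class MessageHandler
{
    private readonly IServiceScopeFactory _serviceScopeFactory;

    public MessageHandler(IServiceScopeFactory serviceScopeFactory)
    {
        _serviceScopeFactory = serviceScopeFactory;
    }

    public void Handle(string message)
    {
        // One scope per message: the context is created inside the scope
        // and disposed together with it when the using block ends
        using (IServiceScope scope = _serviceScopeFactory.CreateScope())
        {
            var db = scope.ServiceProvider.GetRequiredService<MyDatabaseContext>();
            // Use db here, for example to save an entity built from the message
        }
    }
}
```

The trade-off is a small cost for creating a scope per message in exchange for not holding one context open for the whole lifetime of the background service.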
Summary
In this article, we learned how to write an Apache Kafka consumer in C# with .NET Core and how to integrate it with a Web API.