.NET Core - C# Web API with Apache Kafka Integration

Introduction

This article walks through the steps to write a Kafka consumer listener in C# on .NET Core. It also explains how to integrate the consumer with an existing Web API.

Simple steps to create Kafka Consumer

  • Create a .NET Core console application in a new or existing solution and add a class named “MyKafkaConsumer”.
  • Install the NuGet package “Confluent.Kafka” (Confluent's .NET Client for Apache Kafka), which provides the ConsumerConfig class.
  • Add the following lines to the ConfigureServices method in the Startup.cs file.
    //// start kafka consumer
    var consumerConfig = new ConsumerConfig();
    Configuration.Bind("consumer", consumerConfig);
    services.AddSingleton<ConsumerConfig>(consumerConfig);
    // Note: register every other service that your business logic depends on here,
    // as in the line below. ("interface" is a reserved word in C#, so it cannot be used as a namespace name.)
    services.AddTransient<IMyBusinessServices, MyBusinessServices>();
    services.AddHostedService<MyKafkaConsumer>(); // important
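
Configuration.Bind("consumer", consumerConfig) copies the keys of a configuration section named "consumer" onto the matching ConsumerConfig properties. As a rough sketch of what that section needs to supply, the code below sets the equivalent properties directly. The broker address and group name are placeholder assumptions for a local setup, not values from this article.

```csharp
using Confluent.Kafka;

public static class ConsumerConfigExample
{
    // Hypothetical values for a local single-broker setup.
    public static ConsumerConfig Create()
    {
        return new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",        // assumed broker address
            GroupId = "my-kafka-consumer-group",        // assumed consumer group name
            AutoOffsetReset = AutoOffsetReset.Earliest  // read from the oldest message when no offset is stored
        };
    }
}
```

The same property names (BootstrapServers, GroupId, AutoOffsetReset) can be used as keys in the "consumer" configuration section so that Bind picks them up.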
    

Modify the class “MyKafkaConsumer” so that it derives from the base class Microsoft.Extensions.Hosting.BackgroundService and overrides the following base class methods. ExecuteAsync is abstract and must be overridden; StartAsync is virtual, so overriding it is optional.

Task StartAsync(CancellationToken cancellationToken);
Task ExecuteAsync(CancellationToken stoppingToken);

Write a constructor that uses dependency injection to receive ConsumerConfig, Microsoft.Extensions.Options.IOptions<AppSettings>, and any additional business logic services.

Example

public MyKafkaConsumer(ConsumerConfig consumerConfig, IMyBusinessServices mybusinessServices, IOptions<AppSettings> appSettings)
{
    _consumerConfig = consumerConfig;
    _mybusinessServices = mybusinessServices;
    _appSettings = appSettings;
}

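Add the following StartConsumer method.

// Requires: using System; using System.Threading; using Confluent.Kafka;
private void StartConsumer(CancellationToken stoppingToken)
{
    // Build the consumer once and reuse it; rebuilding it for every message
    // would force a new connection and subscription each time.
    using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig).Build())
    {
        consumer.Subscribe("kafkaListenTopic_From_Producer");
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Blocks until a message arrives or the token is cancelled
                var consumeResult = consumer.Consume(stoppingToken).Value;
                if (!string.IsNullOrEmpty(consumeResult))
                {
                    // Write your business logic to invoke IMyBusinessServices here
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Shutdown was requested; fall through and close the consumer
        }
        finally
        {
            consumer.Close(); // leave the consumer group cleanly
        }
    }
}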

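Add the following code to the ExecuteAsync method.

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Consume() blocks, so run the loop on a thread-pool thread to avoid holding up host startup.
    // Returning the task (instead of Task.CompletedTask) lets the host track the loop and await it on shutdown.
    return Task.Run(() => StartConsumer(stoppingToken), stoppingToken);
}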
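
To see how a stopping token ends a long-running loop of this kind, the following standalone sketch uses only the base class library. StoppingTokenDemo and its counter are hypothetical stand-ins for the consumer loop; nothing here comes from Confluent.Kafka or the hosting API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StoppingTokenDemo
{
    // Runs a polling loop on a thread-pool thread until the token is cancelled.
    // The returned task completes with the number of iterations performed.
    public static Task<int> RunLoop(CancellationToken stoppingToken)
    {
        return Task.Run(() =>
        {
            int iterations = 0;
            while (!stoppingToken.IsCancellationRequested)
            {
                iterations++;      // stand-in for handling one message
                Thread.Sleep(10);  // stand-in for waiting on the broker
            }
            return iterations;
        });
    }

    public static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            Task<int> loop = RunLoop(cts.Token);
            Thread.Sleep(100);     // let the loop run briefly
            cts.Cancel();          // roughly what the host does on shutdown
            int count = loop.GetAwaiter().GetResult();
            Console.WriteLine("Loop stopped after " + count + " iterations");
        }
    }
}
```

Because the caller holds on to the task returned by RunLoop, it can wait for the loop to finish after cancelling, which is the same reason a background service benefits from returning its running task.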

Now the Kafka service is ready to test with a sample message. Your console application can be run independently.

Follow the steps below to test the application:

Download Kafka from the kafka.apache.org website, follow steps 1 to 4 of the quickstart at https://kafka.apache.org/quickstart, and then start the MyKafkaConsumer application that you wrote.

Place a breakpoint inside the StartConsumer method. When a producer sends a message to the topic, the breakpoint is hit, and you can step through your logic to confirm that it processes the message as expected.
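
If you would rather send the test message from C# than from the console producer in the quickstart, a minimal producer sketch looks like the following. It assumes a broker listening on localhost:9092 and reuses the topic name from the consumer; the TestProducer class and the message text are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class TestProducer
{
    // Wraps a plain string in a Kafka message with no key.
    public static Message<Null, string> BuildMessage(string text)
    {
        return new Message<Null, string> { Value = text };
    }

    public static async Task Main()
    {
        // Assumed broker address; change it to match your environment.
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        {
            // ProduceAsync completes once the broker acknowledges the message.
            DeliveryResult<Null, string> result = await producer.ProduceAsync(
                "kafkaListenTopic_From_Producer",
                BuildMessage("hello from the test producer"));

            Console.WriteLine("Delivered to " + result.TopicPartitionOffset);
        }
    }
}
```

Run it while MyKafkaConsumer is running and the breakpoint in StartConsumer should be hit with the message value.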

Your Kafka consumer is now ready to be integrated with a Web API. Follow the steps below to integrate it:

  • Create a regular .NET Core Web API project with the default configuration and add it to a solution.
  • Create a class library project within the same solution to hold the Kafka consumer background service. (A console application also works and lets you test independently that the service is up and listening.)
  • Create the Kafka consumer service by deriving from the base class BackgroundService.
  • Test the Kafka consumer service project on its own by sending it messages from a Kafka producer and validating the consumer's listening logic. Once the individual components are validated, the Kafka background service can be integrated with the Web API.
  • Move all the configuration entries (such as the consumer bootstrap servers, group ID, etc.) from the Kafka class library/console application to the Web API's appsettings.json file. This file will then be the only place from which configuration entries are read.
  • Finally, register the consumer in the Web API's Startup.cs file as shown below.
    services.AddHostedService<MyKafkaConsumer>();
    // MyKafkaConsumer is the listener class to listen to Kafka messages.
    

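You will not be able to inject an EF Core context directly into a background service, because a hosted service is created from the root service provider while the context registered by AddDbContext is scoped by default. Instead, inject IServiceScopeFactory into your database repository and resolve the context from a new scope each time you need it. Disposing the scope also disposes the context, so use the context only inside the using block. In the example below, DoDatabaseWork is a placeholder method name.

// Requires: using Microsoft.Extensions.DependencyInjection;
public class MyClassRepository
{
    private readonly IServiceScopeFactory _serviceScopeFactory;

    public MyClassRepository(IServiceScopeFactory serviceScopeFactory)
    {
        _serviceScopeFactory = serviceScopeFactory;
    }

    // Placeholder method: create one scope per unit of work
    public void DoDatabaseWork()
    {
        using (IServiceScope scope = _serviceScopeFactory.CreateScope())
        {
            var myDatabaseContext = scope.ServiceProvider.GetRequiredService<MyDatabaseContext>();
            // your dbcontext is ready to use here, until the scope is disposed
        }
    }
}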
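
The lifetime rule behind this pattern can be checked with a small sketch that needs only the Microsoft.Extensions.DependencyInjection package. FakeContext is a hypothetical stand-in for a DbContext that simply records whether it has been disposed; it is registered as scoped, the default lifetime used by AddDbContext.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for a DbContext: remembers whether Dispose was called.
public sealed class FakeContext : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() { IsDisposed = true; }
}

public static class ScopeLifetimeDemo
{
    // Resolves a scoped service, lets the scope end, and reports whether
    // the container disposed the service along with the scope.
    public static bool ContextIsDisposedAfterScope()
    {
        var services = new ServiceCollection();
        services.AddScoped<FakeContext>();

        using (var provider = services.BuildServiceProvider())
        {
            var scopeFactory = provider.GetRequiredService<IServiceScopeFactory>();
            FakeContext context;
            using (IServiceScope scope = scopeFactory.CreateScope())
            {
                context = scope.ServiceProvider.GetRequiredService<FakeContext>();
                // The context is safe to use only here, inside the scope.
            }
            return context.IsDisposed; // the scope has ended at this point
        }
    }
}
```

A context kept in a field after its scope has ended is already disposed, which is why the repository should resolve a fresh context inside each scope rather than caching one.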

Summary

In this article, we built a Kafka consumer in C# as a .NET Core background service, tested it on its own, and integrated it with a Web API.