Kafka Application With Confluent Cloud And Azure Function App Using .NET Core 7

In this article, we are going to build a Kafka application with Confluent Cloud and an Azure Function App, implemented step by step using .NET Core 7.

Agenda

  • Overview of Confluent Cloud
  • Setup of Confluent Cloud
  • Step-by-step implementation using .NET Core 7 Application

If you don’t know about Kafka and its architecture, I recommend you read this article for a better understanding.

Overview of Confluent Cloud

  • Confluent Cloud is a data streaming platform that lets us access, manage, and store data.
  • It provides connectors that make it easy to connect Kafka to different data sources.
  • It also offers features to scale, maintain, and monitor a Kafka cluster with high availability and zero downtime.

Setup of Confluent Cloud

Step 1

Open the Confluent Cloud site and log in.

Step 2

Add a new Kafka Cluster

Step 3

Select basic plan

Step 4

Next, choose Azure Cloud and region.

Step 5

Skip payment section

Step 6

Enter the Kafka Cluster name.

Step 7

Afterward, go to the cluster, open the Topics section from the side navigation, and create a new topic (the examples in this article use topic_1).

Step 8

Click on API Keys and add a new key; we will need the key and its password inside the .NET Core application.

Step-by-step implementation using .NET Core 7 Application

Step 1

Create a new .NET Core Web API Application

Step 2

Configure the application

Step 3

Provide the additional details

Step 4

Install the NuGet packages that the code below relies on: Confluent.Kafka and Newtonsoft.Json.

Step 5

Create a new Car Details class.

namespace ConfluentKafkaDemo.Entities
{
    public class CarDetails
    {
        public int CarId { get; set; }
        public string CarName { get; set; }
        public string BookingStatus { get; set; }
    }
}

Step 6

Add Kafka Confluent Cloud configuration details inside the app settings file.

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "KafkaConfiguration": {
    "BootstrapServers": "<bootstrap server url>",
    "SaslUsername": "<API Key>",
    "SaslPassword": "<API Password>",
    "SecurityProtocol": "SaslSsl",
    "SaslMechanism": "Plain"
  },
  "TopicName": "topic_1"
}

Step 7

Open the Program file and register the Kafka producer configuration as shown below.

using Confluent.Kafka;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
var producerConfiguration = new ProducerConfig();
builder.Configuration.Bind("KafkaConfiguration", producerConfiguration);
builder.Services.AddSingleton<ProducerConfig>(producerConfiguration);
builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment()) {
    app.UseSwagger();
    app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
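
For readers wondering what the Bind call does with the KafkaConfiguration section, the sketch below builds a roughly equivalent ProducerConfig by hand. It assumes the same placeholder values as the app settings file; the property and enum names are the ones exposed by the Confluent.Kafka package.

```csharp
using Confluent.Kafka;

// Hand-written counterpart of binding the "KafkaConfiguration" section.
// The placeholder strings are the same ones used in the app settings file.
var producerConfiguration = new ProducerConfig
{
    BootstrapServers = "<bootstrap server url>",
    SaslUsername = "<API Key>",
    SaslPassword = "<API Password>",
    SecurityProtocol = SecurityProtocol.SaslSsl, // TLS transport with SASL authentication
    SaslMechanism = SaslMechanism.Plain          // API key and password sent as username/password
};
```

Binding from configuration keeps the credentials out of the source code, which is why the article uses the app settings file instead.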

Step 8

Next, create a car details controller that publishes each booking to the Kafka topic.

using Confluent.Kafka;
using ConfluentKafkaDemo.Entities;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;

namespace ConfluentKafkaDemo.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class CarsController : ControllerBase
    {
        private readonly ProducerConfig _configuration;
        private readonly IConfiguration _config;

        public CarsController(ProducerConfig configuration, IConfiguration config)
        {
            _configuration = configuration;
            _config = config;
        }

        // POST api/Cars/sendBookingDetails
        [HttpPost("sendBookingDetails")]
        public async Task<ActionResult> SendBookingDetails([FromBody] CarDetails carDetails)
        {
            // Serialize the booking to JSON; it becomes the Kafka message value.
            string serializedData = JsonConvert.SerializeObject(carDetails);

            // Topic name comes from the "TopicName" entry in the app settings file.
            var topic = _config.GetSection("TopicName").Value;

            // A producer is built per request to keep the demo simple.
            using (var producer = new ProducerBuilder<Null, string>(_configuration).Build())
            {
                await producer.ProduceAsync(topic, new Message<Null, string> { Value = serializedData });
                producer.Flush(TimeSpan.FromSeconds(10));
                return Ok(true);
            }
        }
    }
}
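
The controller builds a new producer on every request, which is fine for a demo. A common alternative is to share one producer for the lifetime of the application, since Confluent.Kafka producers are thread-safe and meant to be reused. The following is a minimal sketch of that registration in the Program file; it is an assumption about how you might restructure the sample, not what the sample repository does.

```csharp
using Confluent.Kafka;

var builder = WebApplication.CreateBuilder(args);

// Same configuration binding as in Step 7.
var producerConfiguration = new ProducerConfig();
builder.Configuration.Bind("KafkaConfiguration", producerConfiguration);
builder.Services.AddSingleton(producerConfiguration);

// Register one shared producer; the container disposes it on shutdown.
builder.Services.AddSingleton<IProducer<Null, string>>(sp =>
    new ProducerBuilder<Null, string>(sp.GetRequiredService<ProducerConfig>()).Build());
```

With this in place, the controller could accept an IProducer<Null, string> in its constructor and call ProduceAsync on it directly, without the using block.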

Step 9

Add a new Azure Function App project inside the same solution.

Step 10

Configure the function app

Step 11

Provide the additional details and select the Kafka trigger as a function.

Step 12

Open the Kafka receiver function and provide the Confluent Cloud details: the API key, the API password, and the bootstrap server URL. Here the values are hard-coded in the trigger attribute for simplicity, but you can store them in a JSON settings file and read them from there.

using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;

namespace KafkaFunctionApp
{
    public class KafkaReciever
    {
        // KafkaTrigger sample
        // Consumes messages from "topic_1" on the Confluent Cloud cluster.
        // The broker URL, API key, and password are hard-coded below for brevity;
        // outside of a demo, keep them in local.settings.json instead.
        [FunctionName("Function1")]
        public void Run(
            [KafkaTrigger("<bootstrap server url>",
                          "topic_1",
                          Username = "<API Key>",
                          Password = "<API Password>",
                          Protocol = BrokerProtocol.SaslSsl,
                          AuthenticationMode = BrokerAuthenticationMode.Plain,
                          ConsumerGroup = "$Default")] KafkaEventData<string>[] events,
            ILogger log)
        {
            foreach (KafkaEventData<string> eventData in events)
            {
                log.LogInformation($"C# Kafka trigger function processed a message: {eventData.Value}");
            }
        }
    }
}
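
The trigger hands each message to the function as a raw string in eventData.Value. Since the controller serialized a CarDetails object with Newtonsoft.Json, the function can turn it back into an object the same way. The sketch below shows that round trip on its own, outside the function; the copy of CarDetails and the sample values are assumptions for illustration.

```csharp
using System;
using Newtonsoft.Json;

// Same shape as ConfluentKafkaDemo.Entities.CarDetails, repeated so the sketch is self-contained.
public class CarDetails
{
    public int CarId { get; set; }
    public string CarName { get; set; }
    public string BookingStatus { get; set; }
}

public static class RoundTripDemo
{
    public static void Main()
    {
        // What the controller puts on the topic (sample values are made up).
        string messageValue = JsonConvert.SerializeObject(
            new CarDetails { CarId = 1, CarName = "Sample Car", BookingStatus = "Booked" });

        // What the function could do with eventData.Value.
        CarDetails car = JsonConvert.DeserializeObject<CarDetails>(messageValue);

        Console.WriteLine($"{car.CarId}: {car.CarName} is {car.BookingStatus}");
    }
}
```

Inside the function, the same DeserializeObject call would go in the foreach loop, with eventData.Value in place of messageValue.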

Step 13

Set both projects as startup projects in the solution and run it.

Step 14

Send a POST request to the sendBookingDetails endpoint with a few car details.
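
Any HTTP client works for this step, including the Swagger UI that the Program file enables in development. As a minimal sketch, the console snippet below posts one booking to the controller. The base address https://localhost:7000 is a placeholder (use the URL your Web API project prints at startup), and the sample values are made up.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Json;

// Placeholder address: replace with the URL your Web API project listens on.
using var client = new HttpClient { BaseAddress = new Uri("https://localhost:7000") };

// Anonymous object mirroring the CarDetails properties.
// PostAsJsonAsync writes camelCase names; ASP.NET Core matches them case-insensitively by default.
var booking = new { CarId = 1, CarName = "Sample Car", BookingStatus = "Booked" };

// Route comes from [Route("api/[controller]")] plus [HttpPost("sendBookingDetails")].
HttpResponseMessage response = await client.PostAsJsonAsync("api/Cars/sendBookingDetails", booking);

Console.WriteLine($"Status: {(int)response.StatusCode}");
```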

Step 15

Open the Messages section of the topic in Confluent Cloud to see the message we sent.

Step 16

Once the message is sent, the Kafka receiver function consumes it and logs its value.

GitHub URL

https://github.com/Jaydeep-007/ConfluentKafkaDemo

Conclusion

In this article, we discussed Confluent Cloud and the Azure Functions Kafka trigger through a practical implementation using .NET Core 7.

Happy Coding!!!