In this article, we learn about Rudderstack and an example using .NET Core Web API with the help of custom request-response middleware and logs API response inside Confluent Cloud from Rudderstack.
Agenda
- Overview of Rudderstack
- Configuration of Rudderstack
- Overview of Confluent Cloud
- Configuration of Confluent Cloud
- Practical Implementation within .NET Core 7 Web API Application
Overview of Rudderstack
- Rudderstack is the popular Customer Data Platform (CDP).
- It provides data pipelines that allow us to collect data from different parts of a web application, website, and SaaS Platform to activate data warehouses.
- Rudderstack has an open-source version and 80+ cloud and warehouse destination that helps us to collect and store events.
- Learn more about Rudderstack on the official website of Rudderstack https://www.rudderstack.com/.
Configuration of Rudderstack
Step 1
Login on Rudderstack Web.
Step 2
Open the connect section and note the Data Plane URL we need inside the .NET Core Application.
Step 3
Add a new source.
Note the above code snippet and Write Key that we need to configure Rudderstack inside the .NET Core Web Application.
Step 4
Next, add a new destination.
To get the credentials below, we need to log in on a confluent cloud and get details, as I showed in the Confluent Cloud Configuration Section.
If you want to add some transformation, that is also possible in the transformation tab inside that we can apply some JavaScript code on log events and transform data as per need.
Here we use Confluent Cloud as a destination application. You can use any destination warehouse to store event data like Snowflake, Data Lake, etc.
Overview of Confluent Cloud
- Confluent Cloud is the data streaming platform that enables us to access, manage, and store data.
- Confluent Kafka provides different connectors to connect with different data sources easily.
- Confluent Cloud has many features using which we can scale our Kafka cluster, maintain and monitor with high availability and zero downtime.
Configuration of Confluent Cloud
Step 1
Login to Confluent Cloud Website.
Step 2
Create a new cluster.
Step 2
Select cluster type
Step 3
Select the region/zone and create a new cluster.
Step 4
Create a new topic.
Step 5
Create and download an API KEY which we need while configuring the confluent cloud as a destination in the rudder stack.
Practical Implementation within .NET Core 7 Web API Application
Step 1
Create a new .NET Core Web API Application.
Step 2
Configure the application.
Step 3
Provide the additional details.
Step 4
Install the following NuGet Packages.
Step 5
Create a new Product Details class.
namespace RudderStackDemo.Entities {
public class ProductDetails {
public int Id {
get;
set;
}
public string ProductName {
get;
set;
}
public string ProductDescription {
get;
set;
}
public int ProductPrice {
get;
set;
}
public int ProductStock {
get;
set;
}
}
}
Step 6
Next, add a new DB Context class for data.
using Microsoft.EntityFrameworkCore;
using RudderStackDemo.Entities;
namespace RudderStackDemo.Data {
public class DbContextClass: DbContext {
public DbContextClass(DbContextOptions < DbContextClass > options): base(options) {}
public DbSet < ProductDetails > Products {
get;
set;
}
}
}
Step 7
Add seed data class which adds a few records inside the in-memory store while we run our application.
using Microsoft.EntityFrameworkCore;
using RudderStackDemo.Entities;
namespace RudderStackDemo.Data {
public class SeedData {
public static void Initialize(IServiceProvider serviceProvider) {
using(var context = new DbContextClass(serviceProvider.GetRequiredService < DbContextOptions < DbContextClass >> ())) {
if (context.Products.Any()) {
return;
}
context.Products.AddRange(new ProductDetails {
Id = 1,
ProductName = "IPhone",
ProductDescription = "IPhone 14",
ProductPrice = 120000,
ProductStock = 100
}, new ProductDetails {
Id = 2,
ProductName = "Samsung TV",
ProductDescription = "Smart TV",
ProductPrice = 400000,
ProductStock = 120
});
context.SaveChanges();
}
}
}
}
Step 8
Create IProductService and ProductService inside the repositories folder.
IProductService
using RudderStackDemo.Entities;
namespace RudderStackDemo.Repositories {
public interface IProductService {
public Task < List < ProductDetails >> ProductListAsync();
public Task < ProductDetails > GetProductDetailByIdAsync(int productId);
public Task < bool > AddProductAsync(ProductDetails productDetails);
public Task < bool > UpdateProductAsync(ProductDetails productDetails);
public Task < bool > DeleteProductAsync(int productId);
}
}
ProductService
using Microsoft.EntityFrameworkCore;
using RudderStackDemo.Data;
using RudderStackDemo.Entities;
namespace RudderStackDemo.Repositories {
public class ProductService: IProductService {
private readonly DbContextClass dbContextClass;
public ProductService(DbContextClass dbContextClass) {
this.dbContextClass = dbContextClass;
}
public async Task < List < ProductDetails >> ProductListAsync() {
return await dbContextClass.Products.ToListAsync();
}
public async Task < ProductDetails > GetProductDetailByIdAsync(int productId) {
return await dbContextClass.Products.Where(ele => ele.Id == productId).FirstOrDefaultAsync();
}
public async Task < bool > AddProductAsync(ProductDetails productDetails) {
await dbContextClass.Products.AddAsync(productDetails);
var result = await dbContextClass.SaveChangesAsync();
if (result > 0) {
return true;
} else {
return false;
}
}
public async Task < bool > UpdateProductAsync(ProductDetails productDetails) {
var isProduct = ProductDetailsExists(productDetails.Id);
if (isProduct) {
dbContextClass.Products.Update(productDetails);
var result = await dbContextClass.SaveChangesAsync();
if (result > 0) {
return true;
} else {
return false;
}
}
return false;
}
public async Task < bool > DeleteProductAsync(int productId) {
var findProductData = dbContextClass.Products.Where(_ => _.Id == productId).FirstOrDefault();
if (findProductData != null) {
dbContextClass.Products.Remove(findProductData);
var result = await dbContextClass.SaveChangesAsync();
if (result > 0) {
return true;
} else {
return false;
}
}
return false;
}
private bool ProductDetailsExists(int productId) {
return dbContextClass.Products.Any(e => e.Id == productId);
}
}
}
Step 9
Next, add a new Products Controller.
using Microsoft.AspNetCore.Mvc;
using RudderStackDemo.Entities;
using RudderStackDemo.Repositories;
namespace RudderStackDemo.Controllers {
[Route("api/[controller]")]
[ApiController]
public class ProductsController: ControllerBase {
private readonly IProductService _productService;
public ProductsController(IProductService productService) {
_productService = productService;
}
/// <summary>
/// Product List
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task < IActionResult > ProductListAsync() {
var productList = await _productService.ProductListAsync();
if (productList != null) {
return Ok(productList);
} else {
return NoContent();
}
}
/// <summary>
/// Get Product By Id
/// </summary>
/// <param name="productId"></param>
/// <returns></returns>
[HttpGet("{productId}")]
public async Task < IActionResult > GetProductDetailsByIdAsync(int productId) {
var productDetails = await _productService.GetProductDetailByIdAsync(productId);
if (productDetails != null) {
return Ok(productDetails);
} else {
return NotFound();
}
}
/// <summary>
/// Add a new product
/// </summary>
/// <param name="productDetails"></param>
/// <returns></returns>
[HttpPost]
public async Task < IActionResult > AddProductAsync(ProductDetails productDetails) {
var isProductInserted = await _productService.AddProductAsync(productDetails);
if (isProductInserted) {
return Ok(isProductInserted);
} else {
return BadRequest();
}
}
/// <summary>
/// Update product details
/// </summary>
/// <param name="productDetails"></param>
/// <returns></returns>
[HttpPut]
public async Task < IActionResult > UpdateProductAsync(ProductDetails productDetails) {
var isProductUpdated = await _productService.UpdateProductAsync(productDetails);
if (isProductUpdated) {
return Ok(isProductUpdated);
} else {
return BadRequest();
}
}
/// <summary>
/// Delete product by id
/// </summary>
/// <param name="productId"></param>
/// <returns></returns>
[HttpDelete]
public async Task < IActionResult > DeleteProductAsync(int productId) {
var isProductDeleted = await _productService.DeleteProductAsync(productId);
if (isProductDeleted) {
return Ok(isProductDeleted);
} else {
return BadRequest();
}
}
}
}
Step 10
- To get the user’s requests and response messages event, we use middleware and send that message to the rudder stack, and after that, the rudderstack will send that to the confluent cloud destination.
- In this application, I store API requests and response messages. Still, in real-time, we can store any type of information about the user, which helps us with real-time data analytics and many other purposes.
Request Middleware
using Microsoft.AspNetCore.Http.Extensions;
using RudderStack;
namespace RudderStackDemo.Middlewares {
public class RequestMiddleware {
private readonly RequestDelegate next;
public RequestMiddleware(RequestDelegate next) {
this.next = next;
}
public async Task Invoke(HttpContext context) {
var requestBodyStream = new MemoryStream();
var originalRequestBody = context.Request.Body;
await context.Request.Body.CopyToAsync(requestBodyStream);
requestBodyStream.Seek(0, SeekOrigin.Begin);
var url = UriHelper.GetDisplayUrl(context.Request);
var requestBodyText = new StreamReader(requestBodyStream).ReadToEnd();
RudderAnalytics.Client.Track("User12345", "Request", new Dictionary < string, object > {
{
"Request",
requestBodyText
},
});
requestBodyStream.Seek(0, SeekOrigin.Begin);
context.Request.Body = requestBodyStream;
await next(context);
context.Request.Body = originalRequestBody;
}
}
}
Response Middleware
using RudderStack;
namespace RudderStackDemo.Middlewares {
public class ResponseMiddleware {
private readonly RequestDelegate _next;
public ResponseMiddleware(RequestDelegate next) {
_next = next;
}
public async Task Invoke(HttpContext context) {
var bodyStream = context.Response.Body;
var responseBodyStream = new MemoryStream();
context.Response.Body = responseBodyStream;
await _next(context);
responseBodyStream.Seek(0, SeekOrigin.Begin);
var responseBody = new StreamReader(responseBodyStream).ReadToEnd();
RudderAnalytics.Client.Track("User12345", "Response", new Dictionary < string, object > {
{
"Response",
responseBody
},
});
responseBodyStream.Seek(0, SeekOrigin.Begin);
await responseBodyStream.CopyToAsync(bodyStream);
}
}
}
Step 11
Initialize the rudder stack inside the program class and register a different service we need in this application, along with our custom request and response middleware.
using Microsoft.EntityFrameworkCore;
using RudderStack;
using RudderStackDemo.Data;
using RudderStackDemo.Middlewares;
using RudderStackDemo.Repositories;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddScoped < IProductService, ProductService > ();
builder.Services.AddDbContext < DbContextClass > (o => o.UseInMemoryDatabase("RudderStackDemo"));
//Initialize RudderAnalytics with Write Key of Source and Data plane URL.
RudderAnalytics.Initialize("<WRITE_KEY>", new RudderConfig(dataPlaneUrl: "<DATA_PLANE_URL>"));
builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var app = builder.Build();
using(var scope = app.Services.CreateScope()) {
var services = scope.ServiceProvider;
var context = services.GetRequiredService < DbContextClass > ();
SeedData.Initialize(services);
}
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment()) {
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseMiddleware < RequestMiddleware > ();
app.UseMiddleware < ResponseMiddleware > ();
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
Step 12
Finally, run the application and execute any endpoints that will capture request-response messages and send them to the rudder stack with the help of middleware.
Step 13
In the rudder stack dashboard, we see several events received and sent to the destination app in the corresponding section.
Step 14
Check the confluent topic message section to see the received API’s request and response inside the destination application. This type of data helps us to track users’ requirements and most frequently used things in real-time.
This is all about the Rudder Stack Customer Data Platform.
GitHub URL
https://github.com/Jaydeep-007/RudderStackDemo
Conclusion
In this article, we learned about rudder stack introduction and configuration with the help of practical implementation using .NET Core 7 Web API and Confluent Cloud as a destination application to store events.
Happy Coding!!!