Introduction
In this article, we will discuss how to combine NLog and Kafka to collect logging messages. NLog is a flexible logging platform for .NET that allows us to write a custom target whenever we need one.
There are also many ready-made targets we can use, such as Redis, Elasticsearch, Kafka, etc.
Here we will write a custom asynchronous target step by step so that we can send our logs to Kafka.
Step 1
Install the NuGet packages that we will use in the following steps.
- dotnet add package NLog.Web.AspNetCore
- dotnet add package Confluent.Kafka --version 1.1.0
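If you prefer editing the project file directly instead of using the CLI, the equivalent PackageReference entries would look roughly like this (the NLog.Web.AspNetCore version below is only an example; Confluent.Kafka 1.1.0 matches the command above):
- <ItemGroup>
-   <!-- NLog integration for ASP.NET Core (example version) -->
-   <PackageReference Include="NLog.Web.AspNetCore" Version="4.8.0" />
-   <!-- Kafka client used by the custom target -->
-   <PackageReference Include="Confluent.Kafka" Version="1.1.0" />
- </ItemGroup>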
Step 2
Create the NLog configuration file.
- <?xml version="1.0" encoding="utf-8" ?>
- <nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- autoReload="true"
- throwConfigExceptions="true">
-
- <extensions>
- <add assembly="NLog.Web.AspNetCore"/>
- </extensions>
-
- <targets>
-
- <target xsi:type="File"
- name="service"
- archiveFileName="logs/NLogWithKafkaDemo-{##}.log"
- archiveAboveSize="5242880"
- maxArchiveFiles="10"
- archiveNumbering="Rolling"
- fileName="logs/NLogWithKafkaDemo.log"
- encoding="utf-8"
- layout="${date:format=yyyy-MM-dd HH\:mm\:ss,fff} ${level:uppercase=true} ${logger:shortName=true} - ${message}" />
-
- <target
- xsi:type="KafkaAsync"
- name="kafka"
- topic="VioDataCenter"
- bootstrapServers="192.168.0.220:9092,192.168.0.221:9092,192.168.0.222:9092"
- traceId="${aspnet-request:item=traceId}"
- requestIp="${aspnet-request:item=requestIp}"
- layout="${message}"
- />
-
- </targets>
-
- <rules>
- <logger name="NLogWithKafkaDemo.*" minlevel="Debug" writeTo="service" />
- <logger name="NLogWithKafkaDemo.*" minlevel="Debug" writeTo="kafka" />
- </rules>
- </nlog>
We have configured two targets here, but the target type named KafkaAsync is not implemented yet! Depending on your NLog version, you may also need to register the assembly that contains the custom target in the <extensions> section (or call Target.Register<KafkaAsyncTarget>("KafkaAsync") at startup) so that NLog can resolve the KafkaAsync type.
For this target we define two parameters: one is named topic, which specifies the Kafka topic we will write to; the other is named bootstrapServers, which specifies the bootstrap servers of the Kafka cluster.
We also use two custom layout renderers provided by NLog.Web.AspNetCore to record the trace id and the user's IP address. Because the log events are written asynchronously, possibly after the request has finished, we cannot (and should not) reach into HttpContext through IHttpContextAccessor inside the custom target class; instead, these layout renderers capture the request information for us.
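The ${aspnet-request:item=...} renderers read their values from HttpContext.Items, so something in the request pipeline has to put them there. That part is not shown above; the following is a minimal sketch of a hypothetical middleware (the class name and the item values are assumptions for illustration) that fills in the traceId and requestIp items:
- using System;
- using System.Threading.Tasks;
- using Microsoft.AspNetCore.Http;
-
- namespace NLogWithKafkaDemo
- {
-   // Hypothetical middleware that stores the values later read by
-   // ${aspnet-request:item=traceId} and ${aspnet-request:item=requestIp}.
-   public class RequestLogItemsMiddleware
-   {
-     private readonly RequestDelegate _next;
-
-     public RequestLogItemsMiddleware(RequestDelegate next)
-     {
-       _next = next;
-     }
-
-     public async Task Invoke(HttpContext context)
-     {
-       // a per-request trace id and the caller's IP address
-       context.Items["traceId"] = Guid.NewGuid().ToString("N");
-       context.Items["requestIp"] = context.Connection.RemoteIpAddress?.ToString();
-
-       await _next(context);
-     }
-   }
- }
Register it in Startup.Configure before MVC, for example app.UseMiddleware<RequestLogItemsMiddleware>();, so the items are already set when a controller writes a log message.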
Step 3
Create a custom target class named KafkaAsyncTarget that inherits from NLog.Targets.AsyncTaskTarget.
NLog.Targets.AsyncTaskTarget was introduced with NLog 4.6.
- namespace NLogWithKafkaDemo
- {
- using Confluent.Kafka;
- using Newtonsoft.Json;
- using NLog;
- using NLog.Common;
- using NLog.Config;
- using NLog.Layouts;
- using NLog.Targets;
- using System;
- using System.Collections.Concurrent;
- using System.Net;
- using System.Threading;
- using System.Threading.Tasks;
-
- [Target("KafkaAsync")]
- public class KafkaAsyncTarget : AsyncTaskTarget
- {
-
- private readonly ConcurrentQueue<IProducer<Null, string>> _producerPool;
- private int _pCount;
- private int _maxSize;
-
-
- private ConcurrentDictionary<string, IpObj> _cache;
- private const string IP_CACHE_KEY = "memory:ipaddress";
-
- public KafkaAsyncTarget()
- {
- _producerPool = new ConcurrentQueue<IProducer<Null, string>>();
- _maxSize = 10;
- _cache = new ConcurrentDictionary<string, IpObj>();
- }
-
- [RequiredParameter]
- public Layout Topic { get; set; }
-
- [RequiredParameter]
- public string BootstrapServers { get; set; }
-
- [RequiredParameter]
- public Layout TraceId { get; set; }
-
- [RequiredParameter]
- public Layout RequestIp { get; set; }
-
- protected override void CloseTarget()
- {
- base.CloseTarget();
- _maxSize = 0;
- while (_producerPool.TryDequeue(out var context))
- {
- context.Dispose();
- }
- }
-
- private IProducer<Null, string> RentProducer()
- {
- if (_producerPool.TryDequeue(out var producer))
- {
- Interlocked.Decrement(ref _pCount);
-
- return producer;
- }
-
- var config = new ProducerConfig
- {
- BootstrapServers = BootstrapServers,
- };
-
- producer = new ProducerBuilder<Null, string>(config).Build();
-
- return producer;
- }
-
- private bool Return(IProducer<Null, string> producer)
- {
- if (Interlocked.Increment(ref _pCount) <= _maxSize)
- {
- _producerPool.Enqueue(producer);
-
- return true;
- }
-
- Interlocked.Decrement(ref _pCount);
-
- return false;
- }
-
- private string GetCurrentIpFromCache()
- {
- if (_cache.TryGetValue(IP_CACHE_KEY, out var obj))
- {
- return DateTimeOffset.UtcNow.Subtract(obj.Expiration) < TimeSpan.Zero
- ? obj.Ip
- : BuildCacheAndReturnIp();
- }
- else
- {
- return BuildCacheAndReturnIp();
- }
- }
-
- private string BuildCacheAndReturnIp()
- {
- var newObj = new IpObj
- {
- Ip = GetCurrentIp(),
- Expiration = DateTimeOffset.UtcNow.AddMinutes(5),
- };
-
- _cache.AddOrUpdate(IP_CACHE_KEY, newObj, (x, y) => newObj);
-
- return newObj.Ip;
- }
-
-
- private string GetCurrentIp()
- {
- var instanceIp = "127.0.0.1";
-
- try
- {
- // resolve the host once and pick the first IPv4 address
- IPHostEntry ipHost = Dns.GetHostEntry(Dns.GetHostName());
-
- foreach (var ipAddr in ipHost.AddressList)
- {
- if (ipAddr.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork)
- {
- instanceIp = ipAddr.ToString();
- break;
- }
- }
- }
- catch
- {
- }
-
- return instanceIp;
- }
-
- protected override async Task WriteAsyncTask(LogEventInfo logEvent, CancellationToken cancellationToken)
- {
-
- var instanceIp = GetCurrentIpFromCache();
-
-
- string topic = base.RenderLogEvent(this.Topic, logEvent);
- string traceId = base.RenderLogEvent(this.TraceId, logEvent);
- string requestIp = base.RenderLogEvent(this.RequestIp, logEvent);
- string msg = base.RenderLogEvent(this.Layout, logEvent);
-
- var json = JsonConvert.SerializeObject(new
- {
- dateTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),
- level = logEvent.Level.Name.ToUpper(),
- instanceIp = instanceIp,
- traceId = traceId,
- requestIp = requestIp,
- @class = logEvent.LoggerName,
- message = msg
- });
-
- var producer = RentProducer();
-
- try
- {
- await producer.ProduceAsync(topic, new Message<Null, string>()
- {
- Value = json
- });
- }
- catch (Exception ex)
- {
- InternalLogger.Error(ex, "KafkaAsyncTarget: failed to publish the log message to Kafka.");
- }
- finally
- {
- var returned = Return(producer);
- if (!returned)
- {
- producer.Dispose();
- }
- }
- }
- }
-
- // Cache entry used by GetCurrentIpFromCache (reconstructed from its usage above).
- internal class IpObj
- {
- public string Ip { get; set; }
- public DateTimeOffset Expiration { get; set; }
- }
- }
The most important method of KafkaAsyncTarget is WriteAsyncTask. Here we read the structured logging information from the logEvent parameter and construct the message that we will send to Kafka.
NOTE:
We should use the RenderLogEvent method to render layouts, as it makes optimal reuse of StringBuilder buffers.
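To see the target in action, suppose a controller writes a message through the standard ILogger<T> abstraction (this controller is only an illustration and is not part of the original sample):
- using Microsoft.AspNetCore.Mvc;
- using Microsoft.Extensions.Logging;
-
- namespace NLogWithKafkaDemo.Controllers
- {
-   [Route("api/[controller]")]
-   [ApiController]
-   public class ValuesController : ControllerBase
-   {
-     private readonly ILogger<ValuesController> _logger;
-
-     public ValuesController(ILogger<ValuesController> logger)
-     {
-       _logger = logger;
-     }
-
-     [HttpGet]
-     public ActionResult<string> Get()
-     {
-       // the logger name starts with "NLogWithKafkaDemo.", so the rules above
-       // route this message to both the file target and the KafkaAsync target
-       _logger.LogInformation("Hello from ValuesController");
-       return "ok";
-     }
-   }
- }
For each such call, WriteAsyncTask builds a JSON document containing the dateTime, level, instanceIp, traceId, requestIp, class and message fields and produces it to the configured topic.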
Step 4
Configure NLog in Program.cs
- using Microsoft.AspNetCore;
- using Microsoft.AspNetCore.Hosting;
- using NLog.Web;
-
- public class Program
- {
- public static void Main(string[] args)
- {
- CreateWebHostBuilder(args).Build().Run();
- }
-
- public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
- WebHost.CreateDefaultBuilder(args)
- .UseStartup<Startup>()
- .UseNLog();
- }
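The snippet above relies on NLog picking up its configuration automatically. A common pattern with NLog.Web.AspNetCore is to load the configuration explicitly and shut NLog down when the host exits, which also gives the async target a chance to flush pending messages; a sketch of that variant (assuming the configuration file is saved as nlog.config in the project root) looks like this:
- using System;
- using Microsoft.AspNetCore;
- using Microsoft.AspNetCore.Hosting;
- using NLog.Web;
-
- public class Program
- {
-   public static void Main(string[] args)
-   {
-     // load nlog.config explicitly so configuration errors surface at startup
-     var logger = NLogBuilder.ConfigureNLog("nlog.config").GetCurrentClassLogger();
-     try
-     {
-       CreateWebHostBuilder(args).Build().Run();
-     }
-     catch (Exception ex)
-     {
-       logger.Error(ex, "Host terminated unexpectedly");
-       throw;
-     }
-     finally
-     {
-       // closes the targets, disposing the pooled Kafka producers
-       NLog.LogManager.Shutdown();
-     }
-   }
-
-   public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
-     WebHost.CreateDefaultBuilder(args)
-       .UseStartup<Startup>()
-       .UseNLog();
- }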
After running the project, we can see some information in Kafka Manager.
A topic named VioDataCenter was created.
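If Kafka Manager is not available, a small console consumer built with the same Confluent.Kafka package is a quick way to confirm that the log messages actually arrive (the group id here is just a placeholder; the brokers and topic match the configuration above):
- using System;
- using System.Threading;
- using Confluent.Kafka;
-
- public static class LogConsumerDemo
- {
-   public static void Main()
-   {
-     var config = new ConsumerConfig
-     {
-       BootstrapServers = "192.168.0.220:9092,192.168.0.221:9092,192.168.0.222:9092",
-       GroupId = "nlog-demo-consumer",            // placeholder group id
-       AutoOffsetReset = AutoOffsetReset.Earliest
-     };
-
-     using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
-     {
-       consumer.Subscribe("VioDataCenter");
-
-       var cts = new CancellationTokenSource();
-       Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
-
-       try
-       {
-         while (!cts.IsCancellationRequested)
-         {
-           var result = consumer.Consume(cts.Token);
-           Console.WriteLine(result.Message.Value);   // the JSON document built by the target
-         }
-       }
-       catch (OperationCanceledException)
-       {
-         // Ctrl+C pressed, shutting down
-       }
-       finally
-       {
-         consumer.Close();
-       }
-     }
-   }
- }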
Once the messages are in Kafka, there are many ways to visualize them, such as Kibana, Graylog, etc.
For this demonstration, I use Graylog to show the logging messages.
You can find the source code on my GitHub page.
Summary
This article introduced how to collect logging messages into Kafka by writing a custom NLog target in ASP.NET Core.
I hope this helps you.