Combine NLog And Kafka To Collect Logging Message In ASP.NET Core

Introduction

 
In this article, we will discuss how to combine NLog and Kafka to collect logging messages. NLog is a flexible logging platform that lets us write a custom target whenever the built-in ones do not cover what we need.
 
Many targets are already available, such as Redis, Elasticsearch, Kafka, etc.
 
Here we will write a custom async target step by step so that we can send log messages to Kafka.
Step 1
 
Install the NuGet packages that we will use in the following steps.

    dotnet add package NLog.Web.AspNetCore
    dotnet add package Confluent.Kafka --version 1.1.0

Step 2
 
Create an NLog configuration file, conventionally named nlog.config.

    <?xml version="1.0" encoding="utf-8" ?>
    <nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          autoReload="true"
          throwConfigExceptions="true">

      <extensions>
        <add assembly="NLog.Web.AspNetCore"/>
        <!-- Assumption: the custom target is compiled into the NLogWithKafkaDemo assembly. -->
        <add assembly="NLogWithKafkaDemo"/>
      </extensions>

      <targets>

        <!-- Rolling file target -->
        <target xsi:type="File"
            name="service"
            archiveFileName="logs/NLogWithKafkaDemo-{##}.log"
            archiveAboveSize="5242880"
            maxArchiveFiles="10"
            archiveNumbering="Rolling"
            fileName="logs/NLogWithKafkaDemo.log"
            encoding="utf-8"
            layout="${date:format=yyyy-MM-dd HH\:mm\:ss,fff} ${level:uppercase=true} ${logger:shortName=true} - ${message}" />

        <!--xsi:type="Kafka"-->
        <!--xsi:type="KafkaAsync"-->
        <target
                xsi:type="KafkaAsync"
                name="kafka"
                topic="VioDataCenter"
                bootstrapServers="192.168.0.220:9092,192.168.0.221:9092,192.168.0.222:9092"
                traceId="${aspnet-request:item=traceId}"
                requestIp="${aspnet-request:item=requestIp}"
                layout="${message}"
                />

      </targets>

      <rules>
        <logger name="NLogWithKafkaDemo.*" minlevel="Debug" writeTo="service" />
        <logger name="NLogWithKafkaDemo.*" minlevel="Debug" writeTo="kafka" />
      </rules>
    </nlog>

We have configured two targets, but the target type named KafkaAsync is not implemented yet!
 
The Kafka target takes two parameters: topic specifies the Kafka topic that the messages are published to, and bootstrapServers specifies the Kafka bootstrap servers.
 
We also use two layout renderers provided by NLog.Web.AspNetCore to record the trace ID and the user's IP address. These values come from the HttpContext, and reading them through layout renderers matters because the custom target class cannot and should not use IHttpContextAccessor.
 
Step 3 
 
Create a custom target class named KafkaAsyncTarget that inherits from NLog.Targets.AsyncTaskTarget.
 
NLog.Targets.AsyncTaskTarget was introduced with NLog 4.6.

    namespace NLogWithKafkaDemo
    {
        using Confluent.Kafka;
        using Newtonsoft.Json;
        using NLog;
        using NLog.Common;
        using NLog.Config;
        using NLog.Layouts;
        using NLog.Targets;
        using System;
        using System.Collections.Concurrent;
        using System.Net;
        using System.Net.Sockets;
        using System.Threading;
        using System.Threading.Tasks;

        [Target("KafkaAsync")]
        public class KafkaAsyncTarget : AsyncTaskTarget
        {
            // Pool of idle producers; _pCount tracks how many are currently queued.
            private readonly ConcurrentQueue<IProducer<Null, string>> _producerPool;
            private int _pCount;
            private int _maxSize;

            // Cache the instance IP so we do not hit DNS for every log event.
            private readonly ConcurrentDictionary<string, IpObj> _cache;
            private const string IP_CACHE_KEY = "memory:ipaddress";

            // IpObj is not shown in the original listing; this is a minimal
            // definition inferred from how it is used below.
            private sealed class IpObj
            {
                public string Ip { get; set; }
                public DateTimeOffset Expiration { get; set; }
            }

            public KafkaAsyncTarget()
            {
                _producerPool = new ConcurrentQueue<IProducer<Null, string>>();
                _maxSize = 10;
                _cache = new ConcurrentDictionary<string, IpObj>();
            }

            [RequiredParameter]
            public Layout Topic { get; set; }

            [RequiredParameter]
            public string BootstrapServers { get; set; }

            [RequiredParameter]
            public Layout TraceId { get; set; }

            [RequiredParameter]
            public Layout RequestIp { get; set; }

            protected override void CloseTarget()
            {
                base.CloseTarget();

                // Stop accepting returned producers, then dispose the idle ones.
                _maxSize = 0;
                while (_producerPool.TryDequeue(out var producer))
                {
                    producer.Dispose();
                }
            }

            // Takes an idle producer from the pool, or builds a new one if the pool is empty.
            private IProducer<Null, string> RentProducer()
            {
                if (_producerPool.TryDequeue(out var producer))
                {
                    Interlocked.Decrement(ref _pCount);

                    return producer;
                }

                var config = new ProducerConfig
                {
                    BootstrapServers = BootstrapServers,
                };

                producer = new ProducerBuilder<Null, string>(config).Build();

                return producer;
            }

            // Puts a producer back into the pool; returns false when the pool is full.
            private bool Return(IProducer<Null, string> producer)
            {
                if (Interlocked.Increment(ref _pCount) <= _maxSize)
                {
                    _producerPool.Enqueue(producer);

                    return true;
                }

                Interlocked.Decrement(ref _pCount);

                return false;
            }

            private string GetCurrentIpFromCache()
            {
                if (_cache.TryGetValue(IP_CACHE_KEY, out var obj))
                {
                    // The cached entry is still valid while "now" is before its expiration.
                    return DateTimeOffset.UtcNow < obj.Expiration
                                        ? obj.Ip
                                        : BuildCacheAndReturnIp();
                }
                else
                {
                    return BuildCacheAndReturnIp();
                }
            }

            private string BuildCacheAndReturnIp()
            {
                var newObj = new IpObj
                {
                    Ip = GetCurrentIp(),
                    Expiration = DateTimeOffset.UtcNow.AddMinutes(5),
                };

                _cache.AddOrUpdate(IP_CACHE_KEY, newObj, (x, y) => newObj);

                return newObj.Ip;
            }

            // Returns the first IPv4 address of this host, or 127.0.0.1 if none can be resolved.
            private string GetCurrentIp()
            {
                var instanceIp = "127.0.0.1";

                try
                {
                    foreach (var ipAddr in Dns.GetHostAddresses(Dns.GetHostName()))
                    {
                        if (ipAddr.AddressFamily == AddressFamily.InterNetwork)
                        {
                            instanceIp = ipAddr.ToString();
                            break;
                        }
                    }
                }
                catch
                {
                    // DNS lookup failed; fall back to the loopback address.
                }

                return instanceIp;
            }

            protected override async Task WriteAsyncTask(LogEventInfo logEvent, CancellationToken cancellationToken)
            {
                // Read from cache
                var instanceIp = GetCurrentIpFromCache();

                // Using RenderLogEvent will allow NLog-Target to make optimal reuse of StringBuilder-buffers.
                string topic = base.RenderLogEvent(this.Topic, logEvent);
                string traceId = base.RenderLogEvent(this.TraceId, logEvent);
                string requestIp = base.RenderLogEvent(this.RequestIp, logEvent);
                string msg = base.RenderLogEvent(this.Layout, logEvent);

                //string topic = this.Topic.Render(logEvent);
                //string traceId = this.TraceId.Render(logEvent);
                //string requestIp = this.RequestIp.Render(logEvent);
                //string msg = this.Layout.Render(logEvent);

                var json = JsonConvert.SerializeObject(new
                {
                    // Use the event's own timestamp: an async target may write
                    // later than the moment the event was logged.
                    dateTime = logEvent.TimeStamp.ToString("yyyy-MM-dd HH:mm:ss.fff"),
                    level = logEvent.Level.Name.ToUpper(),
                    instanceIp = instanceIp,
                    traceId = traceId,
                    requestIp = requestIp,
                    @class = logEvent.LoggerName,
                    message = msg
                });

                var producer = RentProducer();

                try
                {
                    await producer.ProduceAsync(topic, new Message<Null, string>()
                    {
                        Value = json
                    });
                }
                catch (Exception ex)
                {
                    InternalLogger.Error(ex, "kafka published error.");
                }
                finally
                {
                    // Dispose the producer if the pool is already full.
                    var returned = Return(producer);
                    if (!returned)
                    {
                        producer.Dispose();
                    }
                }
            }
        }
    }

The most important method of KafkaAsyncTarget is WriteAsyncTask. It reads the structured logging data from the logEvent parameter and builds the JSON message that is sent to Kafka.
 
NOTE:
 
Render layouts with the RenderLogEvent method rather than calling Layout.Render directly, so that NLog can make optimal reuse of its StringBuilder buffers.
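
The RentProducer and Return methods in the listing above follow a bounded object pool pattern. The following is a minimal, generic sketch of that pattern using only the standard library. BoundedPool<T> is a hypothetical helper name, not a type from NLog or Confluent.Kafka, and unlike the target above it disposes rejected items itself instead of returning false.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper for illustration only; not part of NLog or Confluent.Kafka.
public sealed class BoundedPool<T> where T : class, IDisposable
{
    private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
    private readonly Func<T> _factory;
    private readonly int _maxSize;
    private int _count;

    public BoundedPool(Func<T> factory, int maxSize)
    {
        _factory = factory ?? throw new ArgumentNullException(nameof(factory));
        _maxSize = maxSize;
    }

    // Number of idle instances the pool has reserved slots for.
    public int IdleCount => Volatile.Read(ref _count);

    // Reuse an idle instance if one exists, otherwise create a new one.
    public T Rent()
    {
        if (_items.TryDequeue(out var item))
        {
            Interlocked.Decrement(ref _count);
            return item;
        }

        return _factory();
    }

    // Keep the instance if there is room, otherwise dispose it.
    public void Return(T item)
    {
        if (Interlocked.Increment(ref _count) <= _maxSize)
        {
            _items.Enqueue(item);
            return;
        }

        Interlocked.Decrement(ref _count);
        item.Dispose();
    }
}
```

The counter is incremented before the item is enqueued, so under contention the pool may briefly hold fewer items than the counter reports, but it never holds more than maxSize idle instances.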
 
Step 4
 
Configure NLog in Program.cs.

    using Microsoft.AspNetCore;
    using Microsoft.AspNetCore.Hosting;
    using NLog.Web;

    public class Program
    {
        public static void Main(string[] args)
        {
            CreateWebHostBuilder(args).Build().Run();
        }

        // UseNLog registers NLog as the logging provider for the web host.
        public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
            WebHost.CreateDefaultBuilder(args)
                .UseStartup<Startup>()
                .UseNLog();
    }

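
To see anything arrive in Kafka, the application has to write a log event from a logger whose name matches the NLogWithKafkaDemo.* rules. A minimal sketch follows, assuming a hypothetical ValuesController in the same project; the controller name, route, and message text are illustrative and not taken from the original source code.

```csharp
namespace NLogWithKafkaDemo.Controllers
{
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Extensions.Logging;

    // Hypothetical controller for illustration; name and route are assumptions.
    [Route("api/[controller]")]
    [ApiController]
    public class ValuesController : ControllerBase
    {
        private readonly ILogger<ValuesController> _logger;

        public ValuesController(ILogger<ValuesController> logger)
        {
            _logger = logger;
        }

        // GET api/values
        [HttpGet]
        public ActionResult<string> Get()
        {
            // The logger name is "NLogWithKafkaDemo.Controllers.ValuesController",
            // so it matches both "NLogWithKafkaDemo.*" rules in the NLog configuration.
            _logger.LogInformation("Handling GET api/values");

            return "ok";
        }
    }
}
```

Microsoft.Extensions.Logging applies its own level filters before NLog sees the event, so the LogLevel settings in appsettings.json must allow Information for this message to reach the targets.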
After starting the project and writing some log messages, we can check the result in Kafka Manager.
 
A topic named VioDataCenter was created.
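
If Kafka Manager is not available, the topic can also be checked with a small consumer. This is a rough sketch using the Confluent.Kafka consumer API, assuming the broker address and topic name from the configuration above; the group ID and the TopicPeek class name are made up for illustration.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical console program for illustration only.
public static class TopicPeek
{
    public static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "192.168.0.220:9092",
            GroupId = "nlog-kafka-demo-peek",          // assumed group ID
            AutoOffsetReset = AutoOffsetReset.Earliest // start from the oldest message
        };

        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe("VioDataCenter");

            // Print up to ten messages, giving up after five idle seconds.
            for (var i = 0; i < 10; i++)
            {
                var result = consumer.Consume(TimeSpan.FromSeconds(5));
                if (result == null)
                {
                    break;
                }

                Console.WriteLine(result.Message.Value);
            }

            // Leave the consumer group cleanly.
            consumer.Close();
        }
    }
}
```

Each printed line should be one JSON document in the shape built by WriteAsyncTask, with the dateTime, level, instanceIp, traceId, requestIp, class, and message fields.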
 
 
Once the messages are in Kafka, there are many ways to visualize them, such as Kibana, Graylog, etc.
 
For this demonstration, I use Graylog to show the logging messages.
 
 
You can find the source code on my GitHub page.

Summary

 
This article showed how to send logging messages to Kafka by writing a custom NLog target in ASP.NET Core.
 
I hope this helps you.