Nearly any engineer worth his or her salt will likely agree that consistency is important.
If they don’t, they probably haven’t ever worked on a large legacy application or with a team of any decent size. Having everyone roughly on the same page (religious views on tabs vs. spaces aside) can go a long way in terms of productivity. Things look the same. Things feel the same. It’s great. Another benefit of consistency is that it makes inconsistencies stick out like sore thumbs. Anything that is out of line or just looks wrong can be swiftly identified and corrected.
Inconsistencies related to style and naming typically don't matter in the grand scheme of things. Compilers will usually just eat them up and they'll vanish into the abyss. But when those inconsistencies extend into the actual code and its implementation details, that's when things can get dangerous. This post is a tale of one such inconsistency, which seemed innocuous at first glance but eventually festered into something nasty to track down.
Setting the Stage
To really appreciate just how annoying this issue was, it's worth setting the stage a little. It involves two major components:
- .NET Producer
This is a basic .NET console application that reads data from a source and produces messages to send up to Kafka, which does all sorts of magic downstream.
- Kafka Streams Consumer
This is just an application that receives messages from the producer and performs some enrichment downstream (i.e., joining the messages with another data source).
Without getting too much into the weeds, you just need to know that when messages are produced, they have a key associated with them. These keys uniquely identify each message, and Kafka uses them to determine which partition a given message should live on. Partitions are important to the story as well: since Kafka is distributed by nature, a given key should only ever exist on a single partition across the entire environment.
This use case in the Kafka world is a pretty common one. There was no magic going on. Everything was a very vanilla set-up using out of the box / recommended settings. And shortly after running it, it seemed to be working as expected. Thousands upon thousands of messages flowing through per second, data flowing into the final, enriched landing ground.
The process ran overnight, but when I awoke to check the data, it was clear something was very wrong. All of the data was making its way from the producer to the consumer, logs indicated that the appropriate keys were present where they needed to be, but it appeared that the joins were failing.
That's no good.
Investigating the Data
Let's consider an analogy that might make this more familiar to folks with database (rather than streaming) backgrounds:
- You have an imaginary database with two identical tables.
- You attempt to join these two tables on their keys, which are exactly the same in each.
- The join succeeds and returns ... nothing ... well ... sometimes.
Knowing that the joins were failing, I was a bit baffled. Some records were flowing through the pipeline past the join operations, but it didn't make any sense. The keys were there, I was sure of it. So I decided to take a subset of the data and look at it a bit more carefully to make sure I wasn't going crazy:
| Source A (Producer) | Source B (Consumer) |
| --- | --- |
| mawjuG0B9k3AiALz0_2S | 0q0juG0B9k3AiALz8ApP |
| xEEcv20B9k3AiALzEN0m | m60juG0B9k3AiALz5gU5 |
| ua0juG0B9k3AiALz7wqa | ua0juG0B9k3AiALz7wqa |
| m60juG0B9k3AiALz5gU5 | xEEcv20B9k3AiALzEN0m |
| 0q0juG0B9k3AiALz8ApP | mawjuG0B9k3AiALz0_2S |
| ... | ... |
With this very small subset, which was representative of the overall data, I verified that out of over a million pairs of records, each key was present in both of the sources being joined. Next, I tried an experiment with a very, very small subset of 25 records to see just how many made it through the pipeline and successfully joined: 5.
Now why would such a small fraction of the records make it through the entire processing pipeline and others not? It didn't make sense. It's almost as if it was random.
It was.
Distributed Stuff is Hard
After I had spent hours upon hours banging my head and burning the midnight oil wondering just what might be wrong, a colleague mentioned just how random the issue seemed, and it hit me:
It was random, but just not the kind of random I was looking for.
One of the challenges of working with Kafka is that it's intended to be used in distributed environments. The ability to divvy up messages across multiple nodes allows incredible performance, resiliency, and the ability to easily scale to suit your needs without missing a beat. But just how does Kafka manage to scale so well? The answer: partitioning.
By default, Kafka's Java producer divvies up work across multiple partitions and/or nodes using an algorithm that peeks at the key of a given record and assigns it to a partition:
    return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
As you can see, it takes your message key, hashes it with murmur2, clears the sign bit so the hash is non-negative, and takes the result modulo the number of partitions you have, and magically you have a partition for your record. Since this process is deterministic and depends only on the key (and the partition count), a given key will always be assigned to the same partition. So we dug a bit further and, instead of looking at the joins that were failing, focused on those that were succeeding.
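For anyone who wants to poke at this locally, here is a rough, standalone Java port of that keyed code path: the murmur2 hash used by the Kafka Java client plus the `toPositive` and modulo steps. It is a sketch that mirrors the client's logic, not a drop-in replacement for it. It assumes string keys serialized as UTF-8 (the default for Kafka's `StringSerializer`), and the partition count of 12 is a hypothetical value, not the one from this pipeline.

```java
import java.nio.charset.StandardCharsets;

// Standalone port of the murmur2 hash used by Kafka's Java client, plus the
// keyed branch of the default partitioner:
//   toPositive(murmur2(keyBytes)) % numPartitions
final class JavaDefaultPartitioner {

    // 32-bit murmur2 with Kafka's fixed seed.
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        final int length4 = length / 4;

        // Mix four bytes at a time (little-endian).
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix the remaining 1 to 3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clear the sign bit instead of Math.abs, which overflows for Integer.MIN_VALUE.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Assumes UTF-8 string keys, as produced by StringSerializer by default.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 12; // hypothetical partition count
        String[] keys = {"mawjuG0B9k3AiALz0_2S", "0q0juG0B9k3AiALz8ApP", "xEEcv20B9k3AiALzEN0m"};
        for (String key : keys) {
            // Same key and same partition count always yield the same partition.
            System.out.println(key + " -> " + partitionFor(key, numPartitions));
        }
    }
}
```

Clearing the sign bit (rather than calling `Math.abs`) keeps the result non-negative even for `Integer.MIN_VALUE`, so the modulo always yields a valid partition index.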
Bingo! After analyzing all the data in the previous subset, I found that for all five of the successful joins, the key had landed on the same partition on both sides:
| Key | Partition A | Partition B |
| --- | --- | --- |
| mawjuG0B9k3AiALz0_2S | 8 | 8 |
| xEEcv20B9k3AiALzEN0m | 8 | 8 |
| ua0juG0B9k3AiALz7wqa | 6 | 6 |
| m60juG0B9k3AiALz5gU5 | 1 | 1 |
| 0q0juG0B9k3AiALz8ApP | 3 | 3 |
So why were some of the keys present on the same partitions and others weren't? There didn't appear to be any rhyme or reason behind which partition a given record landed on.
It was random and that was the problem.
Inconsistency
After rounds and rounds of analyzing the data, we had the following:
- All the data was emitted as expected from the producer application (with the appropriate keys).
- All the data was making it into the streams / Kafka ecosystem.
- Some of the join operations were failing, seemingly at random, despite the keys being present on both sides of the join.
Random keeps coming up throughout this post, and that's important because it's the crux of this entire issue. After stepping away from the data itself and focusing on the partitioning, a breakthrough emerged. Digging into the source code revealed that the Java clients (which Kafka Streams is built on) hash keys with murmur2 by default, which librdkafka, the library underneath the .NET client, exposes as the murmur2_random partitioner. The .NET producer, however, defaults to the consistent_random partitioner, which hashes keys with CRC32 instead!
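For comparison, here is a sketch of what the .NET default does with a key, written in Java to match the Kafka side. It assumes that librdkafka's CRC32 is the standard CRC-32 that `java.util.zip.CRC32` computes, that string keys are UTF-8, and that null or empty keys are spread randomly (the "random" half of `consistent_random`); the partition count of 12 is hypothetical. Because CRC32 and murmur2 are unrelated hash functions, the same key can map to different partitions under the two schemes.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.CRC32;

// Sketch of librdkafka's "consistent_random" partitioner (the .NET client's
// default): CRC32 of the key bytes, modulo the partition count.
// ASSUMPTION: librdkafka's CRC32 matches the standard CRC-32 in java.util.zip.
final class ConsistentRandomPartitioner {

    static int partitionFor(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null || keyBytes.length == 0) {
            // Null or empty keys are spread randomly under consistent_random.
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        CRC32 crc = new CRC32();
        crc.update(keyBytes, 0, keyBytes.length);
        // getValue() returns the unsigned 32-bit checksum as a non-negative long.
        return (int) (crc.getValue() % numPartitions);
    }

    // Assumes UTF-8 string keys.
    static int partitionFor(String key, int numPartitions) {
        return partitionFor(key == null ? null : key.getBytes(StandardCharsets.UTF_8), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 12; // hypothetical partition count
        for (String key : new String[] {"mawjuG0B9k3AiALz0_2S", "m60juG0B9k3AiALz5gU5"}) {
            System.out.println(key + " -> " + partitionFor(key, numPartitions));
        }
    }
}
```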
Both technologies, designed to interact with one another, partitioned the same keys inconsistently. A Kafka Streams join only matches records that sit in the same partition number on both sides (its inputs must be co-partitioned), so the failing joins could never succeed: the keys were identical, but they had been assigned to different partitions.
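To see why mismatched partitioners break joins even when every key exists on both sides, here is a toy model in Java in which join task `i` only compares partition `i` of the left input with partition `i` of the right input. The two hash functions (`CRC32` and `Adler32` from `java.util.zip`) are stand-ins chosen because they ship with the JDK; they are not the partitioners either Kafka client uses, and the keys and partition count are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

// Toy model of a co-partitioned join: join task i only ever sees partition i
// of both inputs. CRC32 and Adler32 are stand-in hash functions chosen for
// brevity; they are NOT the partitioners used by either Kafka client.
final class CoPartitionedJoin {

    // Build a key -> partition function from any checksum algorithm.
    static ToIntFunction<String> partitioner(Supplier<Checksum> newChecksum, int numPartitions) {
        return key -> {
            Checksum checksum = newChecksum.get();
            byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
            checksum.update(bytes, 0, bytes.length);
            return (int) (checksum.getValue() % numPartitions);
        };
    }

    // Group keys by the partition the given partitioner assigns them to.
    static List<Set<String>> partition(List<String> keys, ToIntFunction<String> partitioner, int numPartitions) {
        List<Set<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new HashSet<>());
        }
        for (String key : keys) {
            partitions.get(partitioner.applyAsInt(key)).add(key);
        }
        return partitions;
    }

    // Count keys that find their match when task i only compares left[i] with right[i].
    static int joinedCount(List<String> keys, ToIntFunction<String> left,
                           ToIntFunction<String> right, int numPartitions) {
        List<Set<String>> leftPartitions = partition(keys, left, numPartitions);
        List<Set<String>> rightPartitions = partition(keys, right, numPartitions);
        int joined = 0;
        for (int i = 0; i < numPartitions; i++) {
            for (String key : leftPartitions.get(i)) {
                if (rightPartitions.get(i).contains(key)) {
                    joined++;
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        int numPartitions = 12; // hypothetical partition count
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keys.add("key-" + i); // hypothetical keys, identical on both sides
        }
        ToIntFunction<String> crc = partitioner(CRC32::new, numPartitions);
        ToIntFunction<String> adler = partitioner(Adler32::new, numPartitions);
        System.out.println("same partitioner:       " + joinedCount(keys, crc, crc, numPartitions));
        System.out.println("different partitioners: " + joinedCount(keys, crc, adler, numPartitions));
    }
}
```

With the same partitioner on both sides, all 1000 keys join; with different partitioners, only the keys that happen to land on matching partition numbers do, which is the same shape of failure as the 5 out of 25 records earlier.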
A quick adjustment to the .NET producer application resolved the issue:
    producerConfiguration.Partitioner = Partitioner.Murmur2Random;
After setting that single property and reprocessing all of my data, the difference was immediate. Every join succeeded, and the entire pipeline was up and running just as intended. Life was good again. It's easy to look back and smile at how simple the solution was; even the folks at XKCD had figured out a partitioning strategy that would have worked better.
At least that would have ensured every key ended up on the same partition on both sides.
But in the real world, at some point there was a disconnect: some silly miscommunication or oversight resulted in this inconsistency and led me down a rabbit hole of heartache, confusion, and doubt. These weren't explicit configuration settings; they were defaults.
This is why consistency is important.