The what, why and how of Apache Kafka

Joshua Summers: I'm the founder of Taaalk and a junior software engineer.
Liam: Senior software developer for Red Hat, working on Kafka-related products.
Joshua Summers
09:06, 06 Mar 22
Hi Liam
Thank you very much for Taaalking with me :)
My hope is to discuss Apache Kafka at a high level first, and then, as the general concepts become clear, go deeper into the detail.
Keeping that high-level perspective in mind, what is the problem that Apache Kafka is there to solve?
Liam
12:32, 06 Mar 22
Apache Kafka was built to solve problems of data resilience, and concurrent consumption, at scale.
While that sounds very buzzwordy, it's a reasonably correct description of why LinkedIn developed Kafka. A social network where your comments or likes are lost is not a very good social network, and while a rapid growth in user numbers (last I checked, LinkedIn had something like 800 million users) is a very good problem to have, it can be a hard problem to have.
Apache Kafka was built on two key principles - in big systems, things fail, and spinning rust is cheap. Although many people use Apache Kafka with SSDs these days - it's no longer 2011, when Kafka was first open sourced, so they're a lot cheaper - the general design principles remain: write things to disk instead of keeping them in RAM, and expect members of the cluster to fail.
This allows for very easy horizontal scaling on cheap machines. And more cluster members means a more resilient system.
Joshua Summers
00:10, 08 Mar 22 (edit: 08:31, 08 Mar 22)
Right, so from what you are saying it sounds like there was an ugly pre-Kafka world, where companies faced problems when they needed to scale their computing power. The mechanics of actually scaling computing power are very vague to me, but I am guessing it is something like adding another rack of servers (virtual or on-prem), creating another instance of the application / service and then distributing some of the workload to that instance. Servers can do funny things and break down in ways that are hard to predict, and it sounds like when an instance did crash, the work that had been sent to it was gone. The result would be, as you say, comments and likes lost for good.
Is that the right understanding of the problem that led to Kafka being developed?
If so, I think I understand the "in big systems, things fail" part. But still feel less clear on "concurrent consumption" and how that relates to "spinning rust" being cheap (I'm not 100% sure what rust is!). 
Would you mind expanding a bit on the inner workings of concurrent consumption (independent of Kafka)? Was my understanding of it correct (multiple instances of the same application / service)? For example, if you were to extend your LinkedIn example of comments and likes, how might that relate to concurrent consumption and the price of rust?
Liam
07:27, 08 Mar 22
Haha, sorry, spinning rust is a turn of phrase describing traditional hard drives (HDDs) instead of solid state drives (SSDs) - the spinning platters used to use iron oxide as the magnetic storage medium.
So when you're scaling large systems, you can either scale vertically or horizontally. Vertical is beefing up your existing servers - adding more CPUs/RAM/disks etc. Horizontal is just adding more servers, and can often be cheaper for a large system than vertical scaling, particularly when it comes to storage.
To give an example, I can buy 3 6TiB Seagate HDDs for less than the price of one 18TiB Seagate HDD. If I'm scaling my storage vertically, I can only attach so many hard drives to my computer, so I need to use the largest drives I can. But if I'm scaling horizontally, I can use cheaper hardware to achieve the same total storage capacity at a lower cost.
And the spinning rust HDDs come in here because they're significantly cheaper than SSDs. However, they're far slower for reads and writes, especially random ones. But for sequential reads/writes, their performance is acceptable.
So this is the "spinning rust is cheap" foundation of Kafka's design.
Liam
07:58, 08 Mar 22 (edit: 08:01, 08 Mar 22)
The other key foundation I mentioned was "things fail in a large system".
If you're trying to build a large distributed system from cheap commodity hardware, this is a fact of life. Individual hard drives will fail, entire servers will fail, and you will most likely face an entire datacentre going dark at some point in your career (probably far more often than you'd like). And the more components in a system, the more points of failure.
So if we go back to the "3 x 6TiB HDD vs 1 x 18TiB HDD" example from the discussion of vertical vs. horizontal scaling, there are 3 hard drives that could fail vs. 1 that could fail. So (assuming the 6TiB and 18TiB drives have the same failure rate), using 3 hard drives roughly triples your probability of having to deal with an HDD failure. And if I were to run, say, 3 servers with 3 6TiB HDDs each, to reach 54TiB total storage, compared to running 1 server with 3 18TiB HDDs, you can see that there are so many more potential points of failure, and not just at the disk level - more power supplies, more network interfaces, more sticks of RAM.
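If it helps to see the arithmetic, here's a rough back-of-the-envelope sketch - the 2% annual failure rate is purely an assumed, illustrative figure, not a real drive statistic:

```java
// Probability of at least one drive failure per year, assuming independent drives
// that all share the same (illustrative) annual failure rate p.
public class FailureOdds {
    public static void main(String[] args) {
        double p = 0.02; // assumed annual failure rate per drive (illustrative)
        for (int drives : new int[]{1, 3, 9}) {
            double atLeastOne = 1 - Math.pow(1 - p, drives);
            System.out.printf("%d drive(s): %.1f%% chance of at least one failure per year%n",
                    drives, atLeastOne * 100);
        }
    }
}
```

With more drives, the chance of having to deal with at least one failure in a given year climbs quickly, which is the whole point about commodity hardware.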
So the cheapness of scaling horizontally on commodity hardware brings with it the cost of surviving the inevitable failures.
So if you want to scale horizontally effectively, you have to accept that failures are normal, and build your system accordingly.
And Kafka does this in two ways - in the control plane, and in the data plane.
Firstly, for the control plane, it's focused on cluster state (e.g., which brokers are in the cluster, which brokers are leaders for a given partition etc.). Production Apache Kafka clusters use ZooKeeper to maintain this state resiliently in the face of probable failures. (It's worth mentioning that a ZooKeeper replacement called KRaft is under active development in Kafka 3.0+, but still isn't production ready).
ZK is a very well put together piece of software that implements a Paxos-style consensus protocol (ZAB, the ZooKeeper Atomic Broadcast). KRaft is a feature (still in development) that uses Kafka brokers running as electors to achieve consensus using the Raft algorithm.
And I'd like to quote the excellent Jepsen analysis of ZooKeeper here, because it's pretty unequivocal: 
Recommendations
Use Zookeeper. It’s mature, well-designed, and battle-tested. 
Paxos and Raft have many subtleties and differences, but from the point of view of someone running a Kafka cluster, they're essentially the same, and the main point of both protocols is this - consensus about the cluster state is achievable as long as a majority of electors are still available. Which can be expressed as Q = 2N + 1, where Q is the number of electors you need to survive N elector failures. (Or, going the other way, N = floor((Q - 1) / 2).)
In other words, for a ZK ensemble with 5 members, 2 can be unavailable but consensus can still be maintained, as a majority remains. 7 members means 3 can fail while retaining consensus, and so on and so forth. (These equations also express why running an even number of ZK nodes is a waste of money - a 3 ZK cluster can survive 1 failure, a 4 ZK cluster can survive 1 failure, a 5 ZK cluster can survive 2...)
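If you want that majority rule as code, here's a tiny sketch (the ensemble sizes are just examples):

```java
// How many elector failures a quorum of a given size can tolerate:
// a majority must survive, so N = floor((Q - 1) / 2).
public class QuorumMath {
    static int failuresTolerated(int ensembleSize) {
        return (ensembleSize - 1) / 2; // integer division == floor
    }

    public static void main(String[] args) {
        for (int q : new int[]{3, 4, 5, 6, 7}) {
            System.out.println(q + " electors -> survives " + failuresTolerated(q) + " failure(s)");
        }
    }
}
```

Running it shows 3 and 4 electors both survive only 1 failure, 5 and 6 both survive 2 - which is exactly why the even numbers are a waste of money.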
The next plank of Kafka's resilience is in the data plane, which is where topic replication factor, minimum in-sync replicas, and producer acknowledgements come in, and I'll discuss those in my next comments.
Liam
08:10, 08 Mar 22
And I haven't forgotten about the concurrent consumers aspect ;) 
Joshua Summers
08:37, 12 Mar 22
So after learning more about the Paxos consensus algorithm, for those not in the know, it is an algorithm designed to propagate an agreed-upon state across a group of machines. It relies on Proposers, which propose a state; Acceptors, which can accept or reject this proposal based on their knowledge of other proposals they have received; and the initial Proposer listening for these acceptance/rejection responses to discover whether it has the "allegiance" of the Acceptors (that is, whether a majority of Acceptors have accepted its proposal).
I am guessing this is why an odd number of machines is the way to go? It minimises the cost required to maintain the numerical majority of acceptance?
Reading through your last response, this line really stood out:
Firstly, for the control plane, it's focused on cluster state (e.g., which brokers are in the cluster, which brokers are leaders for a given partition etc.)
This is because it's filled with words that I overhear the Kafka crew use at my work - brokers, clusters, partitions.
If you don't mind talking at a basic level for a while longer, I would love to know what brokers, clusters and partitions are, and what it means for a broker to lead a partition. 
Liam
03:03, 13 Mar 22
Yep, you're spot on about why Zookeeper is deployed in an odd number of instances - there's no increase in failure resilience until the next odd number, so having 4 ZKs costs more than 3 ZKs, but offers no additional resilience.
On the terms :) So, a Kafka cluster is just a group of 1 to N brokers that know about each other (by asking Zookeeper) and then work together. A Kafka broker accepts connections from producers and consumers, and stores data to / serves data from disk. 
You can easily run a development "cluster" on your laptop with 1 Zookeeper and 1 Kafka broker, or if using KRaft, just a single broker, but running such a cluster in production would make any devop, sysop, or data engineer very nervous indeed! 
Like pretty much every other Big Data adjacent tool or system, clustering is what gives you disaster resilience, and horizontal scaling.
To enable horizontal scaling at the topic level, each topic is divided into partitions. And this is where the partition leader comes in. 
Each partition has 1 to N replicas. One replica is elected leader and the rest become followers, which means that all producers writing data to that partition of the topic send it to the leader, and only the leader; the followers then replicate the data from the leader.
Consumers, likewise, fetch data from the leader*. So which broker leads each of a topic's partitions is cluster metadata that both clients and the brokers acting as followers need to know - hence using a Paxos- or Raft-style protocol to ensure that metadata is also resilient in the face of disaster.
*Slight caveat here - if a Kafka broker is configured for rack awareness, consumers can be configured to fetch data from the closest replica instead of the leader, if the replica is caught up to the leader.
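To make that footnote a bit more concrete, here's roughly what the consumer-side setting looks like - the broker address, group id, and rack name are illustrative, and the brokers themselves also need broker.rack set plus the rack-aware replica selector:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RackAwareConsumerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // illustrative address
        props.put("group.id", "pdf-converters");          // illustrative group
        props.put("client.rack", "eu-west-1a");           // which "rack"/zone this consumer runs in
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // With client.rack set here, and the brokers configured with broker.rack plus
        // replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector,
        // this consumer may fetch from an in-sync follower in the same rack instead of the leader.
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
```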
Hadoop before v2 is an example of what happens without that resilience: the Hadoop Distributed File System (HDFS) had a single point of failure - the NameNode, which stored the metadata about where files were in HDFS - so if the NameNode failed, the entire cluster was inaccessible.
Joshua Summers
19:10, 20 Mar 22
So I'm still trying to understand the basic structures of Kafka. When you say:
A Kafka broker accepts connections from producers and consumers, and stores data to / serves data from disk. 
I'm trying to understand, is a broker a combination of a Kafka producer and consumer or are these three separate things?
Let's say I have a Rails app with an external service written in Java to transform Word documents into PDFs, and I'm sending data over Kafka from my app to my Java service through protobuf messages. Imagine I'm running it locally with 1 Zookeeper and 1 Kafka broker. When it comes to what exists on the Kafka side, would I have a "producer", a "broker",  and a "consumer" instantiated from "Kafka code"? Or would I simply have a broker, with the producer being my Rails app and the consumer being my Java service?
Also, is a topic something that I would control? For example, would I tag my Word document transformation messages with "word-doc-transform" as a topic label? And if I then added another external service to transform Excel documents into PDFs, would I tag those with a different topic label (e.g., "excel-doc-transform")? And if so, I guess there is something on the Kafka side that tries to distribute my topics evenly around the brokers in order to maximise resilience? (So all the "word-doc-transform" messages were not in one location that could crash.)
Liam
06:47, 23 Mar 22
So those are three separate things. A producer is an application sending data to a Kafka broker using the Kafka protocol, and a consumer is an application reading that data using the same protocol. The broker is in between the two - it receives the data from the producer, stores it, and serves it to consumers that request it.
So yes, your Rails app is the producer writing to Kafka, and your Java service is the consumer reading from it. 
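To give a rough sense of what the consuming side might look like in Java - the topic name comes from your example, while the group id and other settings are just illustrative:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class WordDocConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "word-doc-transformers");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("word-doc-transform"));
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, byte[]> record : records) {
                    // Deserialize the protobuf payload and do the Word -> PDF conversion here.
                    System.out.printf("partition %d, offset %d, key %s%n",
                            record.partition(), record.offset(), record.key());
                }
            }
        }
    }
}
```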
And yep, topics are something you control - you name them, decide how many partitions they have, and how many replicas should exist for each topic partition. When you send data to Kafka you have to tell it which topic you are sending it to. Writes occur at the partition level, but generally you don't need to specify which partition - Kafka clients choose the partition for the write by default: for records with a key, the partition is chosen based on the hash of that key; for records without a key, the partition is chosen so that writes are distributed evenly across the topic's partitions.
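And the producing side looks something like this - sketched in Java for consistency, even though in your scenario it would be the Rails app doing it; the key and payload here are illustrative:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WordDocProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            byte[] protobufPayload = new byte[]{ /* serialized transform request would go here */ };
            // Keyed by a (hypothetical) document id, so every record for that document
            // hashes to the same partition of the topic.
            producer.send(new ProducerRecord<>("word-doc-transform", "doc-42", protobufPayload));
            producer.flush();
        }
    }
}
```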
The Kafka brokers distribute things at the topic partition replica level, to achieve two things: data resilience and data availability.
So, replicas are distributed across brokers for data resilience, so that if a broker fails catastrophically, there's still a copy or copies on other brokers. 
Kafka also spreads replicas to ensure that if a broker is temporarily unavailable the partition is still available for writing to or reading from.
So generally, the safest option is to have a replication factor for a topic that is the same as the number of brokers you have. 
Kafka also chooses which replica becomes the leader for each partition in a manner that spreads load and storage usage evenly across brokers.
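As a sketch, creating a topic with an explicit partition count and replication factor via the Java AdminClient looks roughly like this (the numbers are just example values, not recommendations):

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions for parallelism; each partition kept on 3 brokers.
            NewTopic topic = new NewTopic("word-doc-transform", 6, (short) 3);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```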
Joshua Summers
10:14, 09 Apr 22
Got it re the producer and consumer. So the broker; when you say:
The broker is in between the two - it receives the data from the producer, stores it, and serves it to consumers that request it.
Where does the broker store the data? And is this data stored in a persistent way? If I have a Rails app with a PostgreSQL database, would it store the data there? Or does Kafka have its own database and storage location?
And is a broker a 'machine'? By that I mean, when we were discussing the Paxos consensus algorithm, we were talking about there being an odd number of machines. Does each machine host one broker? Or does a machine host a copy of your application and a broker?
I have questions on partitions, but let's get brokers nailed first and then move onto them.
Liam
11:10, 09 Apr 22
Kafka stores records sent via it to disk. Always. And replicated data is stored to disk on other brokers. And yep, all that data is persistent - which is why you need to set retention policies for topics carefully to avoid running out of disk. 
Kafka writes data in its own append-only log format. It doesn't use a database.
Generally, Kafka brokers tend to be given their own machine to ensure best performance, as then they're not sharing network bandwidth or disk IO with other applications.
But there's no requirement to do so; it's just a way to vertically scale your brokers, so you need fewer of them to service the same load.
Joshua Summers
23:40, 09 Apr 22
Ok, so going back to horizontal scaling once again. Let's say I want to add two more machines running my application and I want to add another broker. I would need to add three machines. Is that correct?
And my broker would just log transactions to itself and eventually delete them depending on my retention policy? If my broker crashes would it lose the history of all its logs? Or can they persist after a failure? If so... how could that happen without a database? Is it just writing to one massive log file which is saved on the machine?
Liam
08:20, 10 Apr 22
On the machines, to reiterate, best practice is to give a Kafka broker its own machine/K8s node, as Kafka brokers tend to be limited by two things - network bandwidth and disk IO. So if they don't have to share disk or network, you can get more performance out of each broker.
Also, having Kafka brokers on separate machines minimises the impact of disk failure. If you have two Kafka brokers on one machine, they're sharing the file system and the underlying storage volumes - so if those volumes hit an issue, then two of your brokers are affected, not just one. 
So, yes, each broker writes your data down to its log directories. And brokers run an async process that removes data based on various policies - retention time (e.g., hold my data for 7 days), retention size (e.g., I want to keep only 10TiB of data, delete data when I breach this), as well as the log compaction policy - I only want to retain the most recent record for a given key.
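For illustration, those policies are set per topic; here's roughly how that looks via the AdminClient (the topic name and values are example choices, not recommendations):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicRetentionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "word-doc-transform");
            List<AlterConfigOp> ops = List.of(
                // keep data for 7 days
                new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"), AlterConfigOp.OpType.SET),
                // ~10 TiB cap (note: retention.bytes applies per partition)
                new AlterConfigOp(new ConfigEntry("retention.bytes", "10995116277760"), AlterConfigOp.OpType.SET),
                // "delete" removes old segments; "compact" instead keeps only the latest record per key
                new AlterConfigOp(new ConfigEntry("cleanup.policy", "delete"), AlterConfigOp.OpType.SET)
            );
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}
```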
If your broker crashes, it's highly unlikely to lose the data log directories unless a disk failure is what caused the crash. And this is why Kafka uses replicas - to ensure that a broker losing its hard drives doesn't cause data loss for your cluster. So even if your broker's disks did fail, once they were fixed and the broker joined the cluster again, the first thing it would do is copy its lost data back from the existing replicas of that data.
And as for how it persists: yep, it writes a log for each topic partition to disk, and each topic partition is replicated as many times as that topic's replication factor specifies.
These links explain the principles here better than I do :) 
https://kafka.apache.org/documentation/#persistence
https://kafka.apache.org/documentation/#replication
Joshua Summers
19:39, 24 Apr 22
When it comes to replication, what would be a healthy perspective for a Kafka based system to take when it comes to the distribution of replicas? For example, if you had five Kafka brokers, would it be sensible to write replicas to all of them?
And tangentially, is a Kafka broker a "big thing" or a "small thing", as in if I was handling a reasonable amount of network traffic (nothing close to FANG volumes, but just a typical SaaS business with decent traction), would I expect to have a system with 3 brokers, 20 brokers or, say, 100 brokers? What are the factors that determine the decision on when it's time to add or remove a broker?