Kafka

Architecture picture
Topic
  • A topic can contain any kind of message in any format; the sequence of all these messages is called a data stream.
  • Data in a topic is deleted after one week by default (the default message retention period); this value is configurable.
  • Kafka topics are immutable: once data is written to a partition, it cannot be changed
  • The serialization and deserialization format of a topic must not change during a topic lifecycle. If you intend to switch a topic data format (for example from JSON to Avro), it is considered best practice to create a new topic and migrate your applications to leverage that new topic.
Partitions
  • The number of partitions of a topic is specified at the time of topic creation (see the AdminClient sketch after this list).
  • The offset is an integer value that Kafka adds to each message as it is written into a partition. Each message in a given partition has a unique offset.
  • If a topic has more than one partition, Kafka guarantees the order of messages within a partition, but there is no ordering of messages across partitions.
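The number of partitions (and the replication factor, covered further below) is fixed when the topic is created. A minimal sketch of creating a topic programmatically with the AdminClient; the broker address, topic name, and counts are illustrative placeholders.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreationSketch {

	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

		try (AdminClient admin = AdminClient.create(props)) {
			// 3 partitions, replication factor 2 - illustrative values, fixed at creation time
			NewTopic topic = new NewTopic("im_java", 3, (short) 2);
			admin.createTopics(Collections.singleton(topic)).all().get(); // wait for completion
		}
	}
}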
Offset
  • Starts with 0
  • Unique for each partition
  • Consumer offsets are stored in the special topic __consumer_offsets
  • each message in a Kafka topic has a partition ID and an offset ID attached to it
  • The process of committing offsets is not done for every message consumed (that would be inefficient); instead it is a periodic process, controlled by the enable.auto.commit=true property: offsets are committed every auto.commit.interval.ms (5 seconds by default) when .poll() is called (see the sketch after this list)
  • when a specific offset is committed, all previous messages that have a lower offset are also considered to be committed.
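A minimal sketch of the consumer properties behind this periodic auto-commit behaviour (the values shown are the documented defaults); these lines slot into the Properties of the Consumer example at the end of this document.

properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");      // default: commit automatically
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // default: every 5 seconds
// offsets are then committed in the background on .poll(), into the __consumer_offsets topic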
Offset Commit types
A consumer may opt to commit offsets by itself (enable.auto.commit=false). Depending on when it chooses to commit offsets, the following delivery semantics are available (a manual-commit sketch follows this list):
  • At most once:

    • Offsets are committed as soon as the message is received.

    • If the processing goes wrong, the message will be lost (it won’t be read again).

  • At least once (usually preferred):

    • Offsets are committed after the message is processed.

    • If the processing goes wrong, the message will be read again.

    • This can result in duplicate processing of messages. Therefore, it is best practice to make sure data processing is idempotent (i.e. processing the same message twice won't produce any undesirable effects).

  • Exactly once:

    • This can only be achieved for Kafka topic to Kafka topic workflows using the transactions API. The Kafka Streams API simplifies the usage of that API and enables exactly once using the setting processing.guarantee=exactly_once_v2 (exactly_once on Kafka < 2.5)

    • For Kafka topic to External System workflows, to effectively achieve exactly once, you must use an idempotent consumer.
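A minimal sketch of at-least-once consumption with manual commits, assuming the same consumer Properties as in the Consumer example at the end of this document but with enable.auto.commit=false; process(record) is a hypothetical, idempotent processing step.

properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("im_java"));

while (true) {
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
	for (ConsumerRecord<String, String> record : records) {
		process(record); // hypothetical, idempotent processing
	}
	// commit only after the whole batch has been processed -> at-least-once
	// (committing before processing instead would give at-most-once)
	consumer.commitSync();
}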

Keys
  • If the producer does not specify a key (key=null), messages are distributed evenly across partitions in a topic (round-robin)
  • If a key is sent (key != null), then all messages that share the same key will always be sent and stored in the same Kafka partition.
  • A key can be anything to identify a message - a string, numeric value, binary value, etc.
  • In the default Kafka partitioner, the keys are hashed using the murmur2 algorithm (could be overridden)
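A sketch of roughly how the default partitioner maps a non-null key to a partition, based on the murmur2 hashing mentioned above. It uses Kafka's own Utils helper; the key and partition count are illustrative, and this is not the complete partitioner logic.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class KeyToPartitionSketch {

	public static void main(String[] args) {
		int numPartitions = 3;                                     // illustrative
		byte[] keyBytes = "id_1".getBytes(StandardCharsets.UTF_8); // the key, already serialized
		int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
		System.out.println("key id_1 -> partition " + partition);  // same key -> always the same partition
	}
}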
Kafka message Keys and values are serialized (e.g. StringSerializer, IntegerSerializer). A message consists of the following fields (a sketch follows this list):
  • Key. Key is optional in the Kafka message and it can be null. A key may be a string, number, or any object and then the key is serialized into binary format.

  • Value. The value represents the content of the message and can also be null. The value format is arbitrary and is then also serialized into binary format.

  • Compression Type. Kafka messages may be compressed. The compression type can be specified as part of the message. Options are none, gzip, lz4, snappy, and zstd

  • Headers. There can be a list of optional Kafka message headers in the form of key-value pairs. It is common to add headers to specify metadata about the message, especially for tracing.

  • Partition + Offset. Once a message is sent into a Kafka topic, it receives a partition number and an offset id. The combination of topic+partition+offset uniquely identifies the message

  • Timestamp. A timestamp is added either by the user or the system in the message.
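A sketch of building a record that explicitly sets the optional fields above (key, timestamp, a tracing header), assuming the producer and Properties from the Producer example below; the partition is left to the partitioner, and partition number plus offset are assigned once the record is written. Topic, key, and header names are placeholders.

ProducerRecord<String, String> record = new ProducerRecord<>(
		"im_java",                    // topic
		null,                         // partition: null = let the partitioner decide
		System.currentTimeMillis(),   // timestamp (otherwise set by the system)
		"id_1",                       // key (may be null)
		"Hello World 1");             // value
record.headers().add("traceId", "abc-123".getBytes(StandardCharsets.UTF_8)); // optional metadata header
producer.send(record);
// compression is not set per record here: it is a producer config, e.g.
// properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");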

Consumers
  • can be grouped together as a Kafka consumer group.
  • consumers within the group will coordinate to split the work of reading from different partitions.
  • each topic partition is only assigned to one consumer within a consumer group, but a consumer from a consumer group can be assigned multiple partitions.
  • If there are more consumers than the number of partitions of a topic, then some consumers will remain inactive
  • When a new consumer is added to a group, a consumer group rebalance happens
  • Kafka consumers read by default from the partition leader. Since Apache Kafka 2.4, it is possible to configure consumers to read from in-sync replicas instead (usually the closest), as sketched below.
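Reading from the closest in-sync replica (KIP-392) is a consumer-side one-liner, sketched here with a hypothetical rack id; the brokers must also advertise broker.rack and have a rack-aware replica.selector.class configured.

// Consumer setting for fetching from the closest replica instead of the leader (Kafka >= 2.4)
properties.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "eu-west-1a"); // hypothetical rack id, must match a broker.rack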
Broker
  • A single Kafka server is called a Kafka broker; it is a program that runs on the Java Virtual Machine (JVM)
  • Kafka brokers store data in a directory on the server disk they run on. Each topic-partition receives its own sub-directory with the associated name of the topic.
  • If there are multiple Kafka brokers in a cluster, then partitions for a given topic will be distributed among the brokers evenly, to achieve load balancing and scalability.
Kafka Cluster
  • An ensemble of Kafka brokers working together is called a Kafka cluster. Some clusters may contain just one broker or others may contain three or potentially hundreds of brokers.
  • A broker in a cluster is identified by a unique numeric ID.
  • A client may connect to any broker in the cluster. Every broker in the cluster has metadata that consists of a list of all the brokers in the cluster
  • any broker in the cluster is also called a bootstrap server.
  • In practice, it is common for the Kafka client to reference at least two bootstrap servers in its connection URL, in case one of them is not available
Replication The replication factor is a topic setting and is specified at topic creation time.
  • A replication factor of 1 means no replication.
  • The replication factor defines how many brokers each partition is replicated on
  • For a given topic-partition, one Kafka broker is designated by the cluster to be responsible for sending and receiving data to clients - leader broker
  • Any other broker that is storing replicated data for that topic-partition is referred to as a replica.
  • each partition has one leader and multiple replicas.
  • An In-Sync Replica (ISR) is a replica that is up to date with the leader broker for a partition; otherwise it is out of sync (see the sketch after this list)
  • If the leader broker fails, a new leader is assigned
  • The preferred leader is the designated leader broker for a partition at topic creation time
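A sketch of inspecting the leader, replicas, and ISR of each partition with the AdminClient (assumes kafka-clients 3.1+ for allTopicNames(); props and topic name as in the topic-creation sketch earlier).

import java.util.Collections;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

// ...
try (AdminClient admin = AdminClient.create(props)) {
	TopicDescription description = admin.describeTopics(Collections.singleton("im_java"))
			.allTopicNames().get().get("im_java");
	for (TopicPartitionInfo p : description.partitions()) {
		System.out.println("partition " + p.partition()
				+ " leader=" + p.leader().id()   // the broker clients write to and read from
				+ " replicas=" + p.replicas()
				+ " isr=" + p.isr());            // replicas that are up to date with the leader
	}
}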
Acks settings
  • Kafka producers only write data to the current leader broker for a partition.
  • Producers must specify an acknowledgment level (acks): the message must be written to a minimum number of replicas before it is considered successfully written
  • Default: Kafka < v3.0, acks=1; Kafka >= v3.0, acks=all
  • acks=0 producers consider messages as "written successfully" the moment the message was sent
  • acks=1 - the message only needs to be acknowledged by the leader
  • acks=all - the message must be accepted by all in-sync replicas
  • The leader checks to see if there are enough in-sync replicas for safely writing the message (controlled by the broker setting min.insync.replicas)
  • The min.insync.replicas can be configured both at the topic and the broker-level.
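A sketch of the two sides of this: acks is a producer setting, while min.insync.replicas is a topic/broker setting, here attached at topic creation time as in the AdminClient sketch above (topic name and values are illustrative).

// Producer side
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // already the default since Kafka 3.0

// Topic side: set min.insync.replicas when creating the topic with the AdminClient
NewTopic topic = new NewTopic("im_java", 3, (short) 3)
		.configs(Collections.singletonMap("min.insync.replicas", "2"));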
Kafka Topic Durability & Availability
  • Durability - for a replication factor of N, you can permanently lose up to N-1 brokers and still recover your data.
  • Availability (Reads) - as long as at least one replica of a partition is up and in sync, the topic is available for reads
  • Availability (Writes) - in summary, when acks=all with a replication.factor=N and min.insync.replicas=M, we can tolerate N-M brokers going down for topic availability purposes
acks=all with a replication.factor=3 and min.insync.replicas=2 is the most popular option for data durability and availability, and allows you to withstand at most the loss of one Kafka broker.
Zookeeper
  • Zookeeper is used to track cluster state, membership, and leadership
  • Zookeeper is used for metadata management
  • keeps track of which brokers are part of the Kafka cluster
  • determine which broker is the leader of a given partition and topic and perform leader elections
  • stores configurations for topics and permissions
  • sends notifications to Kafka in case of changes (e.g. new topic, broker dies, broker comes up, delete topics, etc.)
  • A Zookeeper cluster is called an ensemble. It is recommended to operate the ensemble with an odd number of servers, e.g., 3, 5, 7
  • Kafka 4.x will not have Zookeeper
Zookeeper picture

Fun Facts
  • The default Kafka max message size is 1 MB (it can be changed, but requires configuration on the broker, producer, and consumer side).
  • When the producer sends messages to a broker, the broker can return either a success or an error code. Errors fall into two categories: retriable and non-retriable.
  • The retries setting determines how many times the producer will attempt to send a message before marking it as failed
  • delivery.timeout.ms setting determines how long the producer will attempt to send a message before marking it as failed
  • By default, the producer will wait 100ms between retries, but you can control this using the retry.backoff.ms parameter.
  • Kafka supports two types of compression: producer-side and broker-side.
  • auto.offset.reset - where to start reading a topic when no committed offset exists: latest (default), earliest, or none (throw an exception if no previous offset is found)
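Most of the facts above map directly onto client configuration; a sketch of the corresponding properties (the values shown are the documented defaults, except compression.type and auto.offset.reset, which are changed for illustration).

// Producer side
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647");          // default: Integer.MAX_VALUE retries
properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");  // default: give up after 2 minutes
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");        // default: wait 100 ms between retries
properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1048576");    // default: 1 MB max message size
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");     // default is "none"; gzip, lz4, snappy, zstd

// Consumer side
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // default is "latest"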
Kafka Idempotent Producer Producer idempotence ensures that duplicates are not introduced due to unexpected retries.
When enable.idempotence is set to true, each producer gets assigned a Producer Id (PID) and the PID is included every time a producer sends messages to a broker. Additionally, each message gets a monotonically increasing sequence number (different from the offset - used only for protocol purposes). A separate sequence is maintained for each topic partition that a producer sends messages to. On the broker side, on a per-partition basis, it keeps track of the largest PID-Sequence Number combination that is successfully written. When a lower sequence number is received, it is discarded.
Starting with Kafka 3.0, producers have enable.idempotence=true and acks=all by default.
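Setting this explicitly on a producer is a one-liner (redundant on Kafka >= 3.0, where it is already the default):

properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // idempotence requires acks=all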

Examples

Producer
package com.im.kafka.expset;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerImKeys {
	
	private static final Logger log = LoggerFactory.getLogger(ProducerImKeys.class.getSimpleName());

	public static void main(String[] args) {
		
		log.info("Kafka Producer started!!");
		
		// Create producer properties
		Properties properties = new Properties();
		//properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
		properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
		
		properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		
		//properties.setProperty("partitioner.adaptive.partitioning.enable", "false");
				
		// create producer
		KafkaProducer<String, String> producer = new KafkaProducer<>(properties);	
		
		for(int i=0; i<10; i++) {			
			String value = "Hello World "+i;
			String key = "id_"+i;
			
			// create producer record
			ProducerRecord<String, String> producerRecord = 
					new ProducerRecord<>("im_java", key, value);
			// send data - asynchronous
			log.info("key: "+producerRecord.key() + "\n");
	
			producer.send(producerRecord, new Callback() {
				
				@Override
				public void onCompletion(RecordMetadata metadata, Exception ex) {
					// executes as record sends or exception thrown
					if(ex == null) {
						// record sent with success
						log.info("Received new metadata \n" + 
							//	"Topic: "+ metadata.topic() + "\n" +
								"partition: "+metadata.partition() + "\n" 
							//	"Offset: "+metadata.offset() + "\n" +
							//	"Timestamp: "+metadata.timestamp()
								);
					} else {
						log.error("Error while producing", ex);
					}
				}
					
			});
			producer.flush();
		}		
		// flush and close the producer - so it will wait till the data sent
		producer.close(); // also flush actually
	}
}
Consumer
package com.im.kafka.expset;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerImShutdown {
	
	private static final Logger log = LoggerFactory.getLogger(ConsumerImShutdown.class.getSimpleName());

	public static void main(String[] args) {		
		log.info("Kafka Consumer started!!");
	
		String bootstrapServers = "127.0.0.1:9092";
		String groupId = "im-consumer3";
		String topic = "im_java";
		
		//create configs
		Properties properties = new Properties();
		properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());		
		
		properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");		
		
		// create consumer
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
		
		// get a reference to current thread
		final Thread mainThread = Thread.currentThread();
		
		// adding shutdown hook
		Runtime.getRuntime().addShutdownHook(new Thread() {
			public void run() {
				log.info("Detected shutdown, take measures ...");
				consumer.wakeup();
				
				// join the main thread
				try {
					mainThread.join();
				} catch(InterruptedException e) {
					e.printStackTrace();
				}			
			}			
		});
				
		try {		
			// subscribe consumer to topic
			consumer.subscribe(Arrays.asList(topic));		
			// poll data			
			while(true) {
				log.info("Pulling...");
				
				ConsumerRecords<String, String> records = 
						consumer.poll(Duration.ofMillis(1000));
				
				for(ConsumerRecord<String, String> record: records) {
					log.info("Key: "+ record.key() + "\n" +
							"Value: "+record.value() + "\n"+
							"Partition: "+record.partition() + "\n" +
							"Offset: "+record.offset()						
							);
				}				
			}		
		} catch(WakeupException ex) {
			log.info("Wakeup exception!");
			// ignore the rest
		} catch(Exception ex) {
			log.info("Unexpected exception!");
		} finally {
			consumer.close();
			log.info("Consumer is closed.");
		}	
	}
}
Example Kafka Spring Producer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaSender {
	
	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;
	
	String kafkaTopic = "java_in_use_topic";
	
	public void send(String message) {
	    
	    kafkaTemplate.send(kafkaTopic, message);
	}
}
Example Kafka Spring Consumer with Config
// Java Program to Illustrate Kafka Configuration

package com.amiya.kafka.apachekafkaconsumer.config;

// Importing required classes
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Annotations
@EnableKafka
@Configuration

// Class
public class KafkaConfig {

	@Bean
	public ConsumerFactory<String, String> consumerFactory()
	{

		// Creating a Map of string-object pairs
		Map<String, Object> config = new HashMap<>();

		// Adding the Configuration
		config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
				"127.0.0.1:9092");
		config.put(ConsumerConfig.GROUP_ID_CONFIG,
				"group_id");
		config.put(
			ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class);
		config.put(
			ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
			StringDeserializer.class);

		return new DefaultKafkaConsumerFactory<>(config);
	}

	// Creating a listener container factory; @KafkaListener looks this bean up
	// under the default name "kafkaListenerContainerFactory"
	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String>
	kafkaListenerContainerFactory()
	{
		ConcurrentKafkaListenerContainerFactory<String, String> factory
			= new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}
}
// Java Program to Illustrate Kafka Consumer

package com.amiya.kafka.apachekafkaconsumer.consumer;

// Importing required classes
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component

// Class
public class KafkaConsumer {

	// Consumes messages from the "NewTopic" topic as part of the "group_id" consumer group
	@KafkaListener(topics = "NewTopic",
				groupId = "group_id")
	public void consume(String message)
	{
		// Print statement
		System.out.println("message = " + message);
	}
}