
Feature Name | Description |
---|---|
Topic | A particular stream of data, identified by its name. Similar to a table in a database, but without constraints. |
Partitions | Topics are split into partitions. Messages within a partition are ordered, and each partition can live on a different broker. |
Offset | Each message within a partition gets an incremental id called an offset. Offsets only have meaning within a specific partition. |
Offset Commit Types | A consumer may opt to commit offsets itself (enable.auto.commit=false). Depending on when it chooses to commit offsets, different delivery semantics are available: at most once, at least once, and exactly once (a manual-commit sketch follows the consumer example below). |
Keys | A message may carry a key. All messages with the same key go to the same partition, as long as the number of partitions is unchanged. |
Kafka Messages | Keys and values are serialized before sending (e.g. StringSerializer, IntegerSerializer) and deserialized again by the consumer. |
Consumers | Consumers read data from a topic (pull model). Consumers within the same group split the partitions of a topic among themselves. |
Broker | A Kafka server. Brokers are identified by an integer id, and each broker holds a subset of topic partitions. |
Kafka Cluster | A set of brokers. Connecting to any one broker (a "bootstrap broker") connects the client to the entire cluster. |
Replication | The replication factor is a topic setting and is specified at topic creation time. A replication factor of N keeps each partition on N brokers (a topic-creation sketch follows this table). |
Acks Settings | acks=0: the producer does not wait for acknowledgment. acks=1: the producer waits for the leader's acknowledgment. acks=all: the producer waits for the leader and all in-sync replicas. |
Kafka Topic Durability & Availability | As a rule, with a replication factor of N, a topic can tolerate the loss of up to N-1 brokers without losing data. |
Zookeeper | Manages brokers (keeps the list of them) and helps perform leader election for partitions. Recent Kafka versions can also run without ZooKeeper (KRaft mode). |
Fun Facts | |
Kafka Idempotent Producer | Producer idempotence ensures that duplicates are not introduced due to unexpected retries. When enable.idempotence is set to true, each producer is assigned a Producer Id (PID), and the PID is included every time the producer sends messages to a broker. Additionally, each message gets a monotonically increasing sequence number (different from the offset, and used only for protocol purposes); a separate sequence is maintained for each topic partition the producer writes to. On the broker side, on a per-partition basis, the broker keeps track of the largest PID/sequence-number combination successfully written; when a message with a lower sequence number is received, it is discarded. Starting with Kafka 3.0, producers default to enable.idempotence=true and acks=all (a configuration sketch follows the first producer example below). |
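The replication factor and partition count above are fixed when a topic is created. A minimal sketch of topic creation with the kafka-clients AdminClient, assuming a broker on 127.0.0.1:9092; the topic name and counts are illustrative only:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreator {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(properties)) {
            // 3 partitions, replication factor 3 - requires at least 3 brokers
            // (use a replication factor of 1 for a single local broker)
            NewTopic topic = new NewTopic("im_java", 3, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get(); // block until created
        }
    }
}
```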
```java
package com.im.kafka.expset;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerImKeys {

    private static final Logger log = LoggerFactory.getLogger(ProducerImKeys.class.getSimpleName());

    public static void main(String[] args) {
        log.info("Kafka Producer started!!");

        // Create producer properties
        Properties properties = new Properties();
        //properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //properties.setProperty("partitioner.adaptive.partitioning.enable", "false");

        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            String value = "Hello World " + i;
            String key = "id_" + i;

            // Create a producer record; records with the same key always land on the same partition
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("im_java", key, value);

            // Send data - asynchronous
            log.info("key: " + producerRecord.key());
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception ex) {
                    // Executes when the record is acknowledged or an exception is thrown
                    if (ex == null) {
                        log.info("Received new metadata\n" +
                                "Topic: " + metadata.topic() + "\n" +
                                "Partition: " + metadata.partition() + "\n" +
                                "Offset: " + metadata.offset() + "\n" +
                                "Timestamp: " + metadata.timestamp());
                    } else {
                        log.error("Error while producing", ex);
                    }
                }
            });
            producer.flush(); // forces delivery of each record; normally called once, outside the loop
        }

        // Close the producer - also flushes, so it waits until remaining data is sent
        producer.close();
    }
}
```
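To make the idempotence row concrete: a sketch of the producer properties with the relevant settings spelled out. These are already the defaults from Kafka 3.0 onward, so this is just a drop-in replacement for the properties block in ProducerImKeys above (same imports assumed):

```java
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Defaults since Kafka 3.0, set explicitly here for clarity
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // broker de-duplicates retries using PID + sequence number
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");                // wait for leader and all in-sync replicas
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
```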
```java
package com.im.kafka.expset;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerImShutdown {

    private static final Logger log = LoggerFactory.getLogger(ConsumerImShutdown.class.getSimpleName());

    public static void main(String[] args) {
        log.info("Kafka Consumer started!!");

        String bootstrapServers = "127.0.0.1:9092";
        String groupId = "im-consumer3";
        String topic = "im_java";

        // Create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Get a reference to the current thread
        final Thread mainThread = Thread.currentThread();

        // Add a shutdown hook: wakeup() makes the next poll() throw WakeupException
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                log.info("Detected shutdown, take measures ...");
                consumer.wakeup();

                // Join the main thread to let it finish cleanly
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        try {
            // Subscribe consumer to the topic
            consumer.subscribe(Arrays.asList(topic));

            // Poll for data
            while (true) {
                log.info("Polling...");
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("Key: " + record.key() + "\n" +
                            "Value: " + record.value() + "\n" +
                            "Partition: " + record.partition() + "\n" +
                            "Offset: " + record.offset());
                }
            }
        } catch (WakeupException ex) {
            log.info("Wakeup exception!"); // expected on shutdown, safe to ignore
        } catch (Exception ex) {
            log.error("Unexpected exception!", ex);
        } finally {
            consumer.close();
            log.info("Consumer is closed.");
        }
    }
}
```
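The consumer above relies on auto-commit. For the at-least-once semantics mentioned in the Offset Commit Types row, a minimal sketch that disables auto-commit and commits only after processing; it reuses the properties and imports from ConsumerImShutdown above, and process() is a hypothetical handler:

```java
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // take over offset management

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("im_java"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical processing step
    }
    // Commit only after processing succeeded: at-least-once semantics.
    // (Committing before processing would give at-most-once instead.)
    consumer.commitSync();
}
```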
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    String kafkaTopic = "java_in_use_topic";

    public void send(String message) {
        kafkaTemplate.send(kafkaTopic, message);
    }
}
```
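The KafkaTemplate injected into KafkaSender needs producer configuration beans (the consumer-side counterpart follows below). A minimal sketch, assuming the same local broker as the earlier examples; the class name is illustrative:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // Producer configuration mirroring the consumer config below
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```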
```java
// Java Program to Illustrate Kafka Configuration
package com.amiya.kafka.apachekafkaconsumer.config;

// Importing required classes
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // Consumer configuration as string-object pairs
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    // Listener container factory used by @KafkaListener methods
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```
```java
// Java Program to Illustrate Kafka Consumer
package com.amiya.kafka.apachekafkaconsumer.consumer;

// Importing required classes
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    // Invoked for every record published to the "NewTopic" topic
    @KafkaListener(topics = "NewTopic", groupId = "group_id")
    public void consume(String message) {
        System.out.println("message = " + message);
    }
}
```
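To tie the Spring pieces together end to end, a sketch of a startup runner that sends one message through KafkaSender. Note that the Spring snippets above come from two separate examples: KafkaSender writes to java_in_use_topic while the listener reads NewTopic, so the topic names must be aligned before the message will arrive. The class name here is hypothetical:

```java
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class KafkaDemoRunner implements CommandLineRunner {

    private final KafkaSender kafkaSender;

    public KafkaDemoRunner(KafkaSender kafkaSender) {
        this.kafkaSender = kafkaSender;
    }

    @Override
    public void run(String... args) {
        // Sends once at application startup; the @KafkaListener above prints it
        kafkaSender.send("Hello from Spring Kafka");
    }
}
```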