package com.abc.demo; import java.time.Duration; import java.util.Collections; import java.util.Properties; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; public class KafkaConsumerTest { public static void main(String[] args) throws InterruptedException, ExecutionException{ //Create consumer property String bootstrapServer = "localhost:9092"; String groupId = "my-first-consumer-group"; String topicName = "my-first-topic"; Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //Create consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); //Subscribe consumer to topic(s) consumer.subscribe(Collections.singleton(topicName)); //Poll for new data while(true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord<String, String> record: records){ System.out.println(record.key() + record.value()); System.out.println(record.topic() + record.partition() + record.offset()); } //Commit consumer offset manually (recommended) consumer.commitAsync(); } } }
Saturday, 11 May 2019
Kafka Consumer Using Java
Subscribe to:
Post Comments (Atom)
Top CSS Interview Questions
These CSS interview questions are based on my personal interview experience. Likelihood of question being asked in the interview is from to...
-
Hi Readers, click here for updated kafka notes If you are planning or preparing for Apache Kafka Certification then this is the right ...
-
Implementation of Elevator/ Lift has been asked in many interviews. I have tried to implement it using muti-threading and TreeSet. TreeSet...
-
Avro Primitive Types null boolean int (32 bit) long (64 bit) float (32 bit) double (64 bit) byte[] (8 bit) string (char squ...
No comments:
Post a Comment