- Building Data Streaming Applications with Apache Kafka
- Manish Kumar Chanchal Singh
- 179字
- 2022-07-12 10:38:17
Java Kafka consumer
The following program is a simple Java consumer which consumes data from topic test. Please make sure data is already available in the mentioned topic otherwise no record will be consumed.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;
import java.util.*;
public class DemoConsumer {
private static final Logger log = Logger.getLogger(DemoConsumer.class);
public static void main(String[] args) throws Exception {
String topic = "test1";
List<String> topicList = new ArrayList<>();
topicList.add(topic);
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", "localhost:9092");
consumerProperties.put("group.id", "Demo_Group");
consumerProperties.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("enable.auto.commit", "true");
consumerProperties.put("auto.commit.interval.ms", "1000");
consumerProperties.put("session.timeout.ms", "30000");
KafkaConsumer<String, String> demoKafkaConsumer = new KafkaConsumer<String, String>(consumerProperties);
demoKafkaConsumer.subscribe(topicList);
log.info("Subscribed to topic " + topic);
int i = 0;
try {
while (true) {
ConsumerRecords<String, String> records = demoKafkaConsumer.poll(500);
for (ConsumerRecord<String, String> record : records)
log.info("offset = " + record.offset() + "key =" + record.key() + "value =" + record.value());
//TODO : Do processing for data here
demoKafkaConsumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
}
});
}
} catch (Exception ex) {
//TODO : Log Exception Here
} finally {
try {
demoKafkaConsumer.commitSync();
} finally {
demoKafkaConsumer.close();
}
}
}
}
推薦閱讀
- C及C++程序設(shè)計(第4版)
- Oracle Database In-Memory(架構(gòu)與實踐)
- Java應(yīng)用開發(fā)與實踐
- 程序員考試案例梳理、真題透解與強化訓(xùn)練
- 薛定宇教授大講堂(卷Ⅳ):MATLAB最優(yōu)化計算
- The Computer Vision Workshop
- Java軟件開發(fā)基礎(chǔ)
- Hands-On Swift 5 Microservices Development
- ASP.NET開發(fā)與應(yīng)用教程
- Internet of Things with ESP8266
- Python 3.7從入門到精通(視頻教學(xué)版)
- Machine Learning With Go
- 代碼閱讀
- Python機(jī)器學(xué)習(xí)與量化投資
- Building a Media Center with Raspberry Pi