- Building Data Streaming Applications with Apache Kafka
- Manish Kumar, Chanchal Singh
- 2022-07-12 10:38:18
Scala Kafka consumer
This is the Scala version of the previous program, and it behaves the same way as the previous snippet. Kafka lets you write consumers in many languages, including Scala.
import java.util.{ArrayList, Properties, Map => JMap}

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.Logger

// JavaConverters replaces the deprecated implicit JavaConversions
import scala.collection.JavaConverters._

object DemoConsumer {
  // DemoConsumer is an object with no companion class, so use getClass rather than classOf
  private val log: Logger = Logger.getLogger(getClass)

  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val topic: String = "test1"
    val topicList = new ArrayList[String]
    topicList.add(topic)

    val consumerProperties: Properties = new Properties
    consumerProperties.put("bootstrap.servers", "10.200.99.197:6667")
    consumerProperties.put("group.id", "Demo_Group")
    consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    consumerProperties.put("enable.auto.commit", "true")
    consumerProperties.put("auto.commit.interval.ms", "1000")
    consumerProperties.put("session.timeout.ms", "30000")

    val demoKafkaConsumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerProperties)
    demoKafkaConsumer.subscribe(topicList)
    log.info("Subscribed to topic " + topic)

    try {
      while (true) {
        // Wait up to 2 ms for records; newer clients deprecate poll(long) in favor of poll(java.time.Duration)
        val records: ConsumerRecords[String, String] = demoKafkaConsumer.poll(2)
        for (record <- records.asScala) {
          log.info("offset = " + record.offset + ", key = " + record.key + ", value = " + record.value)
          System.out.print(record.value)
        }
        // TODO: Do processing for data here
        demoKafkaConsumer.commitAsync(new OffsetCommitCallback {
          override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
            // TODO: Handle a failed commit here (e is non-null on failure)
          }
        })
      }
    } catch {
      case ex: Exception =>
        log.error("Consumer loop failed", ex)
    } finally {
      // Commit the last offsets synchronously, then always release the consumer
      try demoKafkaConsumer.commitSync()
      finally demoKafkaConsumer.close()
    }
  }
}
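The program above sets enable.auto.commit to true and also commits offsets itself with commitAsync and commitSync. When offsets are committed by hand, auto-commit is commonly switched off so that only the explicit commits move the group's position. The following is a minimal sketch of how that choice could be made explicit. ConsumerConfigSketch and consumerProperties are hypothetical names introduced here for illustration; they are not part of the Kafka API, and the broker address in the usage line is a placeholder.

```scala
import java.util.Properties

object ConsumerConfigSketch {
  // Hypothetical helper (not a Kafka API): builds the same settings as
  // DemoConsumer, but makes auto-commit an explicit argument.
  def consumerProperties(bootstrapServers: String,
                         groupId: String,
                         autoCommit: Boolean): Properties = {
    val props = new Properties
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("enable.auto.commit", autoCommit.toString)
    // The commit interval is only set when auto-commit is enabled.
    if (autoCommit) props.setProperty("auto.commit.interval.ms", "1000")
    props.setProperty("session.timeout.ms", "30000")
    props
  }
}
```

A consumer that commits manually could then be created with new KafkaConsumer[String, String](ConsumerConfigSketch.consumerProperties("localhost:9092", "Demo_Group", autoCommit = false)), leaving the commitAsync and commitSync calls as the only place where offsets are stored.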