官术网_书友最值得收藏!

Scala Kafka consumer

This is the Scala version of the previous program and will work the same as the previous snippet. Kafka allows you to write consumer in many languages including Scala.

import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.Logger
import java.util._


object DemoConsumer {
private val log: Logger = Logger.getLogger(classOf[DemoConsumer])

@throws[Exception]
def main(args: Array[String]) {
val topic: String = "test1"
val topicList: List[String] = new ArrayList[String]
topicList.add(topic)
val consumerProperties: Properties = new Properties
consumerProperties.put("bootstrap.servers", "10.200.99.197:6667")
consumerProperties.put("group.id", "Demo_Group")
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") consumerProperties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("enable.auto.commit", "true")
consumerProperties.put("auto.commit.interval.ms", "1000")
consumerProperties.put("session.timeout.ms", "30000")
val demoKafkaConsumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerProperties)
demoKafkaConsumer.subscribe(topicList)
log.info("Subscribed to topic " + topic)
val i: Int = 0
try
while (true) {
val records: ConsumerRecords[String, String] = demoKafkaConsumer.poll(2)
import scala.collection.JavaConversions._
for (record <- records) {
log.info("offset = " + record.offset + "key =" + record.key + "value =" + record.value)
System.out.print(record.value)
}
//TODO : Do processing for data here
demoKafkaConsumer.commitAsync(new OffsetCommitCallback() {
def onComplete(map: Map[TopicPartition, OffsetAndMetadata], e: Exception) {
}
})
}

catch {
case ex: Exception => {
//TODO : Log Exception Here
}
} finally try
demoKafkaConsumer.commitSync()
finally demoKafkaConsumer.close()
}
}

主站蜘蛛池模板: 泰兴市| 韶山市| 盐山县| 张家界市| 固镇县| 耒阳市| 邵东县| 丹巴县| 喜德县| 丹巴县| 恩平市| 桃源县| 娄底市| 吉水县| 清丰县| 辽源市| 新河县| 察隅县| 海淀区| 紫云| 察隅县| 靖西县| 绥棱县| 平利县| 濮阳市| 阿拉善盟| 六盘水市| 获嘉县| 香河县| 青州市| 贵南县| 正阳县| 平罗县| 阿克陶县| 名山县| 昭苏县| 九江市| 观塘区| 信阳市| 华池县| 繁昌县|