- Building Data Streaming Applications with Apache Kafka
- Manish Kumar Chanchal Singh
- 134字
- 2022-07-12 10:38:18
Scala Kafka consumer
This is the Scala version of the previous program, and it works the same way as the previous snippet. Kafka allows you to write consumers in many languages, including Scala.
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.Logger
import java.util._
/**
 * Minimal Kafka consumer demo.
 *
 * Subscribes to the topic `test1`, polls it in an endless loop, logs and prints
 * every record received, and commits offsets asynchronously after each poll.
 * When the loop is left through an exception, offsets are committed
 * synchronously one last time and the consumer is closed.
 */
object DemoConsumer {

  // `classOf[DemoConsumer]` needs a class named DemoConsumer, but only this object
  // exists. Derive the logger name from the object's runtime class instead and drop
  // the trailing `$` that Scala appends to object class names.
  private val log: Logger = Logger.getLogger(getClass.getName.stripSuffix("$"))

  /**
   * Entry point: configures the consumer, subscribes and runs the poll loop.
   *
   * @param args command-line arguments (unused)
   */
  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val topic: String = "test1"
    val topicList: java.util.List[String] = Collections.singletonList(topic)

    val consumerProperties: Properties = new Properties
    consumerProperties.setProperty("bootstrap.servers", "10.200.99.197:6667")
    consumerProperties.setProperty("group.id", "Demo_Group")
    consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    consumerProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // NOTE(review): auto-commit is enabled here while offsets are also committed
    // explicitly below; confirm which of the two commit strategies is intended.
    consumerProperties.setProperty("enable.auto.commit", "true")
    consumerProperties.setProperty("auto.commit.interval.ms", "1000")
    consumerProperties.setProperty("session.timeout.ms", "30000")

    val demoKafkaConsumer: KafkaConsumer[String, String] =
      new KafkaConsumer[String, String](consumerProperties)
    demoKafkaConsumer.subscribe(topicList)
    log.info("Subscribed to topic " + topic)

    // Created once and reused for every asynchronous commit; reports failed commits
    // instead of ignoring them.
    val commitCallback: OffsetCommitCallback = new OffsetCommitCallback {
      override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
        if (e != null) log.error("Asynchronous offset commit failed for " + offsets, e)
      }
    }

    try {
      while (true) {
        // NOTE(review): poll(long) is kept for compatibility with the client version
        // this snippet targets; newer clients offer poll(java.time.Duration).
        val records: ConsumerRecords[String, String] = demoKafkaConsumer.poll(2)

        // Walk the Java iterator directly rather than relying on the deprecated
        // implicit scala.collection.JavaConversions.
        val recordIterator = records.iterator()
        while (recordIterator.hasNext) {
          val record = recordIterator.next()
          log.info("offset = " + record.offset + "key =" + record.key + "value =" + record.value)
          System.out.print(record.value)
        }
        //TODO : Do processing for data here
        demoKafkaConsumer.commitAsync(commitCallback)
      }
    } catch {
      case ex: Exception =>
        // Previously swallowed silently; log it so a failing consumer is visible.
        log.error("Consumer loop for topic " + topic + " terminated by an exception", ex)
    } finally {
      // Final blocking commit, then always release the consumer even if that commit fails.
      try demoKafkaConsumer.commitSync()
      finally demoKafkaConsumer.close()
    }
  }
}
推薦閱讀
- 計算機網絡
- LabVIEW2018中文版 虛擬儀器程序設計自學手冊
- 算法零基礎一本通(Python版)
- Java游戲服務器架構實戰
- Servlet/JSP深入詳解
- INSTANT CakePHP Starter
- 焊接機器人系統操作、編程與維護
- Learning ArcGIS for Desktop
- Java Web開發詳解
- Node.js:來一打 C++ 擴展
- Python網絡爬蟲技術與應用
- Getting Started with Python
- Java Web從入門到精通(第2版)
- 軟件測試分析與實踐
- Learning Cocos2d-JS Game Development