官术网_书友最值得收藏!

Java Kafka producer example

We have covered different configurations and APIs in previous sections. Let's start coding one simple Java producer, which will help you create your own Kafka producer.

Prerequisite

  • IDE: We recommend that you use a Scala-supported IDE such as IDEA, NetBeans, or Eclipse. We have used JetBrains IDEA:
    https://www.jetbrains.com/idea/download/.
  • Build tool: Maven, Gradle, or others. We have used Maven to build our project.
  • Pom.xml: Add Kafka dependency to the pom file:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>

Java:

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class DemoProducer {

public static void main(final String[] args) {
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("acks", "all");
producerProps.put("retries", 1);
producerProps.put("batch.size", 20000);
producerProps.put("linger.ms", 1);
producerProps.put("buffer.memory", 24568545);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProps);

for (int i = 0; i < 2000; i++) {
ProducerRecord data = new ProducerRecord<String, String>("test1", "Hello this is record " + i);
Future<RecordMetadata> recordMetadata = producer.send(data);
}
producer.close();
}
}

Scala:

import java.util.Properties
import org.apache.kafka.clients.producer._

object DemoProducer extends App {
override def main(args: Array[String]): Unit = {

val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("client.id", "SampleProducer")
producerProps.put("acks", "all")
producerProps.put("retries", new Integer(1))
producerProps.put("batch.size", new Integer(16384))
producerProps.put("linger.ms", new Integer(1))
producerProps.put("buffer.memory", new Integer(133554432))

val producer = new KafkaProducer[String, String](producerProps)

for (a <- 1 to 2000) {
val record: ProducerRecord[String, String] = new ProducerRecord("test1", "Hello this is record"+a)
producer.send(record);
}

producer.close()
}

}

The preceding example is a simple Java producer where we are producing string data without a key. We have also hardcoded the topic name, which probably can be read through configuration file or as an command line input. To understand producer, we have kept it simple. However, we will see good examples in upcoming chapters where we will follow good coding practice.

主站蜘蛛池模板: 东山县| 肥西县| 南召县| 项城市| 新建县| 会东县| 呼伦贝尔市| 图木舒克市| 宜兰县| 沭阳县| 荣成市| 邢台市| 哈尔滨市| 安远县| 栾川县| 汝城县| 聊城市| 浦北县| 班玛县| 渭源县| 莎车县| 棋牌| 东辽县| 彰化县| 林周县| 沂水县| 静乐县| 高陵县| 迁安市| 行唐县| 宜兰县| 阜阳市| 麦盖提县| 麻江县| 岱山县| 栾城县| 汝城县| 通渭县| 土默特左旗| 清镇市| 宽甸|