Java Kafka producer example

We have covered different configurations and APIs in previous sections. Let's start coding one simple Java producer, which will help you create your own Kafka producer.

Prerequisites

  • IDE: We recommend that you use a Scala-supported IDE such as IDEA, NetBeans, or Eclipse. We have used JetBrains IDEA:
    https://www.jetbrains.com/idea/download/.
  • Build tool: Maven, Gradle, or others. We have used Maven to build our project.
  • pom.xml: Add the Kafka dependency to the pom file:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.0</version>
    </dependency>

Java:

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class DemoProducer {

    public static void main(final String[] args) {
        // Broker address and serializers for the record key and value
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Acknowledgement, retry, and batching settings
        producerProps.put("acks", "all");
        producerProps.put("retries", 1);
        producerProps.put("batch.size", 20000);
        producerProps.put("linger.ms", 1);
        producerProps.put("buffer.memory", 24568545);
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProps);

        for (int i = 0; i < 2000; i++) {
            // The record has a topic and a value but no key
            ProducerRecord<String, String> data = new ProducerRecord<String, String>("test1", "Hello this is record " + i);
            // send() is asynchronous; the returned Future completes when the send is acknowledged
            Future<RecordMetadata> recordMetadata = producer.send(data);
        }
        // close() blocks until all previously sent requests complete
        producer.close();
    }
}

Scala:

import java.util.Properties
import org.apache.kafka.clients.producer._

object DemoProducer {

  def main(args: Array[String]): Unit = {
    // Broker address and serializers for the record key and value
    val producerProps = new Properties()
    producerProps.put("bootstrap.servers", "localhost:9092")
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producerProps.put("client.id", "SampleProducer")
    // Acknowledgement, retry, and batching settings
    producerProps.put("acks", "all")
    producerProps.put("retries", Integer.valueOf(1))
    producerProps.put("batch.size", Integer.valueOf(16384))
    producerProps.put("linger.ms", Integer.valueOf(1))
    producerProps.put("buffer.memory", Integer.valueOf(133554432))

    val producer = new KafkaProducer[String, String](producerProps)

    for (a <- 1 to 2000) {
      // The record has a topic and a value but no key
      val record: ProducerRecord[String, String] = new ProducerRecord("test1", "Hello this is record " + a)
      producer.send(record)
    }

    // close() blocks until all previously sent requests complete
    producer.close()
  }

}

The preceding examples are simple producers, in Java and in Scala, that send string data without a key. The topic name is also hardcoded; it could instead be read from a configuration file or passed as a command-line argument. We have kept the examples simple so that the producer is easy to understand. Upcoming chapters present examples that follow better coding practices.
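
As one possible way to avoid the hardcoded topic name, the following minimal sketch picks the topic from the first command-line argument, then from a configuration entry, and finally falls back to a default. The class name TopicResolver, the method resolveTopic, and the property key demo.topic are hypothetical names chosen for illustration; none of them is part of the Kafka API. The sketch uses only the Java standard library, so it can be tried without a running broker.

```java
import java.util.Properties;

class TopicResolver {

    // Hypothetical property key used only in this sketch; it is not a Kafka setting.
    static final String TOPIC_KEY = "demo.topic";

    // Returns the first non-blank candidate: command-line argument, then config entry, then fallback.
    static String resolveTopic(String[] args, Properties config, String fallback) {
        if (args != null && args.length > 0 && !args[0].trim().isEmpty()) {
            return args[0].trim();
        }
        String configured = (config == null) ? null : config.getProperty(TOPIC_KEY);
        if (configured != null && !configured.trim().isEmpty()) {
            return configured.trim();
        }
        return fallback;
    }

    public static void main(String[] args) {
        // In a real application these properties might be loaded from a file with Properties.load().
        Properties config = new Properties();
        config.setProperty(TOPIC_KEY, "configured-topic");
        System.out.println(resolveTopic(args, config, "test1"));
    }
}
```

In the producer above, the resolved value would take the place of the literal "test1" passed to the ProducerRecord constructor. Giving the command-line argument priority over the configuration entry is a design choice of this sketch, not a Kafka convention.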
