- 一、引入pom.xml依赖
- 二、java实现kafka生产者
- 三、发送消息到指定分区上
二、java实现kafka生产者org.apache.kafka kafka-clients 2.7.2
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Recordmetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class MyProducer {
private final static String TOPIC_NAME = "optics-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
//设置kafka集群的地址
props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username="debezium" password="NGFlM2I1NTJlNmFk";");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
//ack模式,all是最慢但最安全的
props.put("acks", "-1");
//失败重试次数
props.put("retries", 0);
//每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
props.put("batch.size", 10);
//props.put("max.request.size",10);
//消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
props.put("linger.ms", 10000);
//整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
//buffer.memory要大于batch.size,否则会报申请内存不足的错误
props.put("buffer.memory", 10240);
//序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
//key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
for (int i = 0; i < 10; i++) {
Recordmetadata metadata = producer.send(new ProducerRecord(TOPIC_NAME, Integer.toString(i), "dd:" + i)).get();
System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + " | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset());
}
}
}
输出如下所示:
同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:6 同步方式发送消息结果:topic名称:optics-topic | partition分区:2 | offset偏移量:8 同步方式发送消息结果:topic名称:optics-topic | partition分区:2 | offset偏移量:9 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:6 同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:7 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:7 同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:8 同步方式发送消息结果:topic名称:optics-topic | partition分区:0 | offset偏移量:9 同步方式发送消息结果:topic名称:optics-topic | partition分区:2 | offset偏移量:10三、发送消息到指定分区上
KafkaProducerproducer = new KafkaProducer (props); //key:作用是决定了往哪个分区上发,value:具体要发送的消息内容 for (int i = 0; i < 10; i++) { Recordmetadata metadata = producer.send(new ProducerRecord (TOPIC_NAME, 1,Integer.toString(i), "dd:" + i)).get(); System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + " | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset()); }
输出如下所示:
同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:9 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:10 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:11 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:12 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:13 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:14 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:15 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:16 同步方式发送消息结果:topic名称:optics-topic | partition分区:1 | offset偏移量:17



