栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

深入理解kafka原理:java实现kafka生产者

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

深入理解kafka原理:java实现kafka生产者

深入理解kafka原理:java实现kafka生产者
  • 一、引入pom.xml依赖
  • 二、java实现kafka生产者
  • 三、发送消息到指定分区上

一、引入pom.xml依赖
		
			org.apache.kafka
			kafka-clients
			2.7.2
		
二、java实现kafka生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Recordmetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyProducer {
    private final static String TOPIC_NAME = "optics-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
                "required username="debezium" password="NGFlM2I1NTJlNmFk";");
        props.put("security.protocol","SASL_PLAINTEXT");
        props.put("sasl.mechanism","PLAIN");
        //ack模式,all是最慢但最安全的
        props.put("acks", "-1");
        //失败重试次数
        props.put("retries", 0);
        //每个分区未发送消息总字节大小(单位:字节),超过设置的值就会提交数据到服务端
        props.put("batch.size", 10);
        //props.put("max.request.size",10);
        //消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
        props.put("linger.ms", 10000);
        //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
        //buffer.memory要大于batch.size,否则会报申请内存不足的错误
        props.put("buffer.memory", 10240);
        //序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer producer = new KafkaProducer(props);
        //key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
        for (int i = 0; i < 10; i++) {
            Recordmetadata metadata = producer.send(new ProducerRecord(TOPIC_NAME, Integer.toString(i), "dd:" + i)).get();
            System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + "  | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset());
        }
    }
}

输出如下所示:

同步方式发送消息结果:topic名称:optics-topic  | partition分区:0 | offset偏移量:6
同步方式发送消息结果:topic名称:optics-topic  | partition分区:2 | offset偏移量:8
同步方式发送消息结果:topic名称:optics-topic  | partition分区:2 | offset偏移量:9
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:6
同步方式发送消息结果:topic名称:optics-topic  | partition分区:0 | offset偏移量:7
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:7
同步方式发送消息结果:topic名称:optics-topic  | partition分区:0 | offset偏移量:8
同步方式发送消息结果:topic名称:optics-topic  | partition分区:0 | offset偏移量:9
同步方式发送消息结果:topic名称:optics-topic  | partition分区:2 | offset偏移量:10
三、发送消息到指定分区上
        KafkaProducer producer = new KafkaProducer(props);
        //key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
        for (int i = 0; i < 10; i++) {
            Recordmetadata metadata = producer.send(new ProducerRecord(TOPIC_NAME, 1,Integer.toString(i), "dd:" + i)).get();
            System.out.println("同步方式发送消息结果:" + "topic名称:" + metadata.topic() + "  | partition分区:" + metadata.partition() + " | offset偏移量:" + metadata.offset());
        }

输出如下所示:

同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:9
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:10
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:11
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:12
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:13
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:14
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:15
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:16
同步方式发送消息结果:topic名称:optics-topic  | partition分区:1 | offset偏移量:17
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/667085.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号