栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kafka消费者发送消息

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kafka消费者发送消息

异步发送
Properties properties = new Properties();

// 连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

// 手动指定序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
properties.put("client.username","jdq账号");
properties.put("client.password","jdq密码");

KafkaProducer kafkaProducer = new KafkaProducer(properties);
kafkaProducer.send(new ProducerRecord<>("String","String"));
kafkaProducer.close();

注意,jdq需要指定username和password。

回调异步发送

返回值实际上是等待队列发送的消息,且发送失败后会自动重试,不需要我们手动进行重试

@Slf4j
public class CustomProduct {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");

        // 手动指定序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        properties.put("client.username", "jdq账号");
        properties.put("client.password", "jdq密码");


        KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
        Future send = kafkaProducer.send(new ProducerRecord<>("String", "String"));
        kafkaProducer.send(new ProducerRecord<>("String", "String"), (recordMetadata, e) -> {
            if (e == null) {
                log.info("主题:{},分区:{}",recordMetadata.topic(),recordMetadata.partition());
            }
        });
        kafkaProducer.close();


    }

}
同步发送 

同步发送是指,发送到等待队列中的数据,必须等待消息已经发送到kafka集群中,生产者才会发送下一条消息。即等待队列等待数据累计到batch.size或者时间到达linger.ms之后,通过sender发送到kafka集群中,再根据ack应答的登记,最后由selector决定删除信息还是重试,最后直到清理队列中的消息,消费者再发送下一条数据。

同步发送的标志是让send方法有返回值,即

Future send = kafkaProducer.send(new ProducerRecord<>("String", "String"));
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/887947.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号