栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Pulsar 生产和消费Java实战

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Pulsar 生产和消费Java实战

前言 1.生产数据
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
 
@Slf4j
public class PulsarProducer {
    
    private PulsarClient pulsarClient;
    
    private Producer producer;
    
    private String localClusterUrl = "pulsar://xxx.xxx.xxx.xx:29095";


    public PulsarProducer(PulsarClient pulsarClient, Producer producer) {
        this.pulsarClient = pulsarClient;
        this.producer = producer;
    }

    public PulsarProducer() {
    }

    public PulsarProducer(String topicName) {
        initPulsarClientConnection(topicName);
    }

    
    private void initPulsarClientConnection(String topic) {
        try {
            pulsarClient = PulsarClient
                    .builder()
                    .serviceUrl(localClusterUrl)
                    .build();

            //创建producer
            producer = pulsarClient.newProducer()
                    .topic(topic)
                    // 是否开启批量处理消息,默认true,需要注意的是enableBatching只在异步发送sendAsync生效,同步发送send失效。因此建议生产环境若想使用批处理,则需使用异步发送,或者多线程同步发送
                    .enableBatching(true)
                    // 消息压缩(四种压缩方式:LZ4,ZLIB,ZSTD,SNAPPY),consumer端不用做改动就能消费,开启后大约可以降低3/4带宽消耗和存储(官方测试)
                    .compressionType(CompressionType.LZ4)
                    // 设置将对发送的消息进行批处理的时间段,10ms;可以理解为若该时间段内批处理成功,则一个batch中的消息数量不会被该参数所影响。
                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                    // 设置发送超时0s;如果在sendTimeout过期之前服务器没有确认消息,则会发生错误。默认30s,设置为0代表无限制,建议配置为0
                    .sendTimeout(0, TimeUnit.SECONDS)
                    // 批处理中允许的最大消息数。默认1000
                    .batchingMaxMessages(1000)
                    // 设置等待接受来自broker确认消息的队列的最大大小,默认1000
                    .maxPendingMessages(1000)
                    // 设置当消息队列中等待的消息已满时,Producer.send 和 Producer.sendAsync 是否应该block阻塞。默认为false,达到maxPendingMessages后send操作会报错,设置为true后,send操作阻塞但是不报错。建议设置为true
                    .blockIfQueueFull(true)
                    // 向不同partition分发消息的切换频率,默认10ms,可根据batch情况灵活调整
                    .roundRobinRouterBatchingPartitionSwitchFrequency(10)
                    // key_Shared模式要用KEY_baseD,才能保证同一个key的message在一个batch里
                    .batcherBuilder(BatcherBuilder.DEFAULT)
                    .create();
        } catch (PulsarClientException e) {
            log.info("failed to create pulsar connection :{}", e);
        }
    }

    
    public void sendMessage(String data) throws PulsarClientException {
        // 同步消息发送
        syncSend(producer, data);
        // 异步消息发送
//        asyncSend(producer, data);
    }

    
    public void asyncSend(Producer producer, String data) {
        // 异步发送
        CompletableFuture future = producer.sendAsync(data.getBytes());

        // future 执行完成后会将执行结果和执行过程中抛出的异常传入回调方法,如果是正常执行的则传入的异常为null
        future.handle((v, exception) -> {
            if (exception == null) {
                log.info("asynchronous push message is successful");
            } else {
                log.info("asynchronous push failed, error message is " + exception);
            }
            return null;
        });
    }

    
    public void syncSend(Producer producer, String data) throws PulsarClientException {
        // 同步发送
        producer.send(data.getBytes());
    }

    
    public void closeConnection() {
        try {
            // 关闭producer
            producer.close();
            // 关闭client
            pulsarClient.close();
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }
}

2.消费者
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.TimeUnit;


public class PulsarConsumer {
    public static void main(String[] args) throws Exception {
        String localClientUrl = "pulsar://xxx.xxx.xx.xx:29095";
        // String localClientUrl = "pulsar://10.26.114.120:6650";
        // 需要订阅的topic name
        String topicName = "persistent://public/default/hsh5-topic";
        // 订阅名
        String subscriptionName = "my-sub";
        consumerPulsarInfo(localClientUrl, topicName, subscriptionName);
    }


    
    public static void consumerPulsarInfo(String localClientUrl, String topicName, String subscriptionName) throws Exception {
        // 构造Pulsar client
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(localClientUrl).build();
        //创建consumer
        Consumer consumer = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                // 指定消费模式,包含:Exclusive,Failover,Shared,Key_Shared。默认Exclusive模式
                .subscriptionType(SubscriptionType.Exclusive)
                // 指定从哪里开始消费还有Latest,valueof可选,默认Latest
//                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                // 指定消费失败后延迟多久broker重新发送消息给consumer,默认60s
                .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
                .subscribe();
        //消费消息
        while (true) {
            Message message = consumer.receive();
            try {
                System.out.printf("Message received: %s%n", new String(message.getData()));
                consumer.acknowledge(message);
            } catch (Exception e) {
                e.printStackTrace();
                consumer.negativeAcknowledge(message);
            }
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/270487.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号