栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

基于腾讯云tdmq消息队列封装SpringBootStarter(二)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于腾讯云tdmq消息队列封装SpringBootStarter(二)

关于腾讯云tdmq的基本使用参见《基于腾讯云tdmq消息队列封装SpringBootStarter(一)》,这里我们基于之前的内容在次进行优化封装。

一、创建消费者注解(TdmqConsumer)和生产者注解(TdmqProducer)

1.1、基础工程回顾

首先我们回顾下上一章完成的基础功能。

工程目录

上一章我们创建了配置目录config、生产者和消费者目录,以及meta-INF目录和spring.factories配置文件。

在此基础上我们继续完善我们的工程。

1.2、创建注解

在该工程上新建annotation包,并在annotation包下创建TdmqProducer和TdmqConsumer注解。并且在消费者注解TdmqConsumer注解中新增一下属性:topic、clazz、SubscriptionType、consumerName、subscriptionName。

最终消费者注解内容如下:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface TdmqConsumer {
    
    String topic();

    
    Class clazz() default byte[].class;

    
    SubscriptionType[] subscriptionType() default {};

    
    String consumerName() default "";

    
    String subscriptionName() default "";
}

生产者注解:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface TdmqProducer {
}

1.3、创建收集器

我们在工程中创建生产者收集器(ProducerCollector)和消费者收集器(ConsumerCollector),创建收集器的目的是在springBoot项目启动中,扫描所有带有TdmqProducer和TdmqConsumer注解的Bean对象,并将其统一管理。

在工程中创建collector包,并在该包下创建ProducerCollector和ConsumerCollector。

在创建消费者收集器前我们需要创建个对象类ConsumerHolder和ProducerCollector,用来绑定注解和实现类的绑定关系。

1.3.1、创建消费者绑定对象

public class ConsumerHolder {
    private final TdmqConsumer annotation;
    private final Method handler;
    private final Object bean;
    private final Class type;

    ConsumerHolder(TdmqConsumer annotation, Method handler, Object bean, Class type) {
        this.annotation = annotation;
        this.handler = handler;
        this.bean = bean;
        this.type = type;
    }

    public TdmqConsumer getAnnotation() {
        return annotation;
    }

    public Method getHandler() {
        return handler;
    }

    public Object getBean() {
        return bean;
    }

    public Class getType() {
        return type;
    }

    public boolean isWrapped() {
        return type.isAssignableFrom(Object.class);
    }
}

1.3.2 生产者绑定对象

public class ProducerHolder {
    private final String topic;
    private final Class clazz;
    private final String serialization;

    public ProducerHolder(String topic, Class clazz, String serialization) {
        this.topic = topic;
        this.clazz = clazz;
        this.serialization = serialization;
    }

    public String getTopic() {
        return topic;
    }

    public Class getClazz() {
        return clazz;
    }

    public String getSerialization() {
        return serialization;
    }

}

1.3.3、创建消费者收集器`ConsumerCollector`.

@Configuration
public class ConsumerCollector implements BeanPostProcessor {
    
    private Map consumers = new ConcurrentHashMap<>();

    
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        final Class beanClass = bean.getClass();
        // 过滤所有的Bean对象,如果包含TdmqConsumer注解的加入到consumers中
        consumers.putAll(Arrays.stream(beanClass.getDeclaredMethods())
                .filter($ -> $.isAnnotationPresent(TdmqConsumer.class))
                .collect(Collectors.toMap(
                        method -> buildConsumerName(beanClass, method),
                        method -> new ConsumerHolder(method.getAnnotation(TdmqConsumer.class), method, bean,
                                getParameterType(method)))));

        return bean;
    }


    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        return bean;
    }

    public Map getConsumers() {
        return consumers;
    }

    public Optional getConsumer(String methodDescriptor) {
        return Optional.ofNullable(consumers.get(methodDescriptor));
    }

    public static Class getParameterType(Method method) {
        return method.getParameterTypes()[0];
    }

    
    public String buildConsumerName(Class clazz, Method method) {
        return clazz.getName() + method.getName() + Arrays
                .stream(method.getGenericParameterTypes())
                .map(Type::getTypeName)
                .collect(Collectors.joining());
    }
}

1.3.4、创建生产者收集器

@Component
public class ProducerCollector implements BeanPostProcessor, EmbeddedValueResolverAware {

    private final PulsarClient pulsarClient;
    private final TdmqProperties tdmqProperties;

    private final Map producers = new ConcurrentHashMap<>();

    private StringValueResolver stringValueResolver;

    public ProducerCollector(PulsarClient pulsarClient, TdmqProperties tdmqProperties) {
        this.pulsarClient = pulsarClient;
        this.tdmqProperties = tdmqProperties;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        final Class beanClass = bean.getClass();
        if (beanClass.isAnnotationPresent(TdmqProducer.class) && bean instanceof IProducerFactory) {
            producers.putAll(((IProducerFactory) bean).getTopics().entrySet().stream()
                    .map($ -> new ProducerHolder(
                            stringValueResolver.resolveStringValue($.getKey()),
                            $.getValue().left,
                            $.getValue().right))
                    .collect(Collectors.toMap(ProducerHolder::getTopic, this::buildProducer)));
        }

        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        return bean;
    }

    private Producer buildProducer(ProducerHolder holder) {
        try {
            return pulsarClient.newProducer(getSchema(holder))
                    .topic(buildTopicUrl(holder.getTopic()))
                    .create();
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    private  Schema getSchema(ProducerHolder holder) throws RuntimeException {
        return getGenericSchema(holder.getSerialization(), holder.getClazz());
    }

    public Producer getProducer(String topic) {
        return producers.get(stringValueResolver.resolveStringValue(topic));
    }

    @Override
    public void setEmbeddedValueResolver(StringValueResolver stringValueResolver) {
        this.stringValueResolver = stringValueResolver;
    }

    public String buildTopicUrl(String topic) {
        return tdmqProperties.getClusterId() + "/" + tdmqProperties.getEnvironmentId() +
                "/" + topic;
    }


    private static  Schema getGenericSchema(String type, Class clazz) throws RuntimeException {
        switch (type) {
            case "JSON": {
                return Schema.JSON(clazz);
            }
            case "AVRO": {
                return Schema.AVRO(clazz);
            }
            case "STRING": {
                return Schema.STRING;
            }
            default: {
                throw new RuntimeException("Unknown producer schema.");
            }
        }
    }
}

1.4、创建消费者消息处理聚合器

我们通过postProcessBeforeInitialization方法以及将全部带有TdmqConsumer注解的对象收集起来,接下来我们定义个消费者消息处理器,来出来这些Bean对象,这里也是本篇文章的核心内容。

@Component
@DependsOn({"pulsarClient"})
public class ConsumerAggregator implements EmbeddedValueResolverAware {
    private final ConsumerCollector consumerCollector;
    private final PulsarClient pulsarClient;
    private final static SubscriptionType DEFAULT_SUBscriptION_TYPE = SubscriptionType.Exclusive;
    private final TdmqProperties tdmqProperties;
    private StringValueResolver stringValueResolver;
    private List consumers;

    public ConsumerAggregator(ConsumerCollector consumerCollector, PulsarClient pulsarClient, TdmqProperties tdmqProperties) {
        this.consumerCollector = consumerCollector;
        this.pulsarClient = pulsarClient;
        this.tdmqProperties = tdmqProperties;
    }

    
    @EventListener(ApplicationReadyEvent.class)
    public void init() {
        //获取收集器中所有的消费者对象
        consumers = consumerCollector.getConsumers().entrySet().stream()
                .map(holder -> subscribe(holder.getKey(), holder.getValue()))
                .collect(Collectors.toList());
    }

    
    private Consumer subscribe(String generatedConsumerName, ConsumerHolder holder) {
        try {
            //从注解中获取消费名称
            final String consumerName = stringValueResolver.resolveStringValue(holder.getAnnotation().consumerName());
            //从注解中获取订阅名称
            final String subscriptionName = stringValueResolver.resolveStringValue(holder.getAnnotation().subscriptionName());
            //从注解中获取队列topic名称
            final String topicName = stringValueResolver.resolveStringValue(holder.getAnnotation().topic());
            //获取消费者类型--参考官方文档类型说明
            final SubscriptionType subscriptionType = getSubscriptionType(holder);
            //通过pulsarClient构建consumerBuilder
            final ConsumerBuilder consumerBuilder = pulsarClient
                    .newConsumer()
                    .consumerName(consumerName)
                    .subscriptionName(subscriptionName)
                    .topic(buildTopicUrl(topicName))
                    .subscriptionType(subscriptionType)
                    .messageListener((consumer, msg) -> {
                        try {
                            //从绑定关系中获取需执行的方法
                            final Method method = holder.getHandler();
                            method.setAccessible(true);
                            //通过反射执行注解所在的方法,并将监听到的消息作为参数进行传递
                            method.invoke(holder.getBean(), msg.getValue());
                            //消息执行后手动ack消息
                            consumer.acknowledge(msg);
                        } catch (Exception e) {
                            //消息处理执行异常,进行negativeAcknowledge操作
                            consumer.negativeAcknowledge(msg);
                        }
                    });
            buildDeadLetterPolicy(holder, consumerBuilder);
            return consumerBuilder.subscribe();
        } catch (PulsarClientException e) {
            //应该自定义异常,这里暂时不做处理
            throw new RuntimeException(e);
        }
    }

    private SubscriptionType getSubscriptionType(ConsumerHolder holder) {
        SubscriptionType subscriptionType = Arrays.stream(holder.getAnnotation().subscriptionType())
                .findFirst().orElse(null);
        if (subscriptionType == null) {
            subscriptionType = DEFAULT_SUBscriptION_TYPE;
        }
        return subscriptionType;
    }

    public void buildDeadLetterPolicy(ConsumerHolder holder, ConsumerBuilder consumerBuilder) {
        DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterBuilder =
                DeadLetterPolicy.builder().maxRedeliverCount(-1);
    }


    public List getConsumers() {
        return consumers;
    }


    @Override
    public void setEmbeddedValueResolver(StringValueResolver stringValueResolver) {
        this.stringValueResolver = stringValueResolver;
    }

    public String buildTopicUrl(String topic) {
        return tdmqProperties.getClusterId() + "/" + tdmqProperties.getEnvironmentId() +
                "/" + topic;
    }
}

1.5、创建生产者工厂和模版处理类

1.5.1、创建生产者工厂接口`IProducerFactory`

public interface IProducerFactory {
    Map, String>> getTopics();
}

1.5.2、创建工程对象

@TdmqProducer
public class ProducerFactory implements IProducerFactory {

    private final Map, String>> topics = new HashMap<>();

    public ProducerFactory addProducer(String topic) {
        return addProducer(topic, byte[].class, "JSON");
    }

    public ProducerFactory addProducer(String topic, Class clazz) {
        topics.put(topic, new ImmutablePair<>(clazz, "JSON"));
        return this;
    }

    public ProducerFactory addProducer(String topic, Class clazz, String serialization) {
        topics.put(topic, new ImmutablePair<>(clazz, serialization));
        return this;
    }

    @Override
    public Map, String>> getTopics() {
        return topics;
    }
}

1.5.3、构建TdmqTemplate

public class TdmqTemplate {
    private final ProducerCollector producerCollector;

    public TdmqTemplate(ProducerCollector producerCollector) {
        this.producerCollector = producerCollector;
    }

    
    public MessageId send(String topic, T msg) throws PulsarClientException {
        return producerCollector.getProducer(topic).send(msg);
    }

    
    public CompletableFuture sendAsync(String topic, T message) {
        return producerCollector.getProducer(topic).sendAsync(message);
    }

    
    public TypedMessageBuilder createMessage(String topic, T message) {
        return producerCollector.getProducer(topic).newMessage().value(message);
    }
}

1.6、整合生产消息者配置

将生产者和消费者配置到TdmqAutoConfiguration文件中,完整的TdmqAutoConfiguration内容如下:

@Slf4j
@Data
@EnableConfigurationProperties({TdmqProperties.class})
public class TdmqAutoConfiguration {
    
    @Bean
    @ConditionalOnMissingBean(PulsarClient.class)
    @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
    public PulsarClient pulsarClient(TdmqProperties mqProperties) throws PulsarClientException {
        log.info("-----------------");
        return PulsarClient.builder()
                .serviceUrl(mqProperties.getServiceUrl())
                .authentication(AuthenticationFactory.token(mqProperties.getToken()))
                .build();
    }

    
    @Bean
    @ConditionalOnMissingBean(ConsumerCollector.class)
    @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
    public ConsumerCollector consumerCollector() {
        return new ConsumerCollector();
    }

    
    @Bean
    @ConditionalOnMissingBean(ConsumerAggregator.class)
    @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
    public ConsumerAggregator consumerAggregator(ConsumerCollector consumerCollector, PulsarClient pulsarClient, TdmqProperties tdmqProperties) {
        return new ConsumerAggregator(consumerCollector, pulsarClient, tdmqProperties);
    }

    
    @Bean
    @ConditionalOnMissingBean(ProducerCollector.class)
    @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
    public ProducerCollector producerCollector(PulsarClient pulsarClient,
                                               TdmqProperties tdmqProperties) {
        return new ProducerCollector(pulsarClient, tdmqProperties);
    }

    
    @Bean
    @ConditionalOnMissingBean(TdmqTemplate.class)
    @ConditionalOnProperty(name = "tdmq.enable", havingValue = "true")
    public TdmqTemplate pulsarTemplate(ProducerCollector producerCollector) {
        return new TdmqTemplate(producerCollector);
    }

}

二、使用案例

我们这里使用自定义的TdmqConsumer和TdmqTemplate来完成一个生产消费者的案例。

2.1、创建生产者配置类

创建生产者配置类ProducerConfiguration,该配置类,主要将消息队列队列名称绑定到ProducerFactory上下文中,我们可以通过TdmqTemplate去直接使用。

@Configuration
public class ProducerConfiguration {
    
    public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

    @Bean
    public ProducerFactory producerFactory() {
        //将队列添加到ProducerFactory上下文中
        return new ProducerFactory()
                .addProducer(MESSAGE_LOGGING_TOPIC, String.class);
    }
}

创建消费者监听

@Slf4j
@Service
public class MessageLoggingListener {
    public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

    @TdmqConsumer(topic = MESSAGE_LOGGING_TOPIC, consumerName = "message_logging", clazz = String.class, subscriptionName = "message_logging_es")
    void consume(String msg) {
        log.info("------------{}", msg);
    }
}

去除之前消费者TdmqConsumer.class.

修改单元测试SpringBootStarterTdmqApplicationTests

@Slf4j
@SpringBootTest
class SpringBootStarterTdmqApplicationTests {
    @Autowired
    private TdmqTemplate proucer;

    @Test
    public void producer() throws PulsarClientException {
        MessageId messageId = proucer.send("message_logging", "发送消息测试");
        log.info("send msg is success Id = {}", messageId);
    }

}

将之前的TdmqProucer改为TdmqTemplate;

启动单元测试:

查看测试结果:

测试结果

三、说明:

1、配置生产者工厂
@Configuration
public class ProducerConfiguration {
    
    public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

    @Bean
    public ProducerFactory producerFactory() {
        //将队列添加到ProducerFactory上下文中
        return new ProducerFactory()
                .addProducer(MESSAGE_LOGGING_TOPIC, String.class);
    }
}

2、创建消费者实现类
@Slf4j
@Service
public class MessageLoggingListener {
    public static final String MESSAGE_LOGGING_TOPIC = "message_logging";

    @TdmqConsumer(topic = MESSAGE_LOGGING_TOPIC, consumerName = "message_logging", clazz = String.class, subscriptionName = "message_logging_es")
    void consume(String msg) {
        log.info("------------{}", msg);
    }
}

主要在方法上增加TdmqConsumer注解。

3、`生产者TdmqTemplate`模版使用
@Slf4j
@SpringBootTest
class SpringBootStarterTdmqApplicationTests {
    @Autowired
    private TdmqTemplate proucer;

    @Test
    public void producer() throws PulsarClientException {
        MessageId messageId = proucer.send("message_logging", "发送消息测试");
        log.info("send msg is success Id = {}", messageId);
    }

}

4、使用配置文件
tdmq:
  enable: true
  serviceUrl: serviceUrl
  token: token
  clusterId: clusterId
  environmentId: environmentId

源码地址:

hongjieWang/spring-boot-starter-tdmq: spring-boot-starter-tdmq (github.com)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/694197.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号