栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rabbitmq上云实践

rabbitmq上云实践

    由于线上rabbitmq集群出现两次问题,一次由于阿里云机器本身宕机,阿里云自动在另一台机器上启动新的rabbitmq服务,导致新的服务中没有对应的队列等等,从而无法进行正常运转;另一次是因为小伙伴的不正确的编码,导致生产者产生的消息瞬间过多压垮了rabbitmq服务,基于两次问题,决定对rabbitmq进行上云尝试,希望依托阿里云来让rabbitmq的服务更加强劲。

上云方案

    为了实现无感知上云,设计了一套上云流程:

让一台机器的生产者和消费者都连接云上rabbitmq集群,确认云上集群能正常运转。

修改一台机器生产者连接云上,消费者连接云下,让这台机器保持到最后,作为殿后机器,保证云下有消费者消费最后剩余的消息。具体修改如下,增加一个新的mq连接配置,让消费者使用老得连接配置,生产者使用新的连接配置

@Configuration
class NewRabbitMqConfig {
    @Bean("newConnectionFactory")
    fun newConnectionFactory(@Value("%{spring.new-rabbitmq.addresses}") host: String,
                            @Value("%{spring.new-rabbitmq.username}") username: String,
                            @Value("%{spring.new-rabbitmq.password}") password: String): CachingConnectionFactory {
        val factory = CachingConnectionFactory()
        factory.setAddresses(host)
        factory.username = username
        factory.setPassword(password)
        return factory
    }

    @Bean
    fun messageConvert(): MessageConverter {
        val objectMapper = ObjectMapper().registerModules(JavaTimeModule(), KotlinModule(), JodaModule(), Jdk8Module())
        val messageConvert = ContentTypeDelegatingMessageConverter(Jackson2JsonMessageConverter(objectMapper))
        messageConvert.addDelegate("application/x-java-serialized-object", SimpleMessageConverter())
        return messageConvert
    }

    @Bean("newRabbitTemplate")
    fun newRabbitTemplate(@Qualifier("newConnectionFactory")newConnectionFactory: CachingConnectionFactory,messageConvert: MessageConverter): RabbitTemplate {
        val rabbitTemplate =  RabbitTemplate(newConnectionFactory)
        rabbitTemplate.messageConverter = messageConvert
        return rabbitTemplate
    }


    @Bean("connectionFactory")
    fun connectionFactory(@Value("%{spring.rabbitmq.addresses}") host: String,
                        @Value("%{spring.rabbitmq.username}") username: String,
                        @Value("%{spring.rabbitmq.password}") password: String): CachingConnectionFactory {
        val factory = CachingConnectionFactory()
        factory.setAddresses(host)
        factory.username = username
        factory.setPassword(password)
        return factory
    }


    @Bean("rabbitTemplate")
    fun rabbitTemplate(@Qualifier("connectionFactory")connectionFactory: CachingConnectionFactory,messageConvert: MessageConverter): RabbitTemplate {
        val rabbitTemplate =  RabbitTemplate(connectionFactory)
        rabbitTemplate.messageConverter = messageConvert
        return rabbitTemplate
    }

    @Bean("simpleRabbitListenerContainerFactory")
    fun simpleRabbitListenerContainerFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer,
                                            @Qualifier("connectionFactory")connectionFactory: CachingConnectionFactory
    ): SimpleRabbitListenerContainerFactory {
        val listenerFactory = SimpleRabbitListenerContainerFactory()
        configurer.configure(listenerFactory, connectionFactory)
        return listenerFactory
    }

    @Bean("rabbitAdmin")
    fun newRabbitAdmin(@Qualifier("connectionFactory")connectionFactory: CachingConnectionFactory): RabbitAdmin {
        return RabbitAdmin(connectionFactory)
    }


}        

使用这个新的rabbit连接配置来发送rabbitmq消息:

import io.zhudy.roar.admin.props.ServerProps
import io.zhudy.roar.admin.rabbitmq.message.*
import org.slf4j.LoggerFactory
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Component


@Component
class RabbitMQHelper(
    @Qualifier("rabbitTemplate")private val rabbitTemplate: RabbitTemplate,
    @Qualifier("newRabbitTemplate")private val newRabbitTemplate: RabbitTemplate,
    private val serverProps: ServerProps
) {
    private val logger = LoggerFactory.getLogger(RabbitMQHelper::class.java)

    fun welcomeUserMessage(message: WelcomeUserMessage) {
        try {
            newRabbitTemplate.convertAndSend(
                "user",
                "user.welcome.${serverProps.env.name}",
                message
            )
        } catch (e: Exception) {
            logger.error("发布打招呼消息失败!$message", e)
        }
    }
}

陆续迁移其他机器,将他们的生产者和消费者都连接到云上

等云下集群所有的消息都被殿后机器消费后,将殿后机器的消费者连接到云上去(需要注意延时消息是否消费完,可以通过rabbitmq控台的channel中去查看剩余多少没有被消费,另外根据业务场景判断殿后机器到底需要等待多久)

出现的问题

整个上云过程还是蛮顺利的,可是上完之后发现问题了,主要两点:

阿里云的rabbitmq的延时消息,最大延时时常只有1天,而我们的业务场景中存在超过一天延时消息使用场景,如果超过一天,阿里云的rabbitmq消息就会被立马消费掉,当然我们可以基于一些方式解决这个问题,但是需要编码测试发版,也需要较长的时间。阿里云的错误消息重试机制,是存在最大重试次数,超过N次重试,mq就会抛弃消息,这种丢消息的情况对我们现有系统是不能接受的,当然网上也存在合理的解决方案,就是给每个消息创建死信队列,超过最大重试次数后,放入死信队列,等待认为介入,但是对于我们来说给改造代价较大。 结果

    最后基于评估,我们决定还是对自己原本的rabbitmq集群进行软硬件升级后,再重新放回到云下运行,整个下云流程和上云流程是一样的,将云下集群和云上集群反过来就好了。

额外总结

    涉及到延时队列,我们总有场景需要到长延时的使用情况,比如一个优惠券需要几天过期,一个会员卡可能几个月过期,而我们的延时队列就算不像阿里云只有1天的最大值,其实也是有本身的最大值的,即integer有符号整数的最大值代表的毫秒数,大约二十多天。那对于长延时场景如何处理呢,总结下来一般两种方式:

延时队列接力,即当队列进行消费时查看时间是否到了实际需要执行的时间,如果没有的话,重复发一个当前消息的延时消息,这样循环知道最后到了指定消费时间再消费。这种方式的优点是使用的东西相对单一,完全基于mq,编码较容易。缺点是如果消息数很多,就会导致mq集群上堆积过多没有消费的消息(这可能也是为啥阿里云的mq直允许最长1天的延时的原因之一),还有一个缺点就是,如果你需要迁移mq,比如我们这次的上云,对于已经堆积在mq上的长延时消息,你无法处理,只能等他自己到时间被消费。配合不频繁的轮训脚本。将数据落库,等到了接近执行的时间,再由轮训脚本,基于目标时间和当前脚本时间塞到延时队列中,等到了指定时间,由mq消费者进行消费。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742810.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号