栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

微服务 Spring Cloud Alibaba 项目搭建(七、RocketMQ 集成)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

微服务 Spring Cloud Alibaba 项目搭建(七、RocketMQ 集成)

RocketMQ介绍

RocketMQ 是一个 队列模型 的消息中间件,具有高性能、高可靠、高实时、分布式 的特点。它是一个采用 Java 语言开发的分布式的消息系统,由阿里巴巴团队开发,在2016年底贡献给 Apache,成为了 Apache 的一个顶级项目。 在阿里内部,RocketMQ 很好地服务了集团大大小小上千个应用,在每年的双十一当天,更有不可思议的万亿级消息通过 RocketMQ 流转。

RocketMQ 特点
  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
  • Producer、Consumer、队列都可以分布式
  • Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合
  • 能够保证严格的消息顺序
  • 支持拉(pull)和推(push)两种消息模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
  • 支持多种消息协议,如 JMS、OpenMessaging 等
  • 较少的依赖
    #kafka 、RocketMQ 、RabbitMQ 对比

RocketMQ安装

RocketMQ下载: rocketmq-all-4.8.0-bin-release.zip
1.RocketMQ zip包传入linux服务器

[root@localhost ]# cd usr/local/
[root@localhost local]# rz

2.解压缩

[root@localhost local]# unzip rocketmq-all-4.8.0-bin-release.zip

3.调整启动参数(修改默认启动参数,默认启动的最大内存为4G,比较大,修改小一点,否则如果服务器内存不够会启动失败)

[root@localhost local]# cd rocketmq-all-4.8.0-bin-release/bin
[root@localhost bin]# vim runserver.sh

  • -Xms4g -Xmx4g -Xmn2g 改为 -Xms256m -Xmx256m -Xmn128m

4.调整broker

[root@localhost bin]# vim runbroker.sh

  • -Xms8g -Xmx8g -Xmn4g 改为 -Xms256m -Xmx256m -Xmn128m

5.启动namesrv

[root@localhost bin]# nohup sh mqnamesrv &

6.启动broker,注意ip为公网ip,端口为navmesrv的默认端口9876

[root@localhost bin]# nohup ./mqbroker -n localhost:9876 &

7.检查是否启动成功

[root@localhost bin]# jps -l
  • 如果发现报错bash: jps: 未找到命令… 请更新以下命令
[root@localhost bin]# sudo yum install java-1.8.0-openjdk-devel.x86_64

  • 输入命令 jps -l

  • 关闭 RocketMQ 命令 (此处无需关闭,只用于了解)

./mqshutdown broker
./mqshutdown namesrv

#RocketMQ 控制台安装
1.克隆rocketmq项目

[root@localhost local]# cd /usr/local/
[root@localhost local]# git clone  https://github.com/apache/rocketmq-externals.git
  • 进入rocketmq-externalsrocketmq-consolesrcmainresources 下修改 application.properties 配置文件

  • 配置文件修改如下图

github提供了 Docker 和 非Docker 两种安装方法供其选择,这里使用非Docker方式进行安装

  • 在 rocketmq-externalsrocketmq-console 文件夹下打开控制台,输入以下命令进行maven打包

mvn clean package -Dmaven.test.skip=true

  • 进入 rocketmq-externalsrocketmq-consoletarget 文件夹下打开控制台,输入以下命令进行 jar包启动

java -jar rocketmq-console-ng-2.0.0.jar

  • 打开浏览器访问 localhost:9877,如果报错

  • 开放 10909 01911 9876 端口

firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=9876/tcp --permanent
systemctl restart firewalld.service
firewall-cmd --reload

  • 验证RocketMQ功能是够正常

1.验证生产消息正常,输入命令

[root@localhost rocketmq-all-4.8.0-bin-release]# export NAMESRV_ADDR=localhost:9876
[root@localhost rocketmq-all-4.8.0-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

正常的情况下,会看到一堆的类似于如下的输出,这是生产消息后成功的result:

SendResult [sendStatus=SEND_OK, msgId=7F000001372329453F44466341350068, offsetMsgId=C0A8017600002A9F000000000000674E, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=33]

2.验证消费消息正常,执行如下命令:

[root@localhost rocketmq-all-4.8.0-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

正常的情况下,会看到一堆的类似于如下的输出,这是消费的消息内容:

ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=201, queueOffset=0, sysFlag=0, bornTimestamp=1618387294736, bornHost=/192.168.1.118:34722, storeTimestamp=1618387294743, storeHost=/192.168.1.118:10911, msgId=C0A8017600002A9F0000000000000192, commitLogOffset=402, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=34, CONSUME_START_TIME=1618387666005, UNIQ_KEY=7F00000136FE29453F44466306100001, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]]

  • 访问页面 http://localhost:9877 ,出现下图界面,安装成功
RocketMQ 集成 - 生产者
  • gateway下pom.xml文件添加依赖

        
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.1.1
        
  • nacos 配置 RocketMQ

rocketmq:
  name-server: 192.168.190.129:9876
  producer:
    # 小坑:必须指定group
    group: test-group
  • common 下创建实体类 MyMessage.class
package com.bi.cloud.pojo;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

@Data
public class MyMessage implements Serializable {
    private Integer id;
    private String name;
    private String status;
    private Date createTime;
}

  • gateway下创建 TestProducerController.class
package com.bi.cloud.controller;

import com.bi.cloud.pojo.MyMessage;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;


@RestController
@RequestMapping("/api/testRocketMQ")
public class TestProducerController {

    
    @Resource
    public RocketMQTemplate rocketMQTemplate;

    @GetMapping("/sendMsg")
    public String testSendMsg() {
        String topic = "test-topic";
        MyMessage message = new MyMessage();
        message.setId(1);
        message.setName("王霄");
        message.setStatus("default");
        message.setCreateTime(new Date());
        // 发送消息
        rocketMQTemplate.convertAndSend(topic, message);

        return "send message success";
    }
}
  • Postman 调用接口

  • 如果报错 请关闭linux防火墙
systemctl stop firewalld
  • 消息发送成功后,可以到RocketMQ的控制台中进行查看:

RocketMQ 集成 - 消费者
  • engine下pom.xml文件添加依赖


        
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.1.1
        
  • nacos 配置 RocketMQ

rocketmq:
  name-server: 192.168.190.129:9876
  producer:
    # 小坑:必须指定group
    group: test-group
  • engine 下创建消费者监听器 TestConsumerListener.class

package com.bi.cloud.service.Impl;

import com.alibaba.fastjson.JSON;
import com.bi.cloud.pojo.MyMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer-group")
public class TestConsumerListener implements RocketMQListener {

    
    @Override
    public void onMessage(MyMessage message) {
        log.info("从test-topic中监听到消息");
        log.info(JSON.toJSONString(message));
    }
}
  • 编写完成后启动项目,由于之前我们已经往队列里发送了消息,所以此时消费者项目一启动,就可以监听到消息并消费,控制台就会输出如下日志:

前往:第八章 Oauth2.0 安全认证子模块集成

参考文献:
https://github.com/apache/rocketmq-externals.git
https://blog.csdn.net/qq_40280582/article/details/111785355
https://zhuhuix.blog.csdn.net/article/details/108866638
https://blog.51cto.com/zero01/2426303

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/830117.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号