ACL全称access control list,俗称访问控制列表。主要包括如下角色
用户:用户是访问控制的基础要素,也不难理解,RocketMQ ACL 必然也会引入用户的概 念,即支持用户名、密码;资源:需要保护的对象,在 RocketMQ 中,消息发送涉及的 Topic、消息消费涉及的 消费组,应该进行保护,故可以抽象成资源; 权限:针对资源,能进行的操作;角色:RocketMQ 中,只定义两种是否是管理员另外,RocketMQ ACL还支持按照客户端 IP 进行白名单设置。 二、ACL请求基本流程图
三、RocKetMq 开启ACL 1、broker.conf的配置将aclEnable设置为true 开启状态
#此参数控制是否开启密码 aclEnable=true brokerIP1 = xxx.78.1xx.26 namesrvAddr = xxx.78.1xx.26:9876 #此参数控制是否创建会话主题topic autoCreateTopicEnable = true brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER #此参数控制刷盘方式 flushDiskType = ASYNC_FLUSH #消息写入到堆外内存,消费时从pageCache消费,读写分离,提升集群性能 transientStorePoolEnable=true #消息占用物理内存的大小通过accessMessageInMemoryMaxRatio来配置默认为40%; #如果消费的消息不在内存中,开启slaveReadEnable时会从slave节点读取;提高Master内存利用率 slaveReadEnable=true2、配置conf/plain_acl.yml 2.1配置远程地址的白名单
# 远程地址的白名单,就配置集群的机器,保证集群内部的交互不能有影响,配置集群的地址 globalWhiteRemoteAddresses: # - 10.10.103.* - zjj101 - zjj102 - zjj1032.2配置账号和主题权限
accounts: # 用户名和密码 - accessKey: RocketMQ secretKey: 12345678 # 白名单 whiteRemoteAddress: admin: false #默认Topic访问策略是拒绝 defaultTopicPerm: DENY #默认Group访问策略是只允许订阅 defaultGroupPerm: SUB # topic的权限 topicPerms: - topicA=DENY - topicB=PUB|SUB - topicC=SUB # group的权限 groupPerms: # the group should convert to retry topic #topicA拒绝 - groupA=DENY # 只允许订阅,不允许发布消息 - groupB=PUB|SUB #topicC只允许订阅 - groupC=SUB # 第二个账户,只要是来自192.168.1.*的IP,就可以访问所有资源 - accessKey: rocketmq2 secretKey: 12345678 whiteRemoteAddress: 192.168.1.* # if it is admin, it could access all resources admin: trueRocketMQ-Console对接 ACL
官方RocketMQ-Console下载地址:https://github.com/apache/rocketmq-externals.git
因为官网的RocketMQ-Console资源包还没有封装ACL 需要自己手动封装,自己封装了一个
地址为:https://download.csdn.net/download/weixin_44121378/82706084
开启ACL配置
#acl switch #开启配置 默认为false rocketmq.config.aclEnabled=true #设置账号密码 rocketmq.config.accessKey=rocketmq2 rocketmq.config.secretKey=12345678JAVA使用ACL鉴权发送消息
maven依赖:
org.apache.rocketmq
rocketmq-spring-boot
2.2.1
一、测试
Java 获取ACL加密
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678"));
}
新建一个消费者
* @description: 普通消息消费者(消费同步、异步、单向)
* @author TAO
* @date 2021/1/14 20:51
*/
public class OrdinaryConsumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null,"OrdinaryConsumer_group",getAclRPCHook());
//2.指定Nameserver地址
consumer.setNamesrvAddr("x.xx.xxx.xxx:9876");
//3.订阅主题Topic和Tag3
consumer.subscribe("topicB", "*");
//consumer.subscribe("base", "TagB");
//消费所有"*",消费Tag1和Tag2 Tag1 || Tag2
//consumer.subscribe("base", "*");
//设定消费模式:负载均衡|广播模式 默认为负载均衡
//负载均衡10条消息,每个消费者共计消费10条
//广播模式10条消息,每个消费者都消费10条
//consumer.setMessageModel(MessageModel.BROADCASTING);
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
int num=0;
//接受消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg);
num +=1;
System.out.println(num);
//System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
// System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678"));
}
}
新建一个生产者
public class SyncProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer_group",getAclRPCHook());
//2.指定Nameserver地址
producer.setNamesrvAddr("x.xx.xxx.xxx:9876");
producer.setMaxMessageSize(4096);
producer.setVipChannelEnabled(false);
//3.启动producer
producer.start();
for (int i = 0; i < 10000; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
Message msg = new Message("topicB", "TagA", ("SyncProducer---Hello World" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.发送消息
SendResult result = producer.send(msg,10000);
//发送状态
SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result);
}
//6.关闭生产者producermeini
producer.shutdown();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678"));
}
}
二、快捷使用开发
因为导入的rocketmq-spring-boot依赖所以只需要在application中配置对应信息即可,在项目启动中就可以自动注入,并在RocketMQTemplate类自动加入账号密钥封装等
在实际开发中只需要引用RocketMQTemplate类,即可调用发送信息
生产者示例
@Component
public class RocketMqProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void syncSendMessage(String destination,String messages){
SendResult sendResult = rocketMQTemplate.syncSend(destination,messages);
}
public void syncSendPayloadMessage(String destination, Object payload){
SendResult sendResult = rocketMQTemplate.syncSend(destination,payload);
}
public void asyncsendMessage(String destination,String msg){
rocketMQTemplate.asyncSend(destination,msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
}
public void syncSendOrderlyMessage(String destination, Object payload,Object hashKey){
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination,payload,hashKey.toString());
}
public void asyncSendOrderlyMessage(String destination, Object payload,Object hashKey){
rocketMQTemplate.asyncSendOrderly(destination, payload, hashKey.toString(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable throwable) {
}
});
}
public void syncSendDelayMessage(String destination, Object payload,Integer delayLevel){
Message> message = MessageBuilder.withPayload(payload).build();
SendResult sendResult = rocketMQTemplate.syncSend(destination,message,3000L,delayLevel);
}
public void asyncSendDelayMessage(String destination, Object payload,Integer delayLevel){
Message> message = MessageBuilder.withPayload(payload).build();
rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable throwable) {
}
},3000L,delayLevel);
}
public void syncSendBatchMessage(String destination, Collection messages){
SendResult sendResult = rocketMQTemplate.syncSend(destination,messages);
}
public void syncSendBatchMessage(String destination,Object[] messages){
List messageList = Arrays.stream(messages)
.map(msg -> MessageBuilder.withPayload(msg).build())
.collect(Collectors.toList());
SendResult sendResult = rocketMQTemplate.syncSend(destination,messages);
}
public void sendMessageInTransaction(String destination, Message> message,Object arg){
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination,message,arg);
}
public void sendMessageInTransaction(String destination, String msg,Object arg){
Message> message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination,message,arg);
}
消费者示例



