- 前言
- 一、启动paascloud-provider-opc
- 1、初始化AliyunMqConfiguration
- 1.加载defaultMQPushConsumer()
- 1)AliyunMqTopicConstants.ConsumerTopics.OPT
- 2)consumer.subscribe(topic, tags)
- 3)设置MessageListener监听
- a、confirmReceiveMessage()
- b、joinPoint.proceed();进入修饰方法体
- c、消息消费后进入saveAndConfirmFinishMessage()
- 4)采用多线程启动消费者监听消息
- 二、打开邮箱,查看邮件
- 三、点击邮箱链接进行激活
- 1、sendEmailMq() 创建消息发送体
- 2、activeUser() 激活用户,再往rokectMq发送消息
- 四、登录
前言
上篇注册完成后,但是仍未激活。需要前往邮箱进行激活
查看RocketMq中的消息:
注册register()方法调用完成之后,只是往rocketMq发送了邮件消息体等内容。并没有真正消费消息,往目标邮箱发送邮件。
一、启动paascloud-provider-opc 1、初始化AliyunMqConfigurationAliyunMqConfiguration.java 中,随着spring加载注入到容器
并且注入defaultMQPushConsumer这个bean
@Bean
public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(paascloudProperties
.getAliyun().getRocketMq().getConsumerGroup());//CID_OPC
consumer.setNamesrvAddr(paascloudProperties.getAliyun().getRocketMq().getNamesrvAddr()); //localhost:9876
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //从最后一个offset开始消费
//"SEND_SMS_TOPIC" @"REGISTER_USER_AUTH_CODE",SEND_EMAIL_TOPIC" @"FORGOT_PASSWORD_AUTH_CODE"||"ACTIVE_USER"||"ACTIVE_USER_SUCCESS"||"RESET_LOGIN_PWD"||"RESET_USER_EMAIL"
String[] strArray = AliyunMqTopicConstants.ConsumerTopics.OPT.split(GlobalConstant.Symbol.COMMA);
for (String aStrArray : strArray) {
String[] topicArray = aStrArray.split(GlobalConstant.Symbol.AT);
String topic = topicArray[0]; //""SEND_EMAIL_TOPIC"
String tags = topicArray[1];//"FORGOT_PASSWORD_AUTH_CODE"||"ACTIVE_USER"||"ACTIVE_USER_SUCCESS"||"RESET_LOGIN_PWD"||"RESET_USER_EMAIL";
if (PublicUtil.isEmpty(tags)) {
tags = "*";
}
consumer.subscribe(topic, tags);//订阅
log.info("RocketMq OpcPushConsumer topic = {}, tags={}", topic, tags);
}
consumer.registerMessageListener(optPushConsumer); //OptPushMessageListener监听器
consumer.setConsumeThreadMax(2);
consumer.setConsumeThreadMin(2);
taskExecutor.execute(() -> {
try {
Thread.sleep(5000);
consumer.start();
log.info("RocketMq OpcPushConsumer OK.");
} catch (InterruptedException | MQClientException e) {
log.error("RocketMq OpcPushConsumer, 出现异常={}", e.getMessage(), e);
}
});
return consumer;
}
1)AliyunMqTopicConstants.ConsumerTopics.OPT
public static final String OPT = buildOpcConsumerTopics();
private static String buildOpcConsumerTopics() {
List topicObjList = new ArrayList<>();
Set sendSmsTagList = new HashSet<>();
sendSmsTagList.add(MqTagEnum.REGISTER_USER_AUTH_CODE.getTag()); //"REGISTER_USER_AUTH_CODE"
Set sendEmailTagList = new HashSet<>();
sendEmailTagList.add(MqTagEnum.FORGOT_PASSWORD_AUTH_CODE.getTag()); //"FORGOT_PASSWORD_AUTH_CODE"
sendEmailTagList.add(MqTagEnum.ACTIVE_USER.getTag()); //"ACTIVE_USER"
sendEmailTagList.add(MqTagEnum.ACTIVE_USER_SUCCESS.getTag()); //"ACTIVE_USER_SUCCESS"
sendEmailTagList.add(MqTagEnum.RESET_LOGIN_PWD.getTag());//"RESET_LOGIN_PWD"
sendEmailTagList.add(MqTagEnum.RESET_USER_EMAIL.getTag());//"RESET_USER_EMAIL"
topicObjList.add(new TopicObj(MqTopicEnum.SEND_SMS_TOPIC.getTopic(), sendSmsTagList)); //"SEND_SMS_TOPIC" sendSmsTagList
topicObjList.add(new TopicObj(MqTopicEnum.SEND_EMAIL_TOPIC.getTopic(), sendEmailTagList));//"SEND_EMAIL_TOPIC" sendEmailTagList
Set deleteMessageTag = new HashSet<>();
deleteMessageTag.add(MqTagEnum.DELETE_CONSUMER_MESSAGE.getTag());
deleteMessageTag.add(MqTagEnum.DELETE_PRODUCER_MESSAGE.getTag());
topicObjList.add(new TopicObj(MqTopicEnum.TPC_TOPIC.getTopic(), deleteMessageTag));//"TPC_TOPIC" deleteMessageTag
Set mdcMqTag = new HashSet<>();
mdcMqTag.add(MqTagEnum.UPDATE_ATTACHMENT.getTag());
mdcMqTag.add(MqTagEnum.DELETE_ATTACHMENT.getTag());
topicObjList.add(new TopicObj(MqTopicEnum.MDC_TOPIC.getTopic(), mdcMqTag)); //"MDC_TOPIC" mdcMqTag
return buildOpcConsumerTopics(topicObjList);
}
private static String buildOpcConsumerTopics(List topicList) {
//TopicObj["SEND_SMS_TOPIC" ,{"REGISTER_USER_AUTH_CODE"}]
//TopicObj["SEND_EMAIL_TOPIC" ,{"FORGOT_PASSWORD_AUTH_CODE","ACTIVE_USER","ACTIVE_USER_SUCCESS","RESET_LOGIN_PWD","RESET_USER_EMAIL"}]
StringBuilder result = new StringBuilder();
if (!CollectionUtils.isEmpty(topicList)) {
for (TopicObj topicObj : topicList) {
String topic = topicObj.getTopic();
Set tagList = topicObj.getTagList();
if (StringUtils.isEmpty(topic) || CollectionUtils.isEmpty(topicList)) {
continue;
}
StringBuilder tagInfo = new StringBuilder();
for (String tag : tagList) {
tagInfo.append(tag).append(GlobalConstant.Symbol.PIPE);
}
trimEnd(tagInfo, GlobalConstant.Symbol.PIPE);
result.append(topic).append(GlobalConstant.Symbol.AT).append(tagInfo).append(GlobalConstant.Symbol.COMMA);
}
}
trimEnd(result, GlobalConstant.Symbol.COMMA);
return result.toString();
}
简单说就是组装OPC模块下所有的topic和tags
2)consumer.subscribe(topic, tags)随即进行订阅。
3)设置MessageListener监听@Resource private OptPushMessageListener optPushConsumer; consumer.registerMessageListener(optPushConsumer); //OptPushMessageListener监听器
OptPushMessageListener 实现了MessageListenerConcurrently接口,重写consumeMessage方法,并且方法上有@MqConsumerStore注解
@Component
public class OptPushMessageListener implements MessageListenerConcurrently {
@MqConsumerStore
public ConsumeConcurrentlyStatus consumeMessage(List messageExtList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//.....
}
先进入MqConsumerStoreAspect切面
@Pointcut("@annotation(com.paascloud.provider.annotation.MqConsumerStore)")
public void mqConsumerStoreAnnotationPointcut() {
}
@Around(value = "mqConsumerStoreAnnotationPointcut()")
public Object processMqConsumerStoreJoinPoint(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("processMqConsumerStoreJoinPoint - 线程id={}", Thread.currentThread().getId());
Object result;
long startTime = System.currentTimeMillis();
Object[] args = joinPoint.getArgs(); //获取切面方法的传入参数 List ,ConsumeConcurrentlyContext
MqConsumerStore annotation = getAnnotation(joinPoint);
boolean isStorePreStatus = annotation.storePreStatus(); //true
List messageExtList;
//....
messageExtList = (List) args[0];
MqMessageData dto = this.getTpcMqMessageDto(messageExtList.get(0));
final String messageKey = dto.getMessageKey();
if (isStorePreStatus) {//true
// a、confirmReceiveMessage()
mqMessageService./confirm/iReceiveMessage(consumerGroup, dto);
}
String methodName = joinPoint.getSignature().getName();//注解修饰方法 consumeMessage
try {
//b、joinPoint.proceed();进入修饰方法体
result = joinPoint.proceed();
log.info("result={}", result);
if (CONSUME_SUCCESS.equals(result.toString())) {
//此时返回成功
//c、消息消费后进入saveAndConfirmFinishMessage()
mqMessageService.saveAnd/confirm/iFinishMessage(consumerGroup, messageKey);
}
} catch (Exception e) {
log.error("发送可靠消息, 目标方法[{}], 出现异常={}", methodName, e.getMessage(), e);
throw e;
} finally {
log.info("发送可靠消息 目标方法[{}], 总耗时={}", methodName, System.currentTimeMillis() - startTime);
}
return result;
}
a、confirmReceiveMessage()
此时由于设置了boolean isStorePreStatus = annotation.storePreStatus();默认值就是true,所以进入mqMessageService.confirmReceiveMessage(consumerGroup, dto);方法中。
@Override
@Transactional(rollbackFor = Exception.class)
public void /confirm/iReceiveMessage(String cid, MqMessageData messageData) {
final String messageKey = messageData.getMessageKey();
log.info("confirmReceiveMessage - 消费者={}, 确认收到key={}的消息", cid, messageKey);
// 先保存消息
messageData.setMessageType(MqMessageTypeEnum.CONSUMER_MESSAGE.messageType());//CONSUMER_MESSAGE(20, "消费者");
messageData.setId(UniqueIdGenerator.generateId());
mqMessageDataMapper.insertSelective(messageData);// pc_mq_message_data
Wrapper wrapper = tpcMqMessageFeignApi./confirm/iReceiveMessage(cid, messageKey);
log.info("tpcMqMessageFeignApi.confirmReceiveMessage result={}", wrapper);
if (wrapper == null) {
throw new TpcBizException(ErrorCodeEnum.GL99990002);
}
if (wrapper.error()) {
throw new TpcBizException(ErrorCodeEnum.TPC10050004, wrapper.getMessage(), messageKey);
}
}
第一步使用mqMessageDataMapper将**pc_mq_message_data**表中的数据修改,并且设置消息状态为20,代表是消费者。
第二步使用tpcMqMessageFeignApi远程调用/confirm/iReceiveMessage方法。
通过cid和messageKey查找pc_tpc_mq_/confirm/i确认表是否确实存在,并返回/confirm/iId。最后调用方法更新pc_tpc_mq_/confirm/i状态。
@Override
public void /confirm/iReceiveMessage(final String cid, final String messageKey) {
// 1. 校验cid
// 2. 校验messageKey
// 3. 校验cid 和 messageKey
Long confirmId = tpcMq/confirm/iMapper.getIdMq/confirm/i(cid, messageKey);
// 4. 更新消费信息的状态
// UPDATE pc_tpc_mq_/confirm/i
// SET status = 20, consume_count = consume_count + 1
// WHERe id = #{cid} AND status = 10
tpcMq/confirm/iMapper./confirm/iReceiveMessage(/confirm/iId);
}
总之,/confirm/iReceiveMessage干了两件事,一是将pc_mq_message_data表中的message_type更新为20。二是将pc_tpc_mq_/confirm/i表中的status更新为20,并且将消费者数量+1。
b、joinPoint.proceed();进入修饰方法体@Override @MqConsumerStore public ConsumeConcurrentlyStatus consumeMessage(ListmessageExtList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt msg = messageExtList.get(0); String body = new String(msg.getBody()); String topicName = msg.getTopic(); String tags = msg.getTags(); String keys = msg.getKeys(); log.info("MQ消费Topic={},tag={},key={}", topicName, tags, keys); ValueOperations ops = srt.opsForValue(); // 控制幂等性使用的key try { MqMessage.checkMessage(body, topicName, tags, keys); String mqKV = null; if (srt.hasKey(keys)) { mqKV = ops.get(keys); } if (PublicUtil.isNotEmpty(mqKV)) { log.error("MQ消费Topic={},tag={},key={}, 重复消费", topicName, tags, keys); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } if (AliyunMqTopicConstants.MqTopicEnum.SEND_SMS_TOPIC.getTopic().equals(topicName)) { optSendSmsTopicService.handlerSendSmsTopic(body, topicName, tags, keys); } //此时topicName就是"SEND_EMAIL_TOPIC" = AliyunMqTopicConstants.MqTopicEnum.SEND_EMAIL_TOPIC.getTopic() if (AliyunMqTopicConstants.MqTopicEnum.SEND_EMAIL_TOPIC.getTopic().equals(topicName)) { optSendEmailTopicService.handlerSendEmailTopic(body, topicName, tags, keys); } if (AliyunMqTopicConstants.MqTopicEnum.TPC_TOPIC.getTopic().equals(topicName)) { mqMessageService.deleteMessageTopic(body, tags); } if (AliyunMqTopicConstants.MqTopicEnum.MDC_TOPIC.getTopic().equals(topicName)) { mdcTopicConsumer.handlerSendSmsTopic(body, topicName, tags, keys); } else { log.info("OPC订单信息消 topicName={} 不存在", topicName); } } catch (IllegalArgumentException ex) { log.error("校验MQ message 失败 ex={}", ex.getMessage(), ex); } catch (Exception e) { log.error("处理MQ message 失败 topicName={}, keys={}, ex={}", topicName, keys, e.getMessage(), e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } ops.set(keys, keys, 10, TimeUnit.DAYS); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
此时获取到消息内容中,主题Topic就是"SEND_EMAIL_TOPIC",所以就进入handlerSendEmailTopic方法中。
//OptSendEmailTopicConsumer.java
public void handlerSendEmailTopic(String body, String topicName, String tags, String keys) {
MqMessage.checkMessage(body, keys, topicName);
PcSendEmailRequest request;
try {
//解析json为指定emai发送请求体
request = JacksonUtil.parseJson(body, PcSendEmailRequest.class);
} catch (Exception e) {
log.error("发送短信MQ出现异常={}", e.getMessage(), e);
throw new IllegalArgumentException("处理MQ信息,JSON转换异常");
}
String subject = request.getSubject();
String text = request.getText();
Set to = request.getTo();
optSendMailService.sendTemplateMail(subject, text, to);
}
@Resource
private JavaMailSender mailSender;
@Override
public int sendTemplateMail(String subject, String text, Set to) {
log.info("sendTemplateMail - 发送模板邮件. subject={}, text={}, to={}", subject, text, to);
int result = 1;
try {
//构建邮箱发送。使用JavaMailSender 进行发送
MimeMessage mimeMessage = getMimeMessage(subject, text, to);
taskExecutor.execute(() -> mailSender.send(mimeMessage));
} catch (Exception e) {
log.info("sendTemplateMail [FAIL] ex={}", e.getMessage(), e);
result = 0;
}
return result;
}
总之,consumeMessage方法就是真正的消费了消息,发送了邮箱内容。
c、消息消费后进入saveAndConfirmFinishMessage()//MqMessageServiceImpl.java
@Override
public void saveAnd/confirm/iFinishMessage(String cid, String messageKey) {
Wrapper wrapper = tpcMqMessageFeignApi./confirm/iConsumedMessage(cid, messageKey);
//...
}
远程调用tpcMqMessageFeignApi的/confirm/iConsumedMessage方法就是将pc_tpc_mq_/confirm/i表中的status状态改为30,表示已消费。
//TpcMqMessageFeignClient.java
@Override
@ApiOperation(httpMethod = "POST", value = "确认消费消息")
public Wrapper /confirm/iConsumedMessage(@RequestParam("cid") final String cid, @RequestParam("messageKey") final String messageKey) {
//...
tpcMqMessageService./confirm/iConsumedMessage(cid, messageKey);
return WrapMapper.ok();
}
@Override
public void /confirm/iConsumedMessage(final String cid, final String messageKey) {
Long confirmId = tpcMq/confirm/iMapper.getIdMq/confirm/i(cid, messageKey);
//UPDATE pc_tpc_mq_/confirm/i
//SET status = 30
//WHERe id = #{cid} AND status in (10, 20)
tpcMq/confirm/iMapper./confirm/iConsumedMessage(/confirm/iId);
}
4)采用多线程启动消费者监听消息
taskExecutor.execute(() -> {
try {
Thread.sleep(5000);
consumer.start();//开启线程
log.info("RocketMq OpcPushConsumer OK.");
} catch (InterruptedException | MQClientException e) {
log.error("RocketMq OpcPushConsumer, 出现异常={}", e.getMessage(), e);
}
});
二、打开邮箱,查看邮件
邮箱中的http地址是消息体参数中封装的。
//UacUserServiceImpl.java中
@Value("${paascloud.auth.active-user-url}")
private String activeUserUrl;
Map param = Maps.newHashMap();
param.put("loginName", registerDto.getLoginName());
param.put("email", registerDto.getEmail());
param.put("activeUserUrl", activeUserUrl + activeToken);
param.put("dateTime", DateUtil.formatDateTime(new Date()));
//paascloud-provider-uac-dev.yml
paascloud:
auth:
active-user-url: http://paascloud-gateway:7979/uac/auth/activeUser/
三、点击邮箱链接进行激活
http://paascloud-gateway:7979/uac/auth/activeUser/xxxx
通过gateway路由到paascloud-provider-uac模块下的activeUser()方法中
@GetMapping(value = "/activeUser/{activeUserToken}")
@ApiOperation(httpMethod = "POST", value = "激活用户")
public Wrapper activeUser(@PathVariable String activeUserToken) {
uacUserService.activeUser(activeUserToken);
return WrapMapper.ok("激活成功");
}
//UacUserServiceImpl.java
@Override
public void activeUser(String activeUserToken) {
//获取redis中存储的key,用于查redis
//paascloud:activeUser:xxxxxx
String activeUserKey = RedisKeyUtil.getActiveUserKey(activeUserToken);
//通过key查找邮箱地址
String email = redisService.getKey(activeUserKey);
// 修改用户状态, 绑定访客角色
UacUser uacUser = new UacUser();
uacUser.setEmail(email);
//pc_uac_user表中查找,在此之前该用户已经存入了,但是状态为未激活
uacUser = uacUserMapper.selectOne(uacUser);
if (uacUser == null) {
logger.error("找不到用户信息. email={}", email);
throw new UacBizException(ErrorCodeEnum.UAC10011004, email);
}
UacUser update = new UacUser();
update.setId(uacUser.getId());
//设置状态为激活
update.setStatus(UacUserStatusEnum.ENABLE.getKey());
LoginAuthDto loginAuthDto = new LoginAuthDto();
loginAuthDto.setUserId(uacUser.getId());
loginAuthDto.setUserName(uacUser.getLoginName());
loginAuthDto.setLoginName(uacUser.getLoginName());
update.setUpdateInfo(loginAuthDto);
UacUser user = this.queryByUserId(uacUser.getId());
Map param = Maps.newHashMap();
param.put("loginName", user.getLoginName());
param.put("dateTime", DateUtil.formatDateTime(new Date()));
Set to = Sets.newHashSet();
to.add(user.getEmail());
//1、sendEmailMq() 创建消息发送体
MqMessageData mqMessageData = emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER_SUCCESS,
AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER_SUCCESS, param);
//2、activeUser() 激活用户,再往rokectMq发送消息
userManager.activeUser(mqMessageData, update, activeUserKey);
}
1、sendEmailMq() 创建消息发送体
emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER_SUCCESS, AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER_SUCCESS, param);
此时消息Topic和消息模板都是"ACTIVE_USER_SUCCESS",其他的操作和注册的时候类似。
2、activeUser() 激活用户,再往rokectMq发送消息@MqProducerStore
public void activeUser(final MqMessageData mqMessageData, final UacUser uacUser, final String activeUserKey) {
log.info("激活用户. mqMessageData={}, user={}", mqMessageData, uacUser);
int result = uacUserMapper.updateByPrimaryKeySelective(uacUser);
if (result < 1) {
throw new UacBizException(ErrorCodeEnum.UAC10011038, uacUser.getId());
}
// 绑定一个访客角色默认值roleId=10000
final Long userId = uacUser.getId();
Preconditions.checkArgument(userId != null, "用戶Id不能爲空");
final Long roleId = 10000L;
UacRoleUser roleUser = new UacRoleUser();
roleUser.setUserId(userId);
roleUser.setRoleId(roleId);
uacRoleUserMapper.insertSelective(roleUser);
// 绑定一个组织
UacGroupUser groupUser = new UacGroupUser();
groupUser.setUserId(userId);
//超级管理员组织ID GlobalConstant.Sys.SUPER_MANAGER_GROUP_ID = 1L
groupUser.setGroupId(GlobalConstant.Sys.SUPER_MANAGER_GROUP_ID);
uacGroupUserMapper.insertSelective(groupUser);
// 删除 activeUserToken
redisService.deleteKey(activeUserKey);
}
同样被@MqProducerStore修饰,首先进入MqProducerStoreAspect切面中。和注册逻辑类似。进入被修饰的方法方法体后,有三步。
第一步:设置roleId=10000,通过uacRoleUserMapper在pc_uac_role_user表中插入一条数据,对应该注册激活用户的所属的权限,默认访客。
第二步:设置groupId=1L,通过uacGroupUserMapper在pc_uac_group_user表中插入一条数据,对应该注册激活用户的所属的组,默认超级管理员组。
第三步:删除redis中存的key。
至此,激活用户成功。就能登录了。
四、登录//paascloud-login-websrcviewsauthuser-loginindex.vue
login () {
let loginName = this.loginForm.loginName;
let loginPwd = this.loginForm.loginPwd;
this.$http({
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'deviceId': this.deviceId
},
url: '/uac/auth/form',
auth: {
username: 'paascloud-client-uac',
password: 'paascloudClientSecret'
},
params: {
username: loginName,
password: loginPwd,
imageCode: this.loginForm.captchaCode
}
}).then((res) => {
this.getImage();
if (res && res.code === 200) {
this.$store.dispatch('update_auth_token', res.result);
window.location.href = this.redirectUri;
}
}).catch((err) => {
console.log(err);
});
},
请求的url: ‘/uac/auth/form’



