需求:需要单独设计一个模块用来封装rabbitmq 其他应用之间引用,做简单少量的配置,即可发送消息,消息的发送要保证可靠的投递
2.组件架构选型消息组件:rabbitMQ
消息可靠性保证:使用elasticjob查询数据库相关表记录来对消息进行可靠投递
相关依赖
1.rabbit- api4.0.0 rabbit-common rabbit-api rabbit-core-producer rabbit-task rabbit-test spring-boot-starter-parent org.springframework.boot 2.1.5.RELEASE cn.xp.rabbitmq.parent rabbitmq-parent1.0-SNAPSHOT rabbit-parent pom UTF-8 8 8 8 3.1.4 1.9.13 1.0.24 2.1.4 20.0 1.2.2 3.3.1 2.4 3.2.2 2.11.0 1.1.26 org.springframework.boot spring-boot-starterorg.springframework.boot spring-boot-starter-testtest org.projectlombok lombokprovided com.google.guava guava${guava.version} commons-fileupload commons-fileupload${commons-fileupload.version} org.apache.commons commons-lang3commons-io commons-io${commons-io.version} com.alibaba fastjson${fastjson.version} com.fasterxml.jackson.core jackson-databindorg.codehaus.jackson jackson-mapper-asl${org.codehaus.jackson.version} com.fasterxml.uuid java-uuid-generator${fasterxml.uuid.version}
1.message消息传输类
package com.xp.rabbitmq.api;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Data
public class Message implements Serializable {
private String messageId;
private String messageTopic="";
private String messageRouteKey="";
private Map messageAttribute=new HashMap<>();
private String messageType=MessageType./confirm/i;
private Long messageDelay;
public Message() {
}
public Message(String messageId, String messageTopic, String messageRouteKey, Map messageAttribute, String messageType, Long messageDelay) {
this.messageId = messageId;
this.messageTopic = messageTopic;
this.messageRouteKey = messageRouteKey;
this.messageAttribute = messageAttribute;
this.messageType = messageType;
this.messageDelay = messageDelay;
}
public Message(String messageId, String messageTopic, String messageRouteKey, Map messageAttribute, Long messageDelay) {
this.messageId = messageId;
this.messageTopic = messageTopic;
this.messageRouteKey = messageRouteKey;
this.messageAttribute = messageAttribute;
this.messageDelay = messageDelay;
}
}
2.使用建造者模式构造message—MessageBuilder
package com.xp.rabbitmq.api;
import com.xp.rabbitmq.exception.MessageRuntimeException;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class MessageBuilder {
private String messageId;
private String messageTopic;
private String messageRouteKey="";
private Map messageAttribute=new HashMap<>();
private String messageType=MessageType./confirm/i;
private Long messageDelay;
public MessageBuilder() {
}
public static MessageBuilder builder(){
return new MessageBuilder();
}
public MessageBuilder withMessageId(String messageId){
this.messageId=messageId;
return this;
}
public MessageBuilder withMessageTopic(String messageTopic){
this.messageTopic=messageTopic;
return this;
}
public MessageBuilder withMessageRouteKey(String messageRouteKey){
this.messageRouteKey=messageRouteKey;
return this;
}
public MessageBuilder withMessageAttributes(Map messageAttribute){
this.messageAttribute=messageAttribute;
return this;
}
public MessageBuilder withMessageAttribute(String key,Object properties){
this.messageAttribute.put(key,properties);
return this;
}
public MessageBuilder withMessageType(String messageType){
this.messageType=messageType;
return this;
}
public MessageBuilder withMessageDelay(Long messageDelay){
this.messageDelay=messageDelay;
return this;
}
public Message build(){
if(StringUtils.isEmpty(this.messageId)){
this.messageId= UUID.randomUUID().toString();
}
if(StringUtils.isEmpty((this.messageTopic))){
throw new MessageRuntimeException("消息主题不能为空");
}
Message message=new Message(messageId,messageTopic,messageRouteKey,messageAttribute,messageType,messageDelay);
return message;
}
}
3.消息发送类MessageProducer
package com.xp.rabbitmq.api;
import com.xp.rabbitmq.exception.MessageRuntimeException;
import java.util.List;
public interface MessageProducer {
void send(Message message) throws MessageRuntimeException;
void send(Message message,SendCallback sendCallback) throws MessageRuntimeException;
void send(List messageList) throws MessageRuntimeException;
void send(List messageList,SendCallback sendCallbackList) throws MessageRuntimeException;
}
4.消息分类MessageType
package com.xp.rabbitmq.api;
public class MessageType {
public static final String RAPID="0";
public static final String ConFIRM="1";
public static final String RELIABLE="2";
}
5.相关异常处理
package com.xp.rabbitmq.exception;
public class MessageException extends Exception{
public MessageException() {
super();
}
public MessageException(String exception) {
super(exception);
}
public MessageException(String exception,Throwable throwable) {
super(exception,throwable);
}
public MessageException(Throwable throwable) {
super(throwable);
}
}
package com.xp.rabbitmq.exception;
public class MessageRuntimeException extends RuntimeException{
public MessageRuntimeException() {
super();
}
public MessageRuntimeException(String exception) {
super(exception);
}
public MessageRuntimeException(String exception,Throwable throwable) {
super(exception,throwable);
}
public MessageRuntimeException(Throwable throwable) {
super(throwable);
}
}
2.rabbit-common
包含消息进行序列化 反序列化处理
1.序列化接口Serializer
package com.xp.rabbit.common.serializer;
public interface Serializer {
//对象序列化为byte数组
byte[] serializerRaw(Object object);
//对象序列化string
String serializer(Object object);
//string反序列化对象
T deserializer(String content);
//数组反序列化对象
T deserializer( byte[] content);
}
2.相关实现 SerializerImpl
package com.xp.rabbit.common.impl;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.xp.rabbit.common.serializer.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class SerializerImpl implements Serializer {
private static final Logger LOGGER= LoggerFactory.getLogger(SerializerImpl.class);
private static final ObjectMapper mapper=new ObjectMapper();
static {
mapper.disable(SerializationFeature.INDENT_OUTPUT);
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.configure(JsonParser.Feature.ALLOW_COMMENTS,true);
mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,true);
mapper.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS,true);
mapper.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS,true);
mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES,true);
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,true);
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES,true);
}
private final JavaType type;
public SerializerImpl(JavaType type) {
this.type = type;
}
public static SerializerImpl createParametricType(Class> cls){
return new SerializerImpl(mapper.getTypeFactory().constructType(cls));
}
@Override
public byte[] serializerRaw(Object object) {
try{
return mapper.writevalueAsBytes(object);
}catch (JsonProcessingException e){
LOGGER.error("序列化出错",e);
}
return null;
}
@Override
public String serializer(Object object) {
try{
return mapper.writevalueAsString(object);
}catch (JsonProcessingException e){
LOGGER.error("序列化出错",e);
}
return null;
}
@Override
public T deserializer(String content) {
try{
return mapper.readValue(content,type);
}catch (IOException e){
LOGGER.error("反序列化出错",e);
}
return null;
}
@Override
public T deserializer(byte[] content) {
try{
return mapper.readValue(content,type);
}catch (IOException e){
LOGGER.error("反序列化出错",e);
}
return null;
}
}
3.单例工厂模式产生序列化对象
package com.xp.rabbit.common.serializer;
public interface SerializerFactory {
Serializer create();
}
package com.xp.rabbit.common.impl;
import com.xp.rabbit.common.serializer.Serializer;
import com.xp.rabbit.common.serializer.SerializerFactory;
import com.xp.rabbitmq.api.Message;
public class SerializerFactoryImpl implements SerializerFactory {
public static final SerializerFactoryImpl INSTANCE=new SerializerFactoryImpl();
@Override
public Serializer create() {
return SerializerImpl.createParametricType(Message.class);
}
}
4.对象转json类
package com.xp.rabbit.common.util;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
public class FastJsonConvertUtil {
private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };
public static T convertJSONToObject(String data, Class clzss) {
try {
T t = JSON.parseObject(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static T convertJSONToObject(JSonObject data, Class clzss) {
try {
T t = JSONObject.toJavaObject(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static List convertJSONToArray(String data, Class clzss) {
try {
List t = JSON.parseArray(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static List convertJSONToArray(List data, Class clzss) {
try {
List t = new ArrayList();
for (JSonObject jsonObject : data) {
t.add(convertJSONToObject(jsonObject, clzss));
}
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static String convertObjectToJSON(Object obj) {
try {
String text = JSON.toJSonString(obj);
return text;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static JSonObject convertObjectToJSONObject(Object obj){
try {
JSonObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
return jsonObject;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static String convertObjectToJSONWithNullValue(Object obj) {
try {
String text = JSON.toJSonString(obj, featuresWithNullValue);
return text;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static void main(String[] args) {
System.err.println(System.getProperties());
}
}
package com.xp.rabbit.common.mybatis.handler; import java.sql.CallableStatement; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import com.xp.rabbit.common.util.FastJsonConvertUtil; import com.xp.rabbitmq.api.Message; import org.apache.commons.lang3.StringUtils; import org.apache.ibatis.type.baseTypeHandler; import org.apache.ibatis.type.JdbcType; public class MessageJsonTypeHandler extends baseTypeHandler{ @Override public void setNonNullParameter(PreparedStatement ps, int i, Message parameter, JdbcType jdbcType) throws SQLException { ps.setString(i, FastJsonConvertUtil.convertObjectToJSON(parameter)); } @Override public Message getNullableResult(ResultSet rs, String columnName) throws SQLException { String value = rs.getString(columnName); if(null != value && !StringUtils.isBlank(value)) { return FastJsonConvertUtil.convertJSONToObject(rs.getString(columnName), Message.class); } return null; } @Override public Message getNullableResult(ResultSet rs, int columnIndex) throws SQLException { String value = rs.getString(columnIndex); if(null != value && !StringUtils.isBlank(value)) { return FastJsonConvertUtil.convertJSONToObject(rs.getString(columnIndex), Message.class); } return null; } @Override public Message getNullableResult(CallableStatement cs, int columnIndex) throws SQLException { String value = cs.getString(columnIndex); if(null != value && !StringUtils.isBlank(value)) { return FastJsonConvertUtil.convertJSONToObject(cs.getString(columnIndex), Message.class); } return null; } }
package com.xp.rabbit.common.convert;
import com.google.common.base.Preconditions;
import com.xp.rabbit.common.serializer.Serializer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class GenericMessageConvert implements MessageConverter {
private Serializer serializer;
public GenericMessageConvert(Serializer serializer) {
Preconditions.checkNotNull(serializer);
this.serializer = serializer;
}
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
return new Message(serializer.serializerRaw(o),messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return serializer.deserializer(message.getBody());
}
}
代理模式创建自己的json转换器
package com.xp.rabbit.common.convert;
import com.google.common.base.Preconditions;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class RabbitMessageConvert implements MessageConverter {
private GenericMessageConvert delegate;
public RabbitMessageConvert(GenericMessageConvert delegate) {
Preconditions.checkNotNull(delegate);
this.delegate = delegate;
}
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
//可以增加过期时间设置
return this.delegate.toMessage(o,messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return (com.xp.rabbitmq.api.Message ) this.delegate.fromMessage(message);
}
}
3.rabbit-core-producer
这个模块主要是消息投递
1.配置spring.factories实现自动加载 RabbitProducerAutoConfiguration
# Auto Configure org.springframework.boot.autoconfigure.EnableAutoConfiguration= com.xp.rabbit.producer.autoconfigure.RabbitProducerAutoConfiguration
package com.xp.rabbit.producer.autoconfigure;
import com.xp.rabbit.task.annotation.EnableElasticJob;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@EnableElasticJob
@Configuration
@ComponentScan({"com.xp.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {
}
2.rabbit-producer-message.properties
rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource rabbit.producer.druid.jdbc.url=jdbc:mysql://ip:port/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver rabbit.producer.druid.jdbc.username= rabbit.producer.druid.jdbc.password= rabbit.producer.druid.jdbc.initialSize=5 rabbit.producer.druid.jdbc.minIdle=1 rabbit.producer.druid.jdbc.maxActive=100 rabbit.producer.druid.jdbc.maxWait=60000 rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000 rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000 rabbit.producer.druid.jdbc.validationQuery=SELECt 1 FROM DUAL rabbit.producer.druid.jdbc.testWhileIdle=true rabbit.producer.druid.jdbc.testonBorrow=false rabbit.producer.druid.jdbc.testonReturn=false rabbit.producer.druid.jdbc.poolPreparedStatements=true rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize= 20 rabbit.producer.druid.jdbc.filters=stat,wall,log4j rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true
3.引入消息中间类 落库保证可靠性投递 BrokerMessage
package com.xp.rabbit.producer.entity;
import com.xp.rabbitmq.api.Message;
import java.io.Serializable;
import java.util.Date;
public class BrokerMessage implements Serializable {
private static final long serialVersionUID = 7447792462810110841L;
private String messageId;
private Message message;
private Integer tryCount = 0;
private String status;
private Date nextRetry;
private Date createTime;
private Date updateTime;
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId == null ? null : messageId.trim();
}
public Message getMessage() {
return message;
}
public void setMessage(Message message) {
this.message = message;
}
public Integer getTryCount() {
return tryCount;
}
public void setTryCount(Integer tryCount) {
this.tryCount = tryCount;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status == null ? null : status.trim();
}
public Date getNextRetry() {
return nextRetry;
}
public void setNextRetry(Date nextRetry) {
this.nextRetry = nextRetry;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
}
mapper
package com.xp.rabbit.producer.mapper;
import com.xp.rabbit.producer.entity.BrokerMessage;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
@Mapper
public interface BrokerMessageMapper {
int deleteByPrimaryKey(String messageId);
int insert(BrokerMessage record);
int insertSelective(BrokerMessage record);
BrokerMessage selectByPrimaryKey(String messageId);
int updateByPrimaryKeySelective(BrokerMessage record);
int updateByPrimaryKeyWithBLOBs(BrokerMessage record);
int updateByPrimaryKey(BrokerMessage record);
void changeBrokerMessageStatus(@Param("brokerMessageId")String brokerMessageId, @Param("brokerMessageStatus")String brokerMessageStatus, @Param("updateTime")Date updateTime);
List queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus")String brokerMessageStatus);
List queryBrokerMessageStatus(@Param("brokerMessageStatus")String brokerMessageStatus);
int update4TryCount(@Param("brokerMessageId")String brokerMessageId, @Param("updateTime") Date updateTime);
}
xml
message_id, message, try_count, status, next_retry, create_time, update_time
service
package com.xp.rabbit.producer.service;
import com.xp.rabbit.producer.entity.BrokerMessage;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
public interface BrokerMessageService {
//插入记录
void insertBrokerMessage(BrokerMessage brokerMessage);
//成功修改记录
void updateSuccessBrokerMessage(String brokerMessageId);
//成功修改记录
void updateFailBrokerMessage(String brokerMessageId);
//通过id查询是否有记录
BrokerMessage selectBrokerMessageById(String brokerMessageId);
//查询发送失败的记录
List queryBrokerMessageStatus4Timeout(String brokerMessageStatus);
//更新重试次数
int update4TryCount(String brokerMessageId);
}
package com.xp.rabbit.producer.service.impl;
import com.xp.rabbit.producer.constant.BrokerStatus;
import com.xp.rabbit.producer.entity.BrokerMessage;
import com.xp.rabbit.producer.mapper.BrokerMessageMapper;
import com.xp.rabbit.producer.service.BrokerMessageService;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
@Service
public class BrokerMessageServiceImpl implements BrokerMessageService {
@Autowired
private BrokerMessageMapper brokerMessageMapper;
@Override
public void insertBrokerMessage(BrokerMessage brokerMessage) {
brokerMessageMapper.insert(brokerMessage);
}
@Override
public void updateSuccessBrokerMessage(String brokerMessageId) {
brokerMessageMapper.changeBrokerMessageStatus(brokerMessageId,BrokerStatus.SEND_FAIL.getCode(),new Date() );
}
@Override
public void updateFailBrokerMessage(String brokerMessageId) {
brokerMessageMapper.changeBrokerMessageStatus(brokerMessageId, BrokerStatus.SEND_FAIL.getCode(), new Date() );
}
@Override
public BrokerMessage selectBrokerMessageById(String brokerMessageId) {
return brokerMessageMapper.selectByPrimaryKey(brokerMessageId);
}
@Override
public List queryBrokerMessageStatus4Timeout(String brokerMessageStatus) {
return brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus);
}
@Override
public int update4TryCount(String brokerMessageId) {
return brokerMessageMapper.update4TryCount(brokerMessageId,new Date());
}
}
4.操作数据库配置
package com.xp.rabbit.producer.config.database;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.DatabasePopulator;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
@Configuration
public class BrokerMessageConfiguration {
@Autowired
private DataSource rabbitProducerDataSource;
@Value("classpath:rabbit-producer-message-schema.sql")
private Resource schemascript;
@Bean
public DataSourceInitializer initDataSourceInitializer() {
System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);
final DataSourceInitializer initializer = new DataSourceInitializer();
initializer.setDataSource(rabbitProducerDataSource);
initializer.setDatabasePopulator(databasePopulator());
return initializer;
}
private DatabasePopulator databasePopulator() {
final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
populator.addscript(schemascript);
return populator;
}
}
package com.xp.rabbit.producer.config.database;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.PropertySource;
@Configuration
@PropertySource({"classpath:rabbit-producer-message.properties"})
public class RabbitProducerDataSourceConfiguration {
private static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);
@Value("${rabbit.producer.druid.type}")
private Class extends DataSource> dataSourceType;
@Bean(name = "rabbitProducerDataSource")
@Primary
@ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")
public DataSource rabbitProducerDataSource() throws SQLException {
DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();
LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);
return rabbitProducerDataSource;
}
public DataSourceProperties primaryDataSourceProperties(){
return new DataSourceProperties();
}
public DataSource primaryDataSource(){
return primaryDataSourceProperties().initializeDataSourceBuilder().build();
}
}
package com.xp.rabbit.producer.config.database;
import javax.annotation.Resource;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
@Configuration
@AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
public class RabbitProducerMyBatisConfiguration {
@Resource(name= "rabbitProducerDataSource")
private DataSource rabbitProducerDataSource;
@Bean(name="rabbitProducerSqlSessionFactory")
public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(rabbitProducerDataSource);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
try {
bean.setMapperLocations(resolver.getResources("classpath:mapper
public interface BrokerConstant {
public final Integer TRY_TIME=1;
//最大发送次数
public final Integer MAX_TRY_TIME=3;
}
package com.xp.rabbit.producer.constant;
public enum BrokerStatus {
//发送中
SENDING("0"),
//发送成功
SEND_OK("1"),
//发送失败
SEND_FAIL("2");
private String code;
private String sendMessages;
BrokerStatus(String code) {
this.code = code;
}
public String getCode(){
return this.code;
}
}
6.生产者实现
package com.xp.rabbit.producer.broker;
import com.xp.rabbitmq.api.Message;
import com.xp.rabbitmq.api.SendCallback;
public interface RabbitBroker {
void onRapid(Message message);
void on/confirm/i(Message message);
void onReliable(Message message);
void onListSend();
SendCallback on/confirm/iToCallBack(Message message);
}
package com.xp.rabbit.producer.broker;
import com.xp.rabbit.producer.constant.BrokerConstant;
import com.xp.rabbit.producer.constant.BrokerStatus;
import com.xp.rabbit.producer.entity.BrokerMessage;
import com.xp.rabbit.producer.service.BrokerMessageService;
import com.xp.rabbitmq.api.Message;
import com.xp.rabbitmq.api.MessageType;
import com.xp.rabbitmq.api.SendCallback;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
@Component
@Slf4j
public class RabbitBrokerImpl implements RabbitBroker{
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private BrokerMessageService brokerMessageService;
@Override
public void onRapid(Message message) {
sendKernel(message);
}
@Override
public void on/confirm/i(Message message) {
message.setMessageType(MessageType./confirm/i);
sendKernel(message);
}
@Override
public void onReliable(Message message) {
message.setMessageType(MessageType.RELIABLE);
//查询是否有相关记录
BrokerMessage brokerMessageInRecord = brokerMessageService.selectBrokerMessageById(message.getMessageId());
if(brokerMessageInRecord==null){
//构建重发记录
Date now = new Date();
BrokerMessage brokerMessage = new BrokerMessage();
brokerMessage.setMessage(message);
brokerMessage.setStatus(BrokerStatus.SENDING.getCode());
brokerMessage.setMessageId(message.getMessageId());
brokerMessage.setCreateTime(now);
brokerMessage.setUpdateTime(now);
brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerConstant.TRY_TIME));
brokerMessageService.insertBrokerMessage(brokerMessage);
}
sendKernel(message);
}
@Override
public void onListSend() {
List messages = MessageHolder.clear();
messages.forEach(message -> {
MessageHolderAyncQueue.submit((Runnable) () -> {
CorrelationData correlationData =
new CorrelationData(String.format("%s#%s#%s",
message.getMessageId(),
System.currentTimeMillis(),
message.getMessageType()));
String topic = message.getMessageTopic();
String routingKey = message.getMessageRouteKey();
rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
log.info("#RabbitBrokerImpl.sendMessages# send to rabbitmq, messageId: {}", message.getMessageId());
});
});
}
@Override
public SendCallback on/confirm/iToCallBack(Message message) {
return null;
}
private void sendKernel(Message message){
AsyncbaseQueue.submit((Runnable)() ->{
//ID=id+时间戳
CorrelationData correlationData=new CorrelationData(String.format("%s#%s#%s",message.getMessageId(),System.currentTimeMillis(),message.getMessageType()));
rabbitTemplate.convertAndSend(message.getMessageTopic(),message.getMessageRouteKey(),message,correlationData);
log.info("#RabbitBrokerImpl.sendKernel#发送消息:messageId:{}",message.getMessageId());
});
}
}
异步线程池发送相关消息
package com.xp.rabbit.producer.broker;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.*;
@Slf4j
public class AsyncbaseQueue {
private static final int THREAD_SIZE=Runtime.getRuntime().availableProcessors();
private static final int QUEUE_SIZE=1000;
private static ExecutorService sendAsync=
new ThreadPoolExecutor(THREAD_SIZE,
THREAD_SIZE,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_SIZE),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("rabbitmq_client_async_sender");
return t;
}
}, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("async sender is error,runnable:{},executor:{}",r,executor);
}
});
public static void submit(Runnable runnable){
sendAsync.submit(runnable);
}
}
package com.xp.rabbit.producer.broker;
import java.util.List;
import com.google.common.collect.Lists;
import com.xp.rabbitmq.api.Message;
public class MessageHolder {
private List messages = Lists.newArrayList();
@SuppressWarnings({"rawtypes", "unchecked"})
public static final ThreadLocal holder = new ThreadLocal() {
@Override
protected Object initialValue() {
return new MessageHolder();
}
};
public static void add(Message message) {
holder.get().messages.add(message);
}
public static List clear() {
List tmp = Lists.newArrayList(holder.get().messages);
holder.remove();
return tmp;
}
}
package com.xp.rabbit.producer.broker;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MessageHolderAyncQueue {
private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();
private static final int QUEUE_SIZE = 10000;
private static ExecutorService senderAsync =
new ThreadPoolExecutor(THREAD_SIZE,
THREAD_SIZE,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(QUEUE_SIZE),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("rabbitmq_client_async_sender");
return t;
}
},
new java.util.concurrent.RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("async sender is error rejected, runnable: {}, executor: {}", r, executor);
}
});
public static void submit(Runnable runnable) {
senderAsync.submit(runnable);
}
}
容器化rabbit template
package com.xp.rabbit.producer.broker;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import com.xp.rabbit.common.convert.GenericMessageConvert;
import com.xp.rabbit.common.convert.RabbitMessageConvert;
import com.xp.rabbit.common.impl.SerializerFactoryImpl;
import com.xp.rabbit.common.serializer.Serializer;
import com.xp.rabbit.common.serializer.SerializerFactory;
import com.xp.rabbit.producer.constant.BrokerStatus;
import com.xp.rabbit.producer.service.BrokerMessageService;
import com.xp.rabbitmq.api.Message;
import com.xp.rabbitmq.api.MessageType;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;
import sun.plugin.services.BrowserService;
import java.util.List;
import java.util.Map;
@Component
@Slf4j
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {
Map rabbitTemplateMap= Maps.newConcurrentMap();
private SerializerFactory serializerFactory= SerializerFactoryImpl.INSTANCE;
@Autowired
ConnectionFactory connectionFactory;
@Autowired
private BrokerMessageService brokerMessageService;
public RabbitTemplate getRabbitTemplate(Message message){
Preconditions.checkNotNull(message);
String messageTopic = message.getMessageTopic();
RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(messageTopic);
if(rabbitTemplate!=null){
return rabbitTemplate;
}
//根据topic创建
RabbitTemplate newRabbitTemplate = new RabbitTemplate(connectionFactory);
newRabbitTemplate.setExchange(messageTopic);
newRabbitTemplate.setRoutingKey(message.getMessageRouteKey());
newRabbitTemplate.setRetryTemplate(new RetryTemplate());
//序列化内容
Serializer serializer = serializerFactory.create();
GenericMessageConvert gmc=new GenericMessageConvert(serializer);
RabbitMessageConvert rmc=new RabbitMessageConvert(gmc);
newRabbitTemplate.setMessageConverter(rmc);
//根据消息发送类型,设置回调
if(!message.getMessageType().equals(MessageType.RAPID))
{
newRabbitTemplate.setConfirmCallback(this);
//支持事务
newRabbitTemplate.setChannelTransacted(true);
}
rabbitTemplateMap.putIfAbsent(messageTopic,newRabbitTemplate);
return rabbitTemplateMap.get(messageTopic);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//1.取得id
List list = Splitter.on("#").splitToList(correlationData.getId());
String id=list.get(0);
Long time=Long.parseLong(list.get(1));
String messageType = list.get(2);
if (ack){
log.info("消息发送成功,id:{},时间:{}",id,time);
//如果消息为可靠模式,发送成功 将记录的日志更改为成功状态
if(MessageType./confirm/i.equals(messageType)){
brokerMessageService.updateSuccessBrokerMessage(id);
}
}else {
log.error("消息发送失败,id:{},时间:{}",id,time);
}
}
}
package com.xp.rabbit.producer.broker;
import com.google.common.base.Preconditions;
import com.xp.rabbitmq.api.Message;
import com.xp.rabbitmq.api.MessageProducer;
import com.xp.rabbitmq.api.MessageType;
import com.xp.rabbitmq.api.SendCallback;
import com.xp.rabbitmq.exception.MessageRuntimeException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class ProducerClient implements MessageProducer {
@Autowired
private RabbitBroker rabbitBroker;
@Override
public void send(Message message) throws MessageRuntimeException {
//1.判断message主题是否为空
Preconditions.checkNotNull(message.getMessageTopic());
//2.根据message类型发送消息
switch(message.getMessageType()){
case MessageType.RAPID:
rabbitBroker.onRapid(message);
break;
case MessageType./confirm/i:
rabbitBroker.on/confirm/i(message);
break;
case MessageType.RELIABLE:
rabbitBroker.onReliable(message);
break;
default :
}
}
@Override
public void send(Message message, SendCallback sendCallback) throws MessageRuntimeException {
}
@Override
public void send(List messageList) throws MessageRuntimeException {
messageList.forEach(message -> {
message.setMessageType(MessageType.RAPID);
MessageHolder.add(message);
});
}
@Override
public void send(List messageList, SendCallback sendCallbackList) throws MessageRuntimeException {
}
}
4.rabbit-task
使用elastic- job查询数据库进行可靠性投递保证
1.自动加载spring.factories
# Auto Configure org.springframework.boot.autoconfigure.EnableAutoConfiguration= com.xp.rabbit.task.autoconfigure.JobParseAutoConfiguration
2.相关常量
package com.xp.rabbit.task.enums;
public enum ElasticJobTypeEnum {
SIMPLE("SimpleJob", "简单类型job"),
DATAFLOW("DataflowJob", "流式类型job"),
script("scriptJob", "脚本类型job");
private String type;
private String desc;
private ElasticJobTypeEnum(String type, String desc) {
this.type = type;
this.desc = desc;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
3.elastic- job对zookeeper的配置
package com.xp.rabbit.task.autoconfigure;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "elastic.job.zk")
@Data
public class JobZookeeperProperties {
private String namespace;
private String serverLists;
private int maxRetries = 3;
private int connectionTimeoutMilliseconds = 15000;
private int sessionTimeoutMilliseconds = 60000;
private int baseSleepTimeMilliseconds = 1000;
private int maxSleepTimeMilliseconds = 3000;
private String digest = "";
}
4.job解析配置
package com.xp.rabbit.task.autoconfigure;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.xp.rabbit.task.parse.ElasticJobParse;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnProperty(prefix = "elastic.job.zk",name = {"namespace","serverLists"},matchIfMissing = false)
@EnableConfigurationProperties(JobZookeeperProperties.class)
@Slf4j
public class JobParseAutoConfiguration {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter registryCenter(JobZookeeperProperties jobZookeeperProperties) {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(jobZookeeperProperties.getServerLists(), jobZookeeperProperties.getNamespace());
zkConfig.setbaseSleepTimeMilliseconds(jobZookeeperProperties.getbaseSleepTimeMilliseconds());
zkConfig.setMaxSleepTimeMilliseconds(jobZookeeperProperties.getMaxSleepTimeMilliseconds());
zkConfig.setConnectionTimeoutMilliseconds(jobZookeeperProperties.getConnectionTimeoutMilliseconds());
zkConfig.setSessionTimeoutMilliseconds(jobZookeeperProperties.getSessionTimeoutMilliseconds());
zkConfig.setMaxRetries(jobZookeeperProperties.getMaxRetries());
zkConfig.setDigest(jobZookeeperProperties.getDigest());
log.info("初始化job注册中心配置成功, zkaddress : {}, namespace : {}", jobZookeeperProperties.getServerLists(), jobZookeeperProperties.getNamespace());
return new ZookeeperRegistryCenter(zkConfig);
}
@Bean
public ElasticJobParse registryElasticJobParse(JobZookeeperProperties jobZookeeperProperties,ZookeeperRegistryCenter zookeeperRegistryCenter){
return new ElasticJobParse(jobZookeeperProperties,zookeeperRegistryCenter);
}
}
5.job的解析类 在Springboot所有类创建好的时候进行相关配置加载
package com.xp.rabbit.task.parse;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.xp.rabbit.task.autoconfigure.JobParseAutoConfiguration;
import com.xp.rabbit.task.autoconfigure.JobZookeeperProperties;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.util.StringUtils;
import com.xp.rabbit.task.annotation.ElasticJobConfig;
import com.xp.rabbit.task.autoconfigure.JobZookeeperProperties;
import com.xp.rabbit.task.enums.ElasticJobTypeEnum;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.script.scriptJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.executor.handler.JobProperties;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ElasticJobParse implements ApplicationListener {
private JobZookeeperProperties jobZookeeperProperties;
private ZookeeperRegistryCenter zookeeperRegistryCenter;
public ElasticJobParse(JobZookeeperProperties jobZookeeperProperties,
ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.jobZookeeperProperties = jobZookeeperProperties;
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
try {
ApplicationContext applicationContext = event.getApplicationContext();
Map beanMap = applicationContext.getBeansWithAnnotation(ElasticJobConfig.class);
for(Iterator> it = beanMap.values().iterator(); it.hasNext();) {
Object confBean = it.next();
Class> clazz = confBean.getClass();
if(clazz.getName().indexOf("$") > 0) {
String className = clazz.getName();
clazz = Class.forName(className.substring(0, className.indexOf("$")));
}
// 获取接口类型 用于判断是什么类型的任务
String jobTypeName = clazz.getInterfaces()[0].getSimpleName();
// 获取配置项 ElasticJobConfig
ElasticJobConfig conf = clazz.getAnnotation(ElasticJobConfig.class);
String jobClass = clazz.getName();
String jobName = this.jobZookeeperProperties.getNamespace() + "." + conf.name();
String cron = conf.cron();
String shardingItemParameters = conf.shardingItemParameters();
String description = conf.description();
String jobParameter = conf.jobParameter();
String jobExceptionHandler = conf.jobExceptionHandler();
String executorServiceHandler = conf.executorServiceHandler();
String jobShardingStrategyClass = conf.jobShardingStrategyClass();
String eventTraceRdbDataSource = conf.eventTraceRdbDataSource();
String scriptCommandLine = conf.scriptCommandLine();
boolean failover = conf.failover();
boolean misfire = conf.misfire();
boolean overwrite = conf.overwrite();
boolean disabled = conf.disabled();
boolean monitorExecution = conf.monitorExecution();
boolean streamingProcess = conf.streamingProcess();
int shardingTotalCount = conf.shardingTotalCount();
int monitorPort = conf.monitorPort();
int maxTimeDiffSeconds = conf.maxTimeDiffSeconds();
int reconcileIntervalMinutes = conf.reconcileIntervalMinutes();
// 先把当当网的esjob的相关configuration
JobCoreConfiguration coreConfig = JobCoreConfiguration
.newBuilder(jobName, cron, shardingTotalCount)
.shardingItemParameters(shardingItemParameters)
.description(description)
.failover(failover)
.jobParameter(jobParameter)
.misfire(misfire)
.jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandler)
.jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), executorServiceHandler)
.build();
// 我到底要创建什么样的任务.
JobTypeConfiguration typeConfig = null;
if(ElasticJobTypeEnum.SIMPLE.getType().equals(jobTypeName)) {
typeConfig = new SimpleJobConfiguration(coreConfig, jobClass);
}
if(ElasticJobTypeEnum.DATAFLOW.getType().equals(jobTypeName)) {
typeConfig = new DataflowJobConfiguration(coreConfig, jobClass, streamingProcess);
}
if(ElasticJobTypeEnum.script.getType().equals(jobTypeName)) {
typeConfig = new scriptJobConfiguration(coreConfig, scriptCommandLine);
}
// LiteJobConfiguration
LiteJobConfiguration jobConfig = LiteJobConfiguration
.newBuilder(typeConfig)
.overwrite(overwrite)
.disabled(disabled)
.monitorPort(monitorPort)
.monitorExecution(monitorExecution)
.maxTimeDiffSeconds(maxTimeDiffSeconds)
.jobShardingStrategyClass(jobShardingStrategyClass)
.reconcileIntervalMinutes(reconcileIntervalMinutes)
.build();
// 创建一个Spring的beanDefinition
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setInitMethodName("init");
factory.setScope("prototype");
// 1.添加bean构造参数,相当于添加自己的真实的任务实现类
if (!ElasticJobTypeEnum.script.getType().equals(jobTypeName)) {
factory.addConstructorArgValue(confBean);
}
// 2.添加注册中心
factory.addConstructorArgValue(this.zookeeperRegistryCenter);
// 3.添加LiteJobConfiguration
factory.addConstructorArgValue(jobConfig);
// 4.如果有eventTraceRdbDataSource 则也进行添加
if (StringUtils.hasText(eventTraceRdbDataSource)) {
BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(JobEventRdbConfiguration.class);
rdbFactory.addConstructorArgReference(eventTraceRdbDataSource);
factory.addConstructorArgValue(rdbFactory.getBeanDefinition());
}
// 5.添加监听
List> elasticJobListeners = getTargetElasticJobListeners(conf);
factory.addConstructorArgValue(elasticJobListeners);
// 接下来就是把factory 也就是 SpringJobScheduler注入到Spring容器中
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
String registerBeanName = conf.name() + "SpringJobScheduler";
defaultListableBeanFactory.registerBeanDefinition(registerBeanName, factory.getBeanDefinition());
SpringJobScheduler scheduler = (SpringJobScheduler)applicationContext.getBean(registerBeanName);
scheduler.init();
log.info("启动elastic-job作业: " + jobName);
}
log.info("共计启动elastic-job作业数量为: {} 个", beanMap.values().size());
} catch (Exception e) {
log.error("elasticjob 启动异常, 系统强制退出", e);
System.exit(1);
}
}
private List getTargetElasticJobListeners(ElasticJobConfig conf) {
List result = new ManagedList(2);
String listeners = conf.listener();
if (StringUtils.hasText(listeners)) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(listeners);
factory.setScope("prototype");
result.add(factory.getBeanDefinition());
}
String distributedListeners = conf.distributedListener();
long startedTimeoutMilliseconds = conf.startedTimeoutMilliseconds();
long completedTimeoutMilliseconds = conf.completedTimeoutMilliseconds();
if (StringUtils.hasText(distributedListeners)) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(distributedListeners);
factory.setScope("prototype");
factory.addConstructorArgValue(Long.valueOf(startedTimeoutMilliseconds));
factory.addConstructorArgValue(Long.valueOf(completedTimeoutMilliseconds));
result.add(factory.getBeanDefinition());
}
return result;
}
}
6.使用注解的方式配置,加载
package com.xp.rabbit.task.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticJobConfig {
String name(); //elasticjob的名称
String cron() default "";
int shardingTotalCount() default 1;
String shardingItemParameters() default "";
String jobParameter() default "";
boolean failover() default false;
boolean misfire() default true;
String description() default "";
boolean overwrite() default false;
boolean streamingProcess() default false;
String scriptCommandLine() default "";
boolean monitorExecution() default false;
public int monitorPort() default -1; //must
public int maxTimeDiffSeconds() default -1; //must
public String jobShardingStrategyClass() default ""; //must
public int reconcileIntervalMinutes() default 10; //must
public String eventTraceRdbDataSource() default ""; //must
public String listener() default ""; //must
public boolean disabled() default false; //must
public String distributedListener() default "";
public long startedTimeoutMilliseconds() default Long.MAX_VALUE; //must
public long completedTimeoutMilliseconds() default Long.MAX_VALUE; //must
public String jobExceptionHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";
public String executorServiceHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";
}
package com.xp.rabbit.task.annotation;
import com.xp.rabbit.task.autoconfigure.JobParseAutoConfiguration;
import org.springframework.context.annotation.import;
import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@documented
@Inherited
@import(JobParseAutoConfiguration.class)
public @interface EnableElasticJob {
}
7.使用
在RabbitProducerAutoConfiguration开启EnableElasticJob进行
EnableElasticJob配置解析
package com.xp.rabbit.producer.autoconfigure;
import com.xp.rabbit.task.annotation.EnableElasticJob;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@EnableElasticJob
@Configuration
@ComponentScan({"com.xp.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {
}
在相关job配置zookeeper和其他配置
package com.xp.rabbit.producer.task;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.xp.rabbit.producer.broker.RabbitBroker;
import com.xp.rabbit.producer.constant.BrokerConstant;
import com.xp.rabbit.producer.constant.BrokerStatus;
import com.xp.rabbit.producer.entity.BrokerMessage;
import com.xp.rabbit.producer.service.BrokerMessageService;
import com.xp.rabbit.task.annotation.ElasticJobConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
@Component
@Slf4j
@ElasticJobConfig(
name= "com.xp.rabbit.producer.task.RetryMessageDataflowJob",
cron= "0/10 * * * * ?",
description = "可靠性投递消息补偿任务",
overwrite = true,
shardingTotalCount = 1
)
public class RetryMessageDataflowJob implements DataflowJob {
@Autowired
private BrokerMessageService brokerMessageService;
@Autowired
private RabbitBroker broker;
@Override
public List fetchData(ShardingContext shardingContext) {
//查询发送状态的记录
List brokerMessages = brokerMessageService.queryBrokerMessageStatus4Timeout(BrokerStatus.SENDING.getCode());
log.info("--------@@@@@ 抓取数据集合, 数量: {} @@@@@@-----------" , brokerMessages.size());
return brokerMessages;
}
@Override
public void processData(ShardingContext shardingContext, List brokerMessageList) {
if(!CollectionUtils.isEmpty(brokerMessageList)){
brokerMessageList.forEach(brokerMessage -> {
if(brokerMessage.getTryCount()> BrokerConstant.MAX_TRY_TIME){
//超过次数将状态设置为发送失败
brokerMessageService.updateFailBrokerMessage(brokerMessage.getMessageId());
log.error("此条记录重发三次失败,id:{},message:{}",brokerMessage.getMessageId(),brokerMessage.getMessage());
}else {
//筛选发送没有超过规定次数的记录进行重发并修改中间记录
brokerMessageService.update4TryCount(brokerMessage.getMessageId());
//重发记录
broker.onReliable(brokerMessage.getMessage());
}
});
}
}
}
5.测试
1.引入依赖
cn.xp.rabbitmq.parent rabbit-core-producer1.0-SNAPSHOT
2.配置rabbit和zookeeper的配置application.properties
server.context-path=/test server.port=8001 spring.application.name=test spring.rabbitmq.addresses=ip:port spring.rabbitmq.username= spring.rabbitmq.password= spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true spring.rabbitmq.listener.simple.auto-startup=false elastic.job.zk.serverLists=ip:port elastic.job.zk.namespace=elastic-job
3.扫描相关包
package com.bfxy.rabbit;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan({"com.xp.rabbit.*"})
public class MainConfig {
}
4.总结
技术栈:
1.rabbitMq springboot mybatis elstic-job zookeeper
2.可靠性投递保证:使用手动ack落库消息的记录,再使用elastic-job进行job查询,如果状态为失败且重试次数小于规定次数,再进行发送
3.建造者模式创建message
4.异步线程进行发送消息
5.根据消息的topic池化rabbit Template
6.代理模式进行消息转换
7.注解方法配置相关elastic-job的
8.在springboot的onApplicationEvent时候配置自己的jobparse



