栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ组件封装

RabbitMQ组件封装

1.组件需求

需求:需要单独设计一个模块用来封装rabbitmq 其他应用之间引用,做简单少量的配置,即可发送消息,消息的发送要保证可靠的投递

2.组件架构选型

消息组件:rabbitMQ
消息可靠性保证:使用elasticjob查询数据库相关表记录来对消息进行可靠投递

3.模块分类

相关依赖



    4.0.0
    
        rabbit-common
        rabbit-api
        rabbit-core-producer
        rabbit-task
        rabbit-test
    
    
        spring-boot-starter-parent
        org.springframework.boot
        2.1.5.RELEASE
        
    
    cn.xp.rabbitmq.parent
    rabbitmq-parent
    1.0-SNAPSHOT
    rabbit-parent
    pom
    
        UTF-8
        8
        8
        8
        3.1.4
        1.9.13
        1.0.24
        2.1.4
        20.0
        1.2.2
        3.3.1
        2.4
        3.2.2
        2.11.0
        1.1.26
    

    
        
            org.springframework.boot
            spring-boot-starter
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            org.projectlombok
            lombok
            provided
        
        
            com.google.guava
            guava
            ${guava.version}
        
        
            commons-fileupload
            commons-fileupload
            ${commons-fileupload.version}
        
        
            org.apache.commons
            commons-lang3
        
        
            commons-io
            commons-io
            ${commons-io.version}
        
        
            com.alibaba
            fastjson
            ${fastjson.version}
        
        
            com.fasterxml.jackson.core
            jackson-databind
        
        
            org.codehaus.jackson
            jackson-mapper-asl
            ${org.codehaus.jackson.version}
        
        
            com.fasterxml.uuid
            java-uuid-generator
            ${fasterxml.uuid.version}
        
    




1.rabbit- api


1.message消息传输类

package com.xp.rabbitmq.api;

import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;


@Data
public class Message implements Serializable {
    
    private String messageId;

    
    private String messageTopic="";

    
    private String messageRouteKey="";


    
    private Map messageAttribute=new HashMap<>();

    
    private String messageType=MessageType./confirm/i;

    
    private Long messageDelay;

    public Message() {
    }

    public Message(String messageId, String messageTopic, String messageRouteKey, Map messageAttribute, String messageType, Long messageDelay) {
        this.messageId = messageId;
        this.messageTopic = messageTopic;
        this.messageRouteKey = messageRouteKey;
        this.messageAttribute = messageAttribute;
        this.messageType = messageType;
        this.messageDelay = messageDelay;
    }

    public Message(String messageId, String messageTopic, String messageRouteKey, Map messageAttribute, Long messageDelay) {
        this.messageId = messageId;
        this.messageTopic = messageTopic;
        this.messageRouteKey = messageRouteKey;
        this.messageAttribute = messageAttribute;
        this.messageDelay = messageDelay;
    }
}

2.使用建造者模式构造message—MessageBuilder

package com.xp.rabbitmq.api;

import com.xp.rabbitmq.exception.MessageRuntimeException;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


public class MessageBuilder {
    
    private String messageId;
    
    private String messageTopic;
    
    private String messageRouteKey="";
    
    private Map messageAttribute=new HashMap<>();
    
    private String messageType=MessageType./confirm/i;
    
    private Long messageDelay;

    public MessageBuilder() {
    }

    public static MessageBuilder builder(){
        return new MessageBuilder();
    }

    public MessageBuilder withMessageId(String messageId){
        this.messageId=messageId;
        return this;
    }

    public MessageBuilder withMessageTopic(String messageTopic){
        this.messageTopic=messageTopic;
        return this;
    }

    public MessageBuilder withMessageRouteKey(String messageRouteKey){
        this.messageRouteKey=messageRouteKey;
        return this;
    }

    public MessageBuilder withMessageAttributes(Map messageAttribute){
        this.messageAttribute=messageAttribute;
        return this;
    }
    public MessageBuilder withMessageAttribute(String key,Object properties){
        this.messageAttribute.put(key,properties);
        return this;
    }
    public MessageBuilder withMessageType(String messageType){
        this.messageType=messageType;
        return this;
    }
    public MessageBuilder withMessageDelay(Long messageDelay){
        this.messageDelay=messageDelay;
        return this;
    }
    public Message build(){
        if(StringUtils.isEmpty(this.messageId)){
            this.messageId= UUID.randomUUID().toString();
        }
        if(StringUtils.isEmpty((this.messageTopic))){
            throw new MessageRuntimeException("消息主题不能为空");
        }

        Message message=new Message(messageId,messageTopic,messageRouteKey,messageAttribute,messageType,messageDelay);

        return message;
    }


}

3.消息发送类MessageProducer

package com.xp.rabbitmq.api;

import com.xp.rabbitmq.exception.MessageRuntimeException;

import java.util.List;


public interface MessageProducer {

    void send(Message message) throws MessageRuntimeException;
    void send(Message message,SendCallback sendCallback) throws MessageRuntimeException;
    void send(List messageList) throws MessageRuntimeException;
    void send(List messageList,SendCallback sendCallbackList) throws MessageRuntimeException;


}

4.消息分类MessageType

package com.xp.rabbitmq.api;


public class MessageType {
    
    public static final String RAPID="0";

    
    public static final String ConFIRM="1";


    
    public static final String RELIABLE="2";

}

5.相关异常处理

package com.xp.rabbitmq.exception;


public class MessageException extends Exception{

    public MessageException() {
        super();
    }
    public MessageException(String exception) {
        super(exception);
    }
    public MessageException(String exception,Throwable throwable) {
        super(exception,throwable);
    }
    public MessageException(Throwable throwable) {
        super(throwable);
    }




}
package com.xp.rabbitmq.exception;


public class MessageRuntimeException extends RuntimeException{
    public MessageRuntimeException() {
        super();
    }
    public MessageRuntimeException(String exception) {
        super(exception);
    }
    public MessageRuntimeException(String exception,Throwable throwable) {
        super(exception,throwable);
    }
    public MessageRuntimeException(Throwable throwable) {
        super(throwable);
    }
}
2.rabbit-common


包含消息进行序列化 反序列化处理
1.序列化接口Serializer

package com.xp.rabbit.common.serializer;

public interface Serializer {
    //对象序列化为byte数组
    byte[] serializerRaw(Object object);
    //对象序列化string
    String serializer(Object object);
    //string反序列化对象
    T deserializer(String content);
    //数组反序列化对象
    T deserializer( byte[] content);
}

2.相关实现 SerializerImpl

package com.xp.rabbit.common.impl;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.xp.rabbit.common.serializer.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;



public class SerializerImpl implements Serializer {
    private static final Logger LOGGER= LoggerFactory.getLogger(SerializerImpl.class);
    private static final ObjectMapper mapper=new ObjectMapper();
    static {
        mapper.disable(SerializationFeature.INDENT_OUTPUT);
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        mapper.configure(JsonParser.Feature.ALLOW_COMMENTS,true);
        mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,true);
        mapper.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS,true);
        mapper.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS,true);
        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES,true);
        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS,true);
        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES,true);
    }

    private final JavaType type;

    public SerializerImpl(JavaType type) {
        this.type = type;
    }

    public static SerializerImpl createParametricType(Class cls){
        return new SerializerImpl(mapper.getTypeFactory().constructType(cls));
    }

    @Override
    public byte[] serializerRaw(Object object) {
        try{
            return mapper.writevalueAsBytes(object);
        }catch (JsonProcessingException e){
            LOGGER.error("序列化出错",e);
        }
        return null;
    }

    @Override
    public String serializer(Object object) {
        try{
            return mapper.writevalueAsString(object);
        }catch (JsonProcessingException e){
            LOGGER.error("序列化出错",e);
        }
        return null;
    }

    @Override
    public  T deserializer(String content) {
        try{
            return mapper.readValue(content,type);
        }catch (IOException e){
            LOGGER.error("反序列化出错",e);
        }
        return null;
    }

    @Override
    public  T deserializer(byte[] content) {
        try{
            return mapper.readValue(content,type);
        }catch (IOException e){
            LOGGER.error("反序列化出错",e);
        }
        return null;
    }
}

3.单例工厂模式产生序列化对象

package com.xp.rabbit.common.serializer;


public interface SerializerFactory {

    Serializer create();
}


package com.xp.rabbit.common.impl;

import com.xp.rabbit.common.serializer.Serializer;
import com.xp.rabbit.common.serializer.SerializerFactory;
import com.xp.rabbitmq.api.Message;


public class SerializerFactoryImpl implements SerializerFactory {
    public static final SerializerFactoryImpl INSTANCE=new SerializerFactoryImpl();
    @Override
    public Serializer create() {
        return SerializerImpl.createParametricType(Message.class);
    }
}

4.对象转json类

package com.xp.rabbit.common.util;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;


public class FastJsonConvertUtil {

	private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
	        SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };

	
	public static  T convertJSONToObject(String data, Class clzss) {
		try {
			T t = JSON.parseObject(data, clzss);
			return t;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}
	
	
	public static  T convertJSONToObject(JSonObject data, Class clzss) {
		try {
			T t = JSONObject.toJavaObject(data, clzss);
			return t;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

	
	public static  List convertJSONToArray(String data, Class clzss) {
		try {
			List t = JSON.parseArray(data, clzss);
			return t;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}
	
	
	public static  List convertJSONToArray(List data, Class clzss) {
		try {
			List t = new ArrayList();
			for (JSonObject jsonObject : data) {
				t.add(convertJSONToObject(jsonObject, clzss));
			}
			return t;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

	
	public static String convertObjectToJSON(Object obj) {
		try {
			String text = JSON.toJSonString(obj);
			return text;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}
	
	
	public static JSonObject convertObjectToJSONObject(Object obj){
		try {
			JSonObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
			return jsonObject;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}		
	}


	
	public static String convertObjectToJSONWithNullValue(Object obj) {
		try {
			String text = JSON.toJSonString(obj, featuresWithNullValue);
			return text;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

	public static void main(String[] args) {
		System.err.println(System.getProperties());
	}
}
package com.xp.rabbit.common.mybatis.handler;

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import com.xp.rabbit.common.util.FastJsonConvertUtil;
import com.xp.rabbitmq.api.Message;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.type.baseTypeHandler;
import org.apache.ibatis.type.JdbcType;





public class MessageJsonTypeHandler extends baseTypeHandler {

    @Override
    public void setNonNullParameter(PreparedStatement ps, int i, Message parameter,
            JdbcType jdbcType) throws SQLException {
        
        ps.setString(i, FastJsonConvertUtil.convertObjectToJSON(parameter));
    }

    @Override
    public Message getNullableResult(ResultSet rs, String columnName)
            throws SQLException {
    	String value = rs.getString(columnName);
    	if(null != value && !StringUtils.isBlank(value)) {
    		return FastJsonConvertUtil.convertJSONToObject(rs.getString(columnName), Message.class);
    	}
    	return null;  
    }

    @Override
    public Message getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
    	String value = rs.getString(columnIndex);
    	if(null != value && !StringUtils.isBlank(value)) {
    		return FastJsonConvertUtil.convertJSONToObject(rs.getString(columnIndex), Message.class);
    	}
    	return null;         
    }

    @Override
    public Message getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
    	String value = cs.getString(columnIndex);
    	if(null != value && !StringUtils.isBlank(value)) {
    		return FastJsonConvertUtil.convertJSONToObject(cs.getString(columnIndex), Message.class);
    	}
    	return null;   
    }

}
package com.xp.rabbit.common.convert;


import com.google.common.base.Preconditions;
import com.xp.rabbit.common.serializer.Serializer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;


public class GenericMessageConvert implements MessageConverter {
    private Serializer serializer;

    public GenericMessageConvert(Serializer serializer) {
        Preconditions.checkNotNull(serializer);
        this.serializer = serializer;
    }

    
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(serializer.serializerRaw(o),messageProperties);
    }

    
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return serializer.deserializer(message.getBody());
    }
}

代理模式创建自己的json转换器

package com.xp.rabbit.common.convert;

import com.google.common.base.Preconditions;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;


public class RabbitMessageConvert implements MessageConverter {
    private GenericMessageConvert delegate;

    public RabbitMessageConvert(GenericMessageConvert delegate) {
        Preconditions.checkNotNull(delegate);
        this.delegate = delegate;
    }

    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        //可以增加过期时间设置
        return this.delegate.toMessage(o,messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return (com.xp.rabbitmq.api.Message ) this.delegate.fromMessage(message);
    }
}
3.rabbit-core-producer


这个模块主要是消息投递
1.配置spring.factories实现自动加载 RabbitProducerAutoConfiguration

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
  com.xp.rabbit.producer.autoconfigure.RabbitProducerAutoConfiguration
  
package com.xp.rabbit.producer.autoconfigure;

import com.xp.rabbit.task.annotation.EnableElasticJob;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;


@EnableElasticJob
@Configuration
@ComponentScan({"com.xp.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {
}

2.rabbit-producer-message.properties

rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource
rabbit.producer.druid.jdbc.url=jdbc:mysql://ip:port/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT
rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver
rabbit.producer.druid.jdbc.username=
rabbit.producer.druid.jdbc.password=
rabbit.producer.druid.jdbc.initialSize=5
rabbit.producer.druid.jdbc.minIdle=1
rabbit.producer.druid.jdbc.maxActive=100
rabbit.producer.druid.jdbc.maxWait=60000
rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000
rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000
rabbit.producer.druid.jdbc.validationQuery=SELECt 1 FROM DUAL
rabbit.producer.druid.jdbc.testWhileIdle=true
rabbit.producer.druid.jdbc.testonBorrow=false
rabbit.producer.druid.jdbc.testonReturn=false
rabbit.producer.druid.jdbc.poolPreparedStatements=true
rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize= 20
rabbit.producer.druid.jdbc.filters=stat,wall,log4j
rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true

3.引入消息中间类 落库保证可靠性投递 BrokerMessage

package com.xp.rabbit.producer.entity;

import com.xp.rabbitmq.api.Message;

import java.io.Serializable;
import java.util.Date;




public class BrokerMessage implements Serializable {
	
	private static final long serialVersionUID = 7447792462810110841L;

	private String messageId;

    private Message message;

    private Integer tryCount = 0;

    private String status;

    private Date nextRetry;

    private Date createTime;

    private Date updateTime;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId == null ? null : messageId.trim();
    }

    public Message getMessage() {
		return message;
	}

	public void setMessage(Message message) {
		this.message = message;
	}

	public Integer getTryCount() {
        return tryCount;
    }

    public void setTryCount(Integer tryCount) {
        this.tryCount = tryCount;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status == null ? null : status.trim();
    }

    public Date getNextRetry() {
        return nextRetry;
    }

    public void setNextRetry(Date nextRetry) {
        this.nextRetry = nextRetry;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }
}

mapper

package com.xp.rabbit.producer.mapper;

import com.xp.rabbit.producer.entity.BrokerMessage;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.Date;
import java.util.List;
@Mapper
public interface BrokerMessageMapper {
    int deleteByPrimaryKey(String messageId);

    int insert(BrokerMessage record);

    int insertSelective(BrokerMessage record);

    BrokerMessage selectByPrimaryKey(String messageId);

    int updateByPrimaryKeySelective(BrokerMessage record);

    int updateByPrimaryKeyWithBLOBs(BrokerMessage record);

    int updateByPrimaryKey(BrokerMessage record);

    void changeBrokerMessageStatus(@Param("brokerMessageId")String brokerMessageId, @Param("brokerMessageStatus")String brokerMessageStatus, @Param("updateTime")Date updateTime);

    List queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus")String brokerMessageStatus);

    List queryBrokerMessageStatus(@Param("brokerMessageStatus")String brokerMessageStatus);

    int update4TryCount(@Param("brokerMessageId")String brokerMessageId, @Param("updateTime") Date updateTime);

}

xml




  
    
    
    
    
    
    
    
  
  
    message_id, message, try_count, status, next_retry, create_time, update_time
  
  
  	
  
  
  
  
  
   
    update broker_message bm
    set bm.try_count = bm.try_count + 1,
      bm.update_time = #{updateTime,jdbcType=TIMESTAMP}
    where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
   
   
   

service

package com.xp.rabbit.producer.service;


import com.xp.rabbit.producer.entity.BrokerMessage;
import org.apache.ibatis.annotations.Param;

import java.util.Date;
import java.util.List;


public interface BrokerMessageService {
    //插入记录
    void insertBrokerMessage(BrokerMessage brokerMessage);
    //成功修改记录
    void updateSuccessBrokerMessage(String brokerMessageId);
    //成功修改记录
    void updateFailBrokerMessage(String brokerMessageId);
    //通过id查询是否有记录
    BrokerMessage selectBrokerMessageById(String brokerMessageId);
    //查询发送失败的记录
    List queryBrokerMessageStatus4Timeout(String brokerMessageStatus);
    //更新重试次数
    int update4TryCount(String brokerMessageId);
}

package com.xp.rabbit.producer.service.impl;

import com.xp.rabbit.producer.constant.BrokerStatus;
import com.xp.rabbit.producer.entity.BrokerMessage;
import com.xp.rabbit.producer.mapper.BrokerMessageMapper;
import com.xp.rabbit.producer.service.BrokerMessageService;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;


@Service
public class BrokerMessageServiceImpl implements BrokerMessageService {
    @Autowired
    private BrokerMessageMapper brokerMessageMapper;
    @Override
    public void insertBrokerMessage(BrokerMessage brokerMessage) {
        brokerMessageMapper.insert(brokerMessage);

    }

    @Override
    public void updateSuccessBrokerMessage(String brokerMessageId) {
        brokerMessageMapper.changeBrokerMessageStatus(brokerMessageId,BrokerStatus.SEND_FAIL.getCode(),new Date() );
    }

    @Override
    public void updateFailBrokerMessage(String brokerMessageId) {
        brokerMessageMapper.changeBrokerMessageStatus(brokerMessageId, BrokerStatus.SEND_FAIL.getCode(), new Date() );

    }

    @Override
    public BrokerMessage selectBrokerMessageById(String brokerMessageId) {
        return brokerMessageMapper.selectByPrimaryKey(brokerMessageId);
    }

    @Override
    public List queryBrokerMessageStatus4Timeout(String brokerMessageStatus) {
        return brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus);
    }

    @Override
    public int update4TryCount(String brokerMessageId) {
        return brokerMessageMapper.update4TryCount(brokerMessageId,new Date());
    }
}

4.操作数据库配置

package com.xp.rabbit.producer.config.database;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.DatabasePopulator;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;


@Configuration
public class BrokerMessageConfiguration {

    @Autowired
    private DataSource rabbitProducerDataSource;
    
    @Value("classpath:rabbit-producer-message-schema.sql")
    private Resource schemascript;
    
    @Bean
    public DataSourceInitializer initDataSourceInitializer() {
    	System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);
        final DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(rabbitProducerDataSource);
        initializer.setDatabasePopulator(databasePopulator());
        return initializer;
    }

    private DatabasePopulator databasePopulator() {
        final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addscript(schemascript);
        return populator;
    }
}
package com.xp.rabbit.producer.config.database;

import java.sql.SQLException;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.PropertySource;

@Configuration
@PropertySource({"classpath:rabbit-producer-message.properties"})
public class RabbitProducerDataSourceConfiguration {
	
	private static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);
	
	@Value("${rabbit.producer.druid.type}")
	private Class dataSourceType;
	
	@Bean(name = "rabbitProducerDataSource")
	@Primary
	@ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")
	public DataSource rabbitProducerDataSource() throws SQLException {
		DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();
		LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);
		return rabbitProducerDataSource;
	}
	
    public DataSourceProperties primaryDataSourceProperties(){
        return new DataSourceProperties();
    }
    
    public DataSource primaryDataSource(){
        return primaryDataSourceProperties().initializeDataSourceBuilder().build();
    }
	
}
package com.xp.rabbit.producer.config.database;

import javax.annotation.Resource;
import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;



@Configuration
@AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
public class RabbitProducerMyBatisConfiguration {

	@Resource(name= "rabbitProducerDataSource")
	private DataSource rabbitProducerDataSource;
	
	@Bean(name="rabbitProducerSqlSessionFactory")
	public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {
		
		SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
		bean.setDataSource(rabbitProducerDataSource);
		ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
		try {
			bean.setMapperLocations(resolver.getResources("classpath:mapper
public interface BrokerConstant {
    public final Integer TRY_TIME=1;
    //最大发送次数
    public final Integer MAX_TRY_TIME=3;
}
package com.xp.rabbit.producer.constant;

public enum BrokerStatus {
    //发送中
    SENDING("0"),
    //发送成功
    SEND_OK("1"),
    //发送失败
    SEND_FAIL("2");

    private String code;

    private String sendMessages;

    BrokerStatus(String code) {
        this.code = code;
    }

    public String getCode(){
        return this.code;
    }
}

6.生产者实现

package com.xp.rabbit.producer.broker;

import com.xp.rabbitmq.api.Message;
import com.xp.rabbitmq.api.SendCallback;

public interface RabbitBroker {
    void onRapid(Message message);

    void on/confirm/i(Message message);

    void onReliable(Message message);

    void onListSend();

    SendCallback on/confirm/iToCallBack(Message message);
}
package com.xp.rabbit.producer.broker;

import com.xp.rabbit.producer.constant.BrokerConstant;
import com.xp.rabbit.producer.constant.BrokerStatus;
import com.xp.rabbit.producer.entity.BrokerMessage;
import com.xp.rabbit.producer.service.BrokerMessageService;
import com.xp.rabbitmq.api.Message;
import com.xp.rabbitmq.api.MessageType;
import com.xp.rabbitmq.api.SendCallback;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.List;


@Component
@Slf4j
public class RabbitBrokerImpl implements RabbitBroker{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private BrokerMessageService brokerMessageService;
    @Override
    public void onRapid(Message message) {
            sendKernel(message);
    }

    @Override
    public void on/confirm/i(Message message) {
        message.setMessageType(MessageType./confirm/i);
        sendKernel(message);
    }

    @Override
    public void onReliable(Message message) {
        message.setMessageType(MessageType.RELIABLE);

        //查询是否有相关记录
        BrokerMessage brokerMessageInRecord = brokerMessageService.selectBrokerMessageById(message.getMessageId());
        if(brokerMessageInRecord==null){
            //构建重发记录
            Date now = new Date();
            BrokerMessage brokerMessage = new BrokerMessage();
            brokerMessage.setMessage(message);
            brokerMessage.setStatus(BrokerStatus.SENDING.getCode());
            brokerMessage.setMessageId(message.getMessageId());
            brokerMessage.setCreateTime(now);
            brokerMessage.setUpdateTime(now);
            brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerConstant.TRY_TIME));
            brokerMessageService.insertBrokerMessage(brokerMessage);
        }

        sendKernel(message);


    }

    @Override
    public void onListSend() {
        List messages = MessageHolder.clear();
        messages.forEach(message -> {
            MessageHolderAyncQueue.submit((Runnable) () -> {
                CorrelationData correlationData =
                        new CorrelationData(String.format("%s#%s#%s",
                                message.getMessageId(),
                                System.currentTimeMillis(),
                                message.getMessageType()));
                String topic = message.getMessageTopic();
                String routingKey = message.getMessageRouteKey();
                rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
                log.info("#RabbitBrokerImpl.sendMessages# send to rabbitmq, messageId: {}", message.getMessageId());
            });
        });

    }

    @Override
    public SendCallback on/confirm/iToCallBack(Message message) {

        return null;
    }

    
    private void sendKernel(Message message){
        AsyncbaseQueue.submit((Runnable)() ->{
            //ID=id+时间戳
            CorrelationData correlationData=new CorrelationData(String.format("%s#%s#%s",message.getMessageId(),System.currentTimeMillis(),message.getMessageType()));
            rabbitTemplate.convertAndSend(message.getMessageTopic(),message.getMessageRouteKey(),message,correlationData);

            log.info("#RabbitBrokerImpl.sendKernel#发送消息:messageId:{}",message.getMessageId());
        });

    }
}

异步线程池发送相关消息

package com.xp.rabbit.producer.broker;

import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;


@Slf4j
public class AsyncbaseQueue {
    private static final int THREAD_SIZE=Runtime.getRuntime().availableProcessors();

    private static final int QUEUE_SIZE=1000;

    private static ExecutorService sendAsync=
            new ThreadPoolExecutor(THREAD_SIZE,
                    THREAD_SIZE,
                    60L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(QUEUE_SIZE),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setName("rabbitmq_client_async_sender");
                            return t;
                        }
                    }, new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    log.error("async  sender is error,runnable:{},executor:{}",r,executor);
                }
            });

    public static  void submit(Runnable runnable){
        sendAsync.submit(runnable);
    }
}
package com.xp.rabbit.producer.broker;

import java.util.List;

import com.google.common.collect.Lists;
import com.xp.rabbitmq.api.Message;

public class MessageHolder {

	private List messages = Lists.newArrayList();
	
	@SuppressWarnings({"rawtypes", "unchecked"})
	public static final ThreadLocal holder = new ThreadLocal() {
		@Override
		protected Object initialValue() {
			return new MessageHolder();
		}
	};
	
	public static void add(Message message) {
		holder.get().messages.add(message);
	}
	
	public static List clear() {
		List tmp = Lists.newArrayList(holder.get().messages);
		holder.remove();
		return tmp;
	}
	
}

package com.xp.rabbit.producer.broker;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MessageHolderAyncQueue {

	private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();
	
	private static final int QUEUE_SIZE = 10000;
	
	private static ExecutorService senderAsync =
			new ThreadPoolExecutor(THREAD_SIZE,
					THREAD_SIZE,
					60L,
					TimeUnit.SECONDS,
					new ArrayBlockingQueue(QUEUE_SIZE),
					new ThreadFactory() {
						@Override
						public Thread newThread(Runnable r) {
							Thread t = new Thread(r);
							t.setName("rabbitmq_client_async_sender");
							return t;
						}
					},
					new java.util.concurrent.RejectedExecutionHandler() {
						@Override
						public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
							log.error("async sender is error rejected, runnable: {}, executor: {}", r, executor);
						}
					});
			
		public static void submit(Runnable runnable) {
			senderAsync.submit(runnable);
		}	
}

容器化rabbit template

package com.xp.rabbit.producer.broker;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import com.xp.rabbit.common.convert.GenericMessageConvert;
import com.xp.rabbit.common.convert.RabbitMessageConvert;
import com.xp.rabbit.common.impl.SerializerFactoryImpl;
import com.xp.rabbit.common.serializer.Serializer;
import com.xp.rabbit.common.serializer.SerializerFactory;
import com.xp.rabbit.producer.constant.BrokerStatus;
import com.xp.rabbit.producer.service.BrokerMessageService;
import com.xp.rabbitmq.api.Message;
import com.xp.rabbitmq.api.MessageType;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;
import sun.plugin.services.BrowserService;

import java.util.List;
import java.util.Map;


@Component
@Slf4j
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {
    Map rabbitTemplateMap= Maps.newConcurrentMap();
    private SerializerFactory serializerFactory= SerializerFactoryImpl.INSTANCE;
    @Autowired
    ConnectionFactory connectionFactory;
    @Autowired
    private BrokerMessageService brokerMessageService;

    public RabbitTemplate getRabbitTemplate(Message message){
        Preconditions.checkNotNull(message);
        String messageTopic = message.getMessageTopic();
        RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(messageTopic);
        if(rabbitTemplate!=null){
            return rabbitTemplate;
        }
        //根据topic创建
        RabbitTemplate newRabbitTemplate = new RabbitTemplate(connectionFactory);
        newRabbitTemplate.setExchange(messageTopic);
        newRabbitTemplate.setRoutingKey(message.getMessageRouteKey());
        newRabbitTemplate.setRetryTemplate(new RetryTemplate());
        //序列化内容

        Serializer serializer = serializerFactory.create();
        GenericMessageConvert gmc=new GenericMessageConvert(serializer);
        RabbitMessageConvert rmc=new RabbitMessageConvert(gmc);

        newRabbitTemplate.setMessageConverter(rmc);

        //根据消息发送类型,设置回调
        if(!message.getMessageType().equals(MessageType.RAPID))
        {
            newRabbitTemplate.setConfirmCallback(this);
            //支持事务
            newRabbitTemplate.setChannelTransacted(true);
        }


        rabbitTemplateMap.putIfAbsent(messageTopic,newRabbitTemplate);

        return rabbitTemplateMap.get(messageTopic);

    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //1.取得id
        List list = Splitter.on("#").splitToList(correlationData.getId());
        String id=list.get(0);
        Long time=Long.parseLong(list.get(1));
        String messageType = list.get(2);

        if (ack){
            log.info("消息发送成功,id:{},时间:{}",id,time);
            //如果消息为可靠模式,发送成功 将记录的日志更改为成功状态
            if(MessageType./confirm/i.equals(messageType)){
                brokerMessageService.updateSuccessBrokerMessage(id);
            }
        }else {
            log.error("消息发送失败,id:{},时间:{}",id,time);
        }


    }
}
package com.xp.rabbit.producer.broker;

import com.google.common.base.Preconditions;
import com.xp.rabbitmq.api.Message;
import com.xp.rabbitmq.api.MessageProducer;
import com.xp.rabbitmq.api.MessageType;
import com.xp.rabbitmq.api.SendCallback;
import com.xp.rabbitmq.exception.MessageRuntimeException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;


@Component
public class ProducerClient implements MessageProducer {
    @Autowired
    private  RabbitBroker rabbitBroker;
    @Override
    public void send(Message message) throws MessageRuntimeException {
        //1.判断message主题是否为空
        Preconditions.checkNotNull(message.getMessageTopic());
        //2.根据message类型发送消息
        switch(message.getMessageType()){
            case MessageType.RAPID:
                rabbitBroker.onRapid(message);
                break;
            case MessageType./confirm/i:
                rabbitBroker.on/confirm/i(message);
                break;
            case MessageType.RELIABLE:
                rabbitBroker.onReliable(message);
                break;
            default :
        }
    }

    @Override
    public void send(Message message, SendCallback sendCallback) throws MessageRuntimeException {

    }

    @Override
    public void send(List messageList) throws MessageRuntimeException {
        messageList.forEach(message -> {
            message.setMessageType(MessageType.RAPID);
            MessageHolder.add(message);
        });
    }

    @Override
    public void send(List messageList, SendCallback sendCallbackList) throws MessageRuntimeException {

    }


}
4.rabbit-task

使用elastic- job查询数据库进行可靠性投递保证


1.自动加载spring.factories

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=
  com.xp.rabbit.task.autoconfigure.JobParseAutoConfiguration

2.相关常量

package com.xp.rabbit.task.enums;

public enum ElasticJobTypeEnum {

  SIMPLE("SimpleJob", "简单类型job"),
  DATAFLOW("DataflowJob", "流式类型job"),
  script("scriptJob", "脚本类型job");
  
  private String type;
  
  private String desc;
  
  private ElasticJobTypeEnum(String type, String desc) {
  	this.type = type;
  	this.desc = desc;
  }

  public String getType() {
  	return type;
  }

  public void setType(String type) {
  	this.type = type;
  }

  public String getDesc() {
  	return desc;
  }

  public void setDesc(String desc) {
  	this.desc = desc;
  }

}

3.elastic- job对zookeeper的配置

package com.xp.rabbit.task.autoconfigure;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;



@ConfigurationProperties(prefix = "elastic.job.zk")
@Data
public class JobZookeeperProperties {

    private String namespace;

    private String serverLists;

    private int maxRetries = 3;

    private int connectionTimeoutMilliseconds = 15000;

    private int sessionTimeoutMilliseconds = 60000;

    private int baseSleepTimeMilliseconds = 1000;

    private int maxSleepTimeMilliseconds = 3000;

    private String digest = "";

}

4.job解析配置

package com.xp.rabbit.task.autoconfigure;

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.xp.rabbit.task.parse.ElasticJobParse;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
@ConditionalOnProperty(prefix = "elastic.job.zk",name = {"namespace","serverLists"},matchIfMissing = false)
@EnableConfigurationProperties(JobZookeeperProperties.class)
@Slf4j
public class JobParseAutoConfiguration {
    
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter registryCenter(JobZookeeperProperties jobZookeeperProperties) {
        ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(jobZookeeperProperties.getServerLists(), jobZookeeperProperties.getNamespace());
        zkConfig.setbaseSleepTimeMilliseconds(jobZookeeperProperties.getbaseSleepTimeMilliseconds());
        zkConfig.setMaxSleepTimeMilliseconds(jobZookeeperProperties.getMaxSleepTimeMilliseconds());
        zkConfig.setConnectionTimeoutMilliseconds(jobZookeeperProperties.getConnectionTimeoutMilliseconds());
        zkConfig.setSessionTimeoutMilliseconds(jobZookeeperProperties.getSessionTimeoutMilliseconds());
        zkConfig.setMaxRetries(jobZookeeperProperties.getMaxRetries());
        zkConfig.setDigest(jobZookeeperProperties.getDigest());
        log.info("初始化job注册中心配置成功, zkaddress : {}, namespace : {}", jobZookeeperProperties.getServerLists(), jobZookeeperProperties.getNamespace());
        return new ZookeeperRegistryCenter(zkConfig);
    }

    @Bean
    public ElasticJobParse registryElasticJobParse(JobZookeeperProperties jobZookeeperProperties,ZookeeperRegistryCenter zookeeperRegistryCenter){
        return new ElasticJobParse(jobZookeeperProperties,zookeeperRegistryCenter);
    }
}

5.job的解析类 在Springboot所有类创建好的时候进行相关配置加载

package com.xp.rabbit.task.parse;

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.xp.rabbit.task.autoconfigure.JobParseAutoConfiguration;
import com.xp.rabbit.task.autoconfigure.JobZookeeperProperties;



import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.util.StringUtils;

import com.xp.rabbit.task.annotation.ElasticJobConfig;
import com.xp.rabbit.task.autoconfigure.JobZookeeperProperties;
import com.xp.rabbit.task.enums.ElasticJobTypeEnum;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.script.scriptJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.executor.handler.JobProperties;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ElasticJobParse implements ApplicationListener {

    private JobZookeeperProperties jobZookeeperProperties;

    private ZookeeperRegistryCenter zookeeperRegistryCenter;

    public ElasticJobParse(JobZookeeperProperties jobZookeeperProperties,
                                ZookeeperRegistryCenter zookeeperRegistryCenter) {
        this.jobZookeeperProperties = jobZookeeperProperties;
        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        try {
            ApplicationContext applicationContext = event.getApplicationContext();
            Map beanMap = applicationContext.getBeansWithAnnotation(ElasticJobConfig.class);
            for(Iterator it = beanMap.values().iterator(); it.hasNext();) {
                Object confBean = it.next();
                Class clazz = confBean.getClass();
                if(clazz.getName().indexOf("$") > 0) {
                    String className = clazz.getName();
                    clazz = Class.forName(className.substring(0, className.indexOf("$")));
                }
                // 	获取接口类型 用于判断是什么类型的任务
                String jobTypeName = clazz.getInterfaces()[0].getSimpleName();
                //	获取配置项 ElasticJobConfig
                ElasticJobConfig conf = clazz.getAnnotation(ElasticJobConfig.class);

                String jobClass = clazz.getName();
                String jobName = this.jobZookeeperProperties.getNamespace() + "." + conf.name();
                String cron = conf.cron();
                String shardingItemParameters = conf.shardingItemParameters();
                String description = conf.description();
                String jobParameter = conf.jobParameter();
                String jobExceptionHandler = conf.jobExceptionHandler();
                String executorServiceHandler = conf.executorServiceHandler();

                String jobShardingStrategyClass = conf.jobShardingStrategyClass();
                String eventTraceRdbDataSource = conf.eventTraceRdbDataSource();
                String scriptCommandLine = conf.scriptCommandLine();

                boolean failover = conf.failover();
                boolean misfire = conf.misfire();
                boolean overwrite = conf.overwrite();
                boolean disabled = conf.disabled();
                boolean monitorExecution = conf.monitorExecution();
                boolean streamingProcess = conf.streamingProcess();

                int shardingTotalCount = conf.shardingTotalCount();
                int monitorPort = conf.monitorPort();
                int maxTimeDiffSeconds = conf.maxTimeDiffSeconds();
                int reconcileIntervalMinutes = conf.reconcileIntervalMinutes();

                //	先把当当网的esjob的相关configuration
                JobCoreConfiguration coreConfig = JobCoreConfiguration
                        .newBuilder(jobName, cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters)
                        .description(description)
                        .failover(failover)
                        .jobParameter(jobParameter)
                        .misfire(misfire)
                        .jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandler)
                        .jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), executorServiceHandler)
                        .build();

                //	我到底要创建什么样的任务.
                JobTypeConfiguration typeConfig = null;
                if(ElasticJobTypeEnum.SIMPLE.getType().equals(jobTypeName)) {
                    typeConfig = new SimpleJobConfiguration(coreConfig, jobClass);
                }

                if(ElasticJobTypeEnum.DATAFLOW.getType().equals(jobTypeName)) {
                    typeConfig = new DataflowJobConfiguration(coreConfig, jobClass, streamingProcess);
                }

                if(ElasticJobTypeEnum.script.getType().equals(jobTypeName)) {
                    typeConfig = new scriptJobConfiguration(coreConfig, scriptCommandLine);
                }

                // LiteJobConfiguration
                LiteJobConfiguration jobConfig = LiteJobConfiguration
                        .newBuilder(typeConfig)
                        .overwrite(overwrite)
                        .disabled(disabled)
                        .monitorPort(monitorPort)
                        .monitorExecution(monitorExecution)
                        .maxTimeDiffSeconds(maxTimeDiffSeconds)
                        .jobShardingStrategyClass(jobShardingStrategyClass)
                        .reconcileIntervalMinutes(reconcileIntervalMinutes)
                        .build();

                // 	创建一个Spring的beanDefinition
                BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
                factory.setInitMethodName("init");
                factory.setScope("prototype");

                //	1.添加bean构造参数,相当于添加自己的真实的任务实现类
                if (!ElasticJobTypeEnum.script.getType().equals(jobTypeName)) {
                    factory.addConstructorArgValue(confBean);
                }
                //	2.添加注册中心
                factory.addConstructorArgValue(this.zookeeperRegistryCenter);
                //	3.添加LiteJobConfiguration
                factory.addConstructorArgValue(jobConfig);

                //	4.如果有eventTraceRdbDataSource 则也进行添加
                if (StringUtils.hasText(eventTraceRdbDataSource)) {
                    BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(JobEventRdbConfiguration.class);
                    rdbFactory.addConstructorArgReference(eventTraceRdbDataSource);
                    factory.addConstructorArgValue(rdbFactory.getBeanDefinition());
                }

                //  5.添加监听
                List elasticJobListeners = getTargetElasticJobListeners(conf);
                factory.addConstructorArgValue(elasticJobListeners);

                // 	接下来就是把factory 也就是 SpringJobScheduler注入到Spring容器中
                DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();

                String registerBeanName = conf.name() + "SpringJobScheduler";
                defaultListableBeanFactory.registerBeanDefinition(registerBeanName, factory.getBeanDefinition());
                SpringJobScheduler scheduler = (SpringJobScheduler)applicationContext.getBean(registerBeanName);
                scheduler.init();
                log.info("启动elastic-job作业: " + jobName);
            }
            log.info("共计启动elastic-job作业数量为: {} 个", beanMap.values().size());

        } catch (Exception e) {
            log.error("elasticjob 启动异常, 系统强制退出", e);
            System.exit(1);
        }
    }

    private List getTargetElasticJobListeners(ElasticJobConfig conf) {
        List result = new ManagedList(2);
        String listeners = conf.listener();
        if (StringUtils.hasText(listeners)) {
            BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(listeners);
            factory.setScope("prototype");
            result.add(factory.getBeanDefinition());
        }

        String distributedListeners = conf.distributedListener();
        long startedTimeoutMilliseconds = conf.startedTimeoutMilliseconds();
        long completedTimeoutMilliseconds = conf.completedTimeoutMilliseconds();

        if (StringUtils.hasText(distributedListeners)) {
            BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(distributedListeners);
            factory.setScope("prototype");
            factory.addConstructorArgValue(Long.valueOf(startedTimeoutMilliseconds));
            factory.addConstructorArgValue(Long.valueOf(completedTimeoutMilliseconds));
            result.add(factory.getBeanDefinition());
        }
        return result;
    }

}

6.使用注解的方式配置,加载

package com.xp.rabbit.task.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticJobConfig {

	String name();	//elasticjob的名称
	
	String cron() default "";
	
	int shardingTotalCount() default 1;
	
	String shardingItemParameters() default "";
	
	String jobParameter() default "";
	
	boolean failover() default false;
	
	boolean misfire() default true;
	
	String description() default "";
	
	boolean overwrite() default false;
	
	boolean streamingProcess() default false;
	
	String scriptCommandLine() default "";
	
	boolean monitorExecution() default false;
	
	public int monitorPort() default -1;	//must

	public int maxTimeDiffSeconds() default -1;	//must

	public String jobShardingStrategyClass() default "";	//must

	public int reconcileIntervalMinutes() default 10;	//must

	public String eventTraceRdbDataSource() default "";	//must

	public String listener() default "";	//must

	public boolean disabled() default false;	//must

	public String distributedListener() default "";

	public long startedTimeoutMilliseconds() default Long.MAX_VALUE;	//must

	public long completedTimeoutMilliseconds() default Long.MAX_VALUE;		//must

	public String jobExceptionHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";

	public String executorServiceHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";
	
}
package com.xp.rabbit.task.annotation;



import com.xp.rabbit.task.autoconfigure.JobParseAutoConfiguration;
import org.springframework.context.annotation.import;

import java.lang.annotation.*;


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@documented
@Inherited
@import(JobParseAutoConfiguration.class)
public @interface EnableElasticJob {
}

7.使用
在RabbitProducerAutoConfiguration开启EnableElasticJob进行
EnableElasticJob配置解析

package com.xp.rabbit.producer.autoconfigure;

import com.xp.rabbit.task.annotation.EnableElasticJob;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;


@EnableElasticJob
@Configuration
@ComponentScan({"com.xp.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {
}

在相关job配置zookeeper和其他配置

package com.xp.rabbit.producer.task;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.xp.rabbit.producer.broker.RabbitBroker;
import com.xp.rabbit.producer.constant.BrokerConstant;
import com.xp.rabbit.producer.constant.BrokerStatus;
import com.xp.rabbit.producer.entity.BrokerMessage;
import com.xp.rabbit.producer.service.BrokerMessageService;
import com.xp.rabbit.task.annotation.ElasticJobConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;


@Component
@Slf4j
@ElasticJobConfig(
        name= "com.xp.rabbit.producer.task.RetryMessageDataflowJob",
        cron= "0/10 * * * * ?",
        description = "可靠性投递消息补偿任务",
        overwrite = true,
        shardingTotalCount = 1
)
public class RetryMessageDataflowJob implements DataflowJob {
   @Autowired
   private BrokerMessageService brokerMessageService;
   @Autowired
   private RabbitBroker broker;
    
    @Override
    public List fetchData(ShardingContext shardingContext) {
        //查询发送状态的记录
        List brokerMessages = brokerMessageService.queryBrokerMessageStatus4Timeout(BrokerStatus.SENDING.getCode());
        log.info("--------@@@@@ 抓取数据集合, 数量:	{} 	@@@@@@-----------" , brokerMessages.size());
        return brokerMessages;
    }

    @Override
    public void processData(ShardingContext shardingContext, List brokerMessageList) {
        if(!CollectionUtils.isEmpty(brokerMessageList)){
            brokerMessageList.forEach(brokerMessage -> {
                if(brokerMessage.getTryCount()> BrokerConstant.MAX_TRY_TIME){
                    //超过次数将状态设置为发送失败
                    brokerMessageService.updateFailBrokerMessage(brokerMessage.getMessageId());
                    log.error("此条记录重发三次失败,id:{},message:{}",brokerMessage.getMessageId(),brokerMessage.getMessage());
                }else {
                    //筛选发送没有超过规定次数的记录进行重发并修改中间记录
                    brokerMessageService.update4TryCount(brokerMessage.getMessageId());
                    //重发记录
                    broker.onReliable(brokerMessage.getMessage());
                }

            });
        }
    }
}
5.测试

1.引入依赖


			cn.xp.rabbitmq.parent
			rabbit-core-producer
			1.0-SNAPSHOT
		

2.配置rabbit和zookeeper的配置application.properties

server.context-path=/test
server.port=8001

spring.application.name=test

spring.rabbitmq.addresses=ip:port
spring.rabbitmq.username=
spring.rabbitmq.password=
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.auto-startup=false

elastic.job.zk.serverLists=ip:port  
elastic.job.zk.namespace=elastic-job

3.扫描相关包

package com.bfxy.rabbit;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan({"com.xp.rabbit.*"})
public class MainConfig {

}
4.总结

技术栈:
1.rabbitMq springboot mybatis elstic-job zookeeper
2.可靠性投递保证:使用手动ack落库消息的记录,再使用elastic-job进行job查询,如果状态为失败且重试次数小于规定次数,再进行发送
3.建造者模式创建message
4.异步线程进行发送消息
5.根据消息的topic池化rabbit Template
6.代理模式进行消息转换
7.注解方法配置相关elastic-job的
8.在springboot的onApplicationEvent时候配置自己的jobparse

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/676055.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号