栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据

springboot 集成kafka,自定义序列化类

大数据 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot 集成kafka,自定义序列化类

spring:
  kafka:
    # 以逗号分隔的地址列表,用于建立与Kafka集群的初始连接(kafka 默认的端口号为9092)
    bootstrap-servers: ip1:port1,ip2:port2,ip3:port3
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer

序列化方式换成 value-serializer::com.xxx.xxx.ObjectSerializer 

反序列化方式换成value-serializer::com.xxx.xxx.ObjectDeserializer

注意:kafka属性配置文件中红色,可以满足只发送字符串数据时候用。然而想发送对象数据,就得用以下自定义序列类来实现。java代码如下:

1.ObjectSerializer.java

public class ObjectSerializer implements Serializer {

	@Override
	public byte[] serialize(String topic, Object data) {
		if(data instanceof byte[]){
			return (byte[])data;
		}else{
			return BeanUtils.ObjectToBytes(data);
		}
	}
} 

2.ObjectDeserializer.java

public class ObjectDeserializer implements Deserializer {
    @Override
    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        return BeanUtils.BytesToObject(data);
    }
} 

3.BeanUtils.java

public class BeanUtils {
    private BeanUtils(){}
    
    public static byte[] ObjectToBytes(Object obj){
        byte[] bytes = null;
        ByteArrayOutputStream bo = null;
        ObjectOutputStream oo = null;
        try {
            bo = new ByteArrayOutputStream();
            oo = new ObjectOutputStream(bo);
            oo.writeObject(obj);
            bytes = bo.toByteArray();

        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                if(bo!=null){
                    bo.close();
                }
                if(oo!=null){
                    oo.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return bytes;
    }
    
    public static Object BytesToObject(byte[] bytes){
        Object obj = null;
        ByteArrayInputStream bi = null;
        ObjectInputStream oi = null;
        try {
            bi =new ByteArrayInputStream(bytes);
            oi =new ObjectInputStream(bi);
            obj = oi.readObject();

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            try {
                if(bi!=null){
                    bi.close();
                }
                if(oi!=null){
                    oi.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return obj;
    }
}

4.发送对象代码

public void sendObj(){
        try{
            log.info("发送给kafka 发对象开始");
            Map m = new HashMap();
            m.put("name", "jim");
            kafkaTemplate.send(topicName, m);//或者这里发字符串也一样可用
            log.info("发送给kafka结束");
        }catch(Exception e){
            e.printStackTrace();
        }
}

5.消费对象代码

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"${config.kafka.send-topic}"},groupId = "${config.kafka.group-id}")
    public void onMessage1(ConsumerRecord record){
        String a = record.value().toString();
        log.info("消费的数据:"+ a);
    }

}

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号