栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java实现Kafka的生产消费数据

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java实现Kafka的生产消费数据

应用场景:用Java实现在kafka 的topic1中写数据,有其他程序对topic1中的数据进行消费,并且会把结果写进topic2中,我们需要做的就是往topic1中写数据,并且监测topic2,如果有数据写进topic2就获取此数据

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

@Component
public class CrwalProduce {
	private Logger logger = LoggerFactory.getLogger(CrwalProduce.class);

	@Autowired
	private KafkaTemplate kafkaTemplate;

	public void send(String topic, String json) {
		ListenableFuture> future = kafkaTemplate.send(topic, json);
		future.addCallback(new SuccessCallback() {
			@Override
			public void onSuccess(Object result) {
				logger.info("生产者已发送数据:" + topic);
			}
		}, new FailureCallback() {
			@Override
			public void onFailure(Throwable ex) {
				logger.error(ex.toString());
			}
		});
	}
}

调用此send方法往topic1中写数据

crwalProduce.send(kafkaTopicPrefix.TASK_RESULT_ + task.getEngineId(), task.getParam());

下面就是监听topic2的类

import org.springframework.stereotype.Component;

import java.util.Optional;



@Component
public class KafkaListener {

    @org.springframework.kafka.annotation.KafkaListener(topics = {"task_result_3189f717470a4b2c85dc334e3a5b3a16"})
    public void listen(String record) {
        Optional kafkaMessage = Optional.ofNullable(record);
        if (kafkaMessage.isPresent()) {
            System.out.println("***********************************************");
            System.out.println("消息接收成功");
            Object message = kafkaMessage.get();
            System.out.println("record =" + record);
            System.out.println("message =" + message);
        }
    }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/462363.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号