kafka官网: Apache Kafka
公司使用阿里云提供的kafka消息队列服务,分别为测试环境与生产环境,部署了多个集群。
使用场景:应用对外提供API接口调用,同时支持kafka增量消息推送,其他应用使用方可监听消费消息,相当于是多个consumer共同消费topic中的数据。
应用项目引入KafKa方式,现在项目脚手架基本都是SpringBoot,可以引入依赖
spring-kafka
也可以通过引入依赖
org.apache.kafka kafka-clients2.4.1
发消息工具:
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class KafkaSender {
@Resource
private KafkaTemplate
public void sendMsg(String topic , String message){
kafkaTemplate.send(topic ,message);
}
}
定时开启和关闭消费:
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@Slf4j
public class KafkaConfigurationTask {
@Resource
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Scheduled(cron = "0 0 1 * * ?")
public void startListener() {
log.info("开启消费消息-定时任务开启");
MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer("act-task");
if (!container.isRunning()) {
container.start();
}
container.resume();
}
@Scheduled(cron = "0 0 6 * * ?")
public void shutdownListener() {
log.info("关闭消费消息-定时任务关闭");
MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer("act-task");
container.pause();
}
}
消费者
通过注解@KafkaListener方式或定时轮询方式



