集成kafka
1.引入kafka maven依赖
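<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.4</version>
    <scope>provided</scope>
</dependency>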
2.服务器安装Kafka
(1).下载kafka压缩包
https://kafka.apache.org/downloads
(2).安装Kafka
tar -zxvf kafka_2.13-3.0.0.tgz
(3).修改kafka的配置
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://127.0.0.1:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/local/kafka_2.11-0.9.0.1/kafka-logs
(4).修改kafka内置zookeeper的配置
dataDir=/usr/local/kafka/zookeeper
dataLogDir=/usr/local/kafka/zookeeper-logs
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
# Disable the admi…(原文此处被截断)
(5).开启zookeeper和kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
为了方便,可以自己创建一个启动脚本。进入kafka目录下,输入命令:vi kafkaStart.sh,添加内容为:
#!/bin/bash
# 启动zookeeper
/DATA/kafka/kafka_2.12-2.0.0/bin/zookeeper-server-start.sh /DATA/kafka/kafka_2.12-2.0.0/config/zookeeper.properties &
sleep 3  # 等待3秒后再执行
# 启动kafka
/DATA/kafka/kafka_2.12-2.0.0/bin/kafka-server-start.sh /DATA/kafka/kafka_2.12-2.0.0/config/server.properties &
server.properties 中主要配置项的含义:
broker.id:当前机器在集群中的唯一标识。例如有三台Kafka主机,则分别配置为1,2,3。
listeners:服务监听端口。
advertised.listeners:提供给生产者、消费者的端口号,即外部访问地址。默认为listeners的值。
zookeeper.connect:zookeeper连接地址。如有集群配置,每台Kafka主机都需要连接全部zookeeper服务。
启动结果
(6).创建topic主题
./kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
(7).启动生产者连接主题
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
(8).启动消费者连接主题
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
3.springboot开始集成Kafka,添加配置
# kafka
spring.application.name=kafka-tutorial
# 指定kafka代理地址,可以多个(用逗号分隔)
spring.kafka.bootstrap-servers=172.31.111.11:9092
spring.kafka.producer.retries=0
# 每个批次的大小上限(单位:字节)
spring.kafka.producer.batch-size=16384
# 生产者缓冲区容量(单位:字节)
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 指定默认消费者group id
spring.kafka.consumer.group-id=demo
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 指定listener容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3
4.创建测试bean包,创建kafka生产者与消费者
(1).创建生产者KafkaProduction
import com.alibaba.fastjson.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @Component public class KafkaProduction{ private Logger logger = LoggerFactory.getLogger(KafkaProduction.class); @Autowired private KafkaTemplate kafkaTemplate; public void send(T obj,String topics) { String jsonObj = JSON.toJSONString(obj); logger.info("----kafka---- message = {}", jsonObj); //发送消息 ListenableFuture > future = kafkaTemplate.send(topics, jsonObj); future.addCallback(new ListenableFutureCallback >() { @Override public void onFailure(Throwable throwable) { logger.info("KafkaProduction: kafka to be sent:" + throwable.getMessage()); } @Override public void onSuccess(SendResult stringObjectSendResult) { //成功消费 //TODO 业务处理 logger.info("KafkaProduction: The message has be sent successfully:"); logger.info("KafkaProduction: =============== result: " + stringObjectSendResult.toString()); } }); } }
(2).创建发送服务接口IKafkaSenderService(另需自行创建KafkaTopicsConstant常量类,并在其中定义TEST_TOPICS主题常量)
/**
 * Service contract exposing a single no-argument {@link #send()} operation.
 */
public interface IKafkaSenderService {

    /**
     * Performs the send operation defined by the implementing class.
     */
    void send();
}
(3).创建消费者KafkaConsumer
import com.iflytek.bim.cop.contant.KafkaTopicsConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class KafkaConsumer {
private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = KafkaTopicsConstant.TEST_TOPICS)
public void listen(ConsumerRecord, ?> record) {
Optional> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("KafkaConsumer 接收: ================= Topic:" + record.topic());
logger.info("KafkaConsumer 接收: ================= Record:" + record);
logger.info("KafkaConsumer 接收: ================= Message:" + message);
}
}
}
(4).发送实现
import com.iflytek.bim.cop.component.KafkaProduction;
import com.iflytek.bim.cop.contant.KafkaTopicsConstant;
import com.iflytek.bim.cop.domain.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * {@link IKafkaSenderService} implementation that sends a fixed sample
 * {@code User} to {@code KafkaTopicsConstant.TEST_TOPICS}.
 *
 * NOTE(review): no Spring stereotype annotation is visible on this class, so
 * the {@code @Autowired} field is only populated if the class is registered as
 * a bean elsewhere; confirm.
 */
public class KafkaSenderServiceImpl implements IKafkaSenderService {

    @Autowired
    private KafkaProduction kafkaProduction;

    /**
     * Builds a {@code User} with a hard-coded login name and password and
     * passes it to {@code kafkaProduction.send} for the test topic.
     */
    @Override
    public void send() {
        final User demoUser = new User();
        demoUser.setLoginName("I am is a panda");
        demoUser.setPassword("5588996633");

        kafkaProduction.send(demoUser, KafkaTopicsConstant.TEST_TOPICS);
    }
}
(5).测试Kafka
/**
 * Test endpoint mapped to "/": calls {@code kafkaSenderService.send()} and
 * returns nothing.
 */
@RequestMapping("/")
@ResponseBody
public void kafkatest() {
    kafkaSenderService.send();
}
(6).Kafka测试结果



