-
linux环境(vm环境)
-
docker环境
-
zookeeper 环境
kafka的工作依赖于zookeeper,在搭建kafka集群时,必须搭建好zookeeper集群,准备三台服务器或虚拟机比较麻烦,为了简化使用docker环境。
参考zookeeper集群(docker)搭建
如图三个zookeeper容器组成的集群
-
拉取镜像
docker pull wurstmeister/kafka
-
创建容器
docker run -d --name=kafka1 --restart=always -p 9092:9092 --network=my-net -e KAFKA_ADVERTISED_HOST_NAME=192.168.48.131 -e HOST_IP=192.168.48.131:9092 -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2182,zookeeper3:2183 -e KAFKA_BROKER_ID=0 wurstmeister/kafka:latest
参数说明:
- –network: 使用docker 自定义的网络通道
- KAFKA_ADVERTISED_HOST_NAME:宿主机地址
- KAFKA_ADVERTISED_PORT:宿主机端口
- KAFKA_ZOOKEEPER_CONNECT:zookeeper集群地址
- KAFKA_BROKER_ID:broked.id集群中必须唯一
- HOST_IP:暴露的宿主机地址
如上创建三个容器
注:修改容器名称与端口号
3、kafka集群监控使用KafkaOffsetMonitor-assembly-0.4.6.jar对kafka集群监控
1、在/opt/module/下创建kafka-offset-console文件夹
2、将上传的jar包放入刚创建的目录下
3、在/opt/module/kafka-offset-console目录下创建mobile-logs文件夹
4、在/opt/module/kafka-offset-console目录下创建启动脚本start.sh
java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --kafkaBrokers 192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094 --kafkaSecurityProtocol PLAINTEXT --zk 192.168.48.131:2181,192.168.48.131:2182,192.168.48.131:2183 --port 8086 --refresh 10.seconds --retain 2.days --dbName offsetapp_kafka &
5、启动监控
./start.sh
6、在主机访问测试
致此集群搭建完成;
二、springboot整合 1、导入依赖2、配置文件org.springframework.kafka spring-kafka
server.port=8080 #============== kafka =================== spring.kafka.bootstrap-servers=192.168.48.131:9092,192.168.48.131:9093,192.168.48.131:9094 #=============== provider ======================= spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.interceptor.class=com.example.demo.Interceptor.TimeInterceptor,com.example.demo.Interceptor.CounterInterceptor spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #=============== consumer ======================= spring.kafka.consumer.group-id=user-log-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer3、配置类
@Configuration
public class KafkaConfigration {
@Autowired
private KafkaProperties properties;
@Value("#{'${spring.kafka.producer.interceptor.class}'.split(',')}")
private ArrayList interceptors;
@Bean
public ProducerFactory, ?> kafkaProducerFactory(ObjectProvider customizers) {
Map map = this.properties.buildProducerProperties();
map.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);
DefaultKafkaProducerFactory, ?> factory = new DefaultKafkaProducerFactory<>(map);
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}
}
4、Controller层
@Autowired
private KafkaTemplate kafkaTemplate;
//从前端接收消息,并调用生产者封装的api发送消息
@GetMapping("/sendMassage/{massage}")
public String sendMassage(@PathVariable("massage") String massage){
kafkaTemplate.send("first", JSON.toJSONString(massage));
return "消息已发送";
}
5、消费消息
@KafkaListener(topics = {"first"})
public String receMassage(ConsumerRecord,?> consumerRecord){
//判断是否为null
Optional> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if(kafkaMessage.isPresent()){
//得到Optional实例中的值
Object message = kafkaMessage.get();
System.err.println("消费消息:"+message);
}
return null;
}
6、拦截器
在拦截器中对消息进行处理
1、时间拦截器@Component public class TimeInterceptor implements ProducerInterceptor2、计数拦截器{ @Override public void configure(Map map) { } @Override public ProducerRecord onSend(ProducerRecord producerRecord) { return new ProducerRecord(producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(), new SimpleDateFormat("yyyy/MM/dd HH-mm-ss").format(System.currentTimeMillis()) + "," + producerRecord.value().toString()); } @Override public void onAcknowledgement(Recordmetadata recordmetadata, Exception e) { } @Override public void close() { } }
@Component public class CounterInterceptor implements ProducerInterceptor7、日志配置{ private int errorCounter = 0; private int successCounter = 0; @Override public ProducerRecord onSend(ProducerRecord producerRecord) { return producerRecord; } @Override public void onAcknowledgement(Recordmetadata recordmetadata, Exception e) { // 统计成功和失败的次数 if (e == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存结果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } @Override public void configure(Map map) { } }
8、测试logback %d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n true poslog/%d{yyyy-MM-dd}/%d{yyyy-MM-dd}.log %d{yyyy-MM-dd HH:mm:ss} -%msg%n %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n



