Spring Boot + Kafka in Detail

Integrating Kafka

1. Add the Kafka Maven dependencies

  
        
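        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.4</version>
            <scope>provided</scope>
        </dependency>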

2. Install Kafka on the server
(1). Download the Kafka archive

https://kafka.apache.org/downloads

(2). Install Kafka (extract the archive)

tar -zxvf kafka_2.13-3.0.0.tgz 

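(3). Edit the Kafka configuration (config/server.properties)

#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://127.0.0.1:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
# point this at a directory under your own Kafka installation
log.dirs=/usr/local/kafka_2.11-0.9.0.1/kafka-logs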

(4). Edit the configuration of Kafka's bundled ZooKeeper (config/zookeeper.properties)

dataDir=/usr/local/kafka/zookeeper
dataLogDir=/usr/local/kafka/zookeeper-logs
# the port at which the clients will connect
clientPort=2181
# per-IP connection limit; the stock file sets 0, which disables the limit
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
# Disable the admi

(5). Start ZooKeeper and Kafka

bin/zookeeper-server-start.sh config/zookeeper.properties 
bin/kafka-server-start.sh config/server.properties 
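For convenience, you can create your own startup script.
In the Kafka directory, run: vi kafkaStart.sh

Add the following content:
#!/bin/bash
# start ZooKeeper
/DATA/kafka/kafka_2.12-2.0.0/bin/zookeeper-server-start.sh /DATA/kafka/kafka_2.12-2.0.0/config/zookeeper.properties &
sleep 3  # wait 3 seconds so ZooKeeper has time to come up
# start Kafka
/DATA/kafka/kafka_2.12-2.0.0/bin/kafka-server-start.sh /DATA/kafka/kafka_2.12-2.0.0/config/server.properties &

The main settings in server.properties:

broker.id: the unique identifier of this machine within the cluster. With three Kafka hosts, for example, configure them as 1, 2, and 3 respectively.

listeners: the address and port the broker listens on.

advertised.listeners: the address and port handed out to producers and consumers, i.e. the externally reachable address. Defaults to the value of listeners.

zookeeper.connect: the ZooKeeper connection address. In a clustered setup, every Kafka host must list all of the ZooKeeper servers.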
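The fallback from advertised.listeners to listeners described above can be illustrated with a small sketch. It uses only java.util.Properties and does not call any Kafka API; the class name ListenerConfigCheck and the broker.id value in main are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Sketch: work out which address clients would be told to use, from server.properties-style text. */
final class ListenerConfigCheck {

    /** Parses server.properties-style text into a Properties object. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("reading from a String should not fail", e);
        }
        return props;
    }

    /** Returns advertised.listeners if it is set, otherwise falls back to listeners. */
    static String advertisedAddress(Properties props) {
        String listeners = props.getProperty("listeners");
        return props.getProperty("advertised.listeners", listeners);
    }

    public static void main(String[] args) {
        // broker.id=1 is an illustrative value, not taken from the article's config
        Properties props = parse(
                "broker.id=1\n"
              + "listeners=PLAINTEXT://127.0.0.1:9092\n");
        System.out.println("clients connect to: " + advertisedAddress(props));
    }
}
```

With both keys present, the advertised value wins; with only listeners present, that value is used for clients as well.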

Startup result

(6). Create a topic

./kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

(7). Start a console producer on the topic

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

(8). Start a console consumer on the topic

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

3. Integrate Kafka into Spring Boot: add the configuration

#kafka
spring.application.name=kafka-tutorial
# Kafka broker address(es); several can be listed, separated by commas
spring.kafka.bootstrap-servers=172.31.111.11:9092
# number of times a failed send is retried
spring.kafka.producer.retries=0
# upper bound, in bytes, of one batch of records sent to a partition
spring.kafka.producer.batch-size=16384
# total memory, in bytes, the producer may use to buffer unsent records
spring.kafka.producer.buffer-memory=33554432
# serializers for the message key and the message body
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# default consumer group id
spring.kafka.consumer.group-id=demo
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
# deserializers for the message key and the message body
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# number of threads in the listener container, used to raise concurrency
spring.kafka.listener.concurrency=3
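To make the meaning of these settings easier to look up in the Kafka documentation, the sketch below lists the native Kafka client keys that the Spring Boot properties correspond to, with the same values. It only builds java.util.Properties objects and does not connect to a broker; the class name NativeKafkaProps is an assumption for illustration. spring.kafka.listener.concurrency has no native counterpart because it configures Spring's listener container rather than the Kafka client.

```java
import java.util.Properties;

/** Sketch: the native Kafka client keys behind the Spring Boot properties. */
final class NativeKafkaProps {

    /** Producer settings, keyed the way the Kafka producer client names them. */
    static Properties producer() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "172.31.111.11:9092"); // spring.kafka.bootstrap-servers
        p.setProperty("retries", "0");                            // spring.kafka.producer.retries
        p.setProperty("batch.size", "16384");                     // bytes per batch, not a message count
        p.setProperty("buffer.memory", "33554432");               // 32 MiB of send buffer in total
        p.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    /** Consumer settings, keyed the way the Kafka consumer client names them. */
    static Properties consumer() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "172.31.111.11:9092"); // spring.kafka.bootstrap-servers
        p.setProperty("group.id", "demo");                        // spring.kafka.consumer.group-id
        p.setProperty("auto.commit.interval.ms", "100");          // spring.kafka.consumer.auto-commit-interval
        p.setProperty("auto.offset.reset", "earliest");           // spring.kafka.consumer.auto-offset-reset
        p.setProperty("enable.auto.commit", "true");              // spring.kafka.consumer.enable-auto-commit
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    public static void main(String[] args) {
        producer().forEach((k, v) -> System.out.println("producer " + k + " = " + v));
        consumer().forEach((k, v) -> System.out.println("consumer " + k + " = " + v));
    }
}
```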

4. Create a test bean package with the Kafka producer and consumer
(1). Create the producer KafkaProduction

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;


// Written against spring-kafka 2.x, where KafkaTemplate.send returns a ListenableFuture.
@Component
public class KafkaProduction {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProduction.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Serializes obj to JSON and sends it asynchronously to the given topic.
    public <T> void send(T obj, String topic) {
        String jsonObj = JSON.toJSONString(obj);
        logger.info("----kafka---- message = {}", jsonObj);
        // send the message; the callback runs once the broker has responded
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, jsonObj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.error("KafkaProduction: failed to send the message: {}", throwable.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, String> result) { // sent successfully
                // TODO business logic
                logger.info("KafkaProduction: the message has been sent successfully");
                logger.info("KafkaProduction: =============== result: {}", result);
            }
        });
    }
}
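The producer above uses the ListenableFuture callback style. In spring-kafka 3.x, KafkaTemplate.send returns a CompletableFuture instead, so the success and failure branches are usually attached with whenComplete or handle. The sketch below shows that callback shape using only the JDK; sendAsync is a hypothetical stand-in for the template call, not a Kafka or Spring API, and the class name SendCallbackSketch is likewise an assumption.

```java
import java.util.concurrent.CompletableFuture;

/** Sketch of the CompletableFuture callback style; sendAsync is a stand-in, not a Kafka call. */
final class SendCallbackSketch {

    /** Hypothetical stand-in for an asynchronous send: fails when the payload is empty. */
    static CompletableFuture<String> sendAsync(String topic, String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (payload.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("empty payload"));
        } else {
            future.complete(topic + ":" + payload);
        }
        return future;
    }

    /** Attaches one callback that covers both outcomes and returns a status line. */
    static String sendAndDescribe(String topic, String payload) {
        return sendAsync(topic, payload)
                .handle((result, error) -> error == null
                        ? "sent " + result
                        : "failed: " + error.getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(sendAndDescribe("test", "{\"loginName\":\"panda\"}"));
        System.out.println(sendAndDescribe("test", ""));
    }
}
```

In a real producer the two branches would log or trigger business logic, as onSuccess and onFailure do in the class above, rather than build a string.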

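(2). Create the KafkaTopicsConstant constants class and the sender interface

Only the sender interface is listed here. KafkaTopicsConstant needs to expose a String constant named TEST_TOPICS, which the consumer and the sender implementation reference.

public interface IKafkaSenderService {

    void send();
}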
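Since the consumer and sender reference KafkaTopicsConstant.TEST_TOPICS, a minimal sketch of such a constants class might look like the following. The value "test" is an assumption chosen to match the topic created in step (6), and the package is assumed to be com.iflytek.bim.cop.contant because that is where the other classes import it from; the package declaration is left out so the snippet compiles on its own.

```java
// Assumed package: com.iflytek.bim.cop.contant (taken from the imports in the other classes).
public final class KafkaTopicsConstant {

    /** Topic name; "test" is an assumption matching the topic created on the command line. */
    public static final String TEST_TOPICS = "test";

    private KafkaTopicsConstant() {
        // constants holder, not meant to be instantiated
    }
}
```

The field has to be a compile-time constant (static final String with a literal value) so that it can be used inside the @KafkaListener annotation.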

(3). Create the consumer KafkaConsumer

import com.iflytek.bim.cop.contant.KafkaTopicsConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;

@Component
public class KafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    // Invoked for every record that arrives on the subscribed topic.
    @KafkaListener(topics = KafkaTopicsConstant.TEST_TOPICS)
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            logger.info("KafkaConsumer received: ================= Topic: {}", record.topic());
            logger.info("KafkaConsumer received: ================= Record: {}", record);
            logger.info("KafkaConsumer received: ================= Message: {}", message);
        }
    }
}

(4). Implement the sender

import com.iflytek.bim.cop.component.KafkaProduction;
import com.iflytek.bim.cop.contant.KafkaTopicsConstant;
import com.iflytek.bim.cop.domain.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


// @Service registers the class as a bean so that @Autowired works and the controller can inject it.
@Service
public class KafkaSenderServiceImpl implements IKafkaSenderService {
    @Autowired
    private KafkaProduction kafkaProduction;

    @Override
    public void send() {
        User user = new User();
        user.setLoginName("I am is a panda");
        user.setPassword("5588996633");
        kafkaProduction.send(user, KafkaTopicsConstant.TEST_TOPICS);
    }
}

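(5). Test Kafka

    // Inside a Spring MVC controller class:
    @Autowired
    private IKafkaSenderService kafkaSenderService;

    @ResponseBody
    @RequestMapping("/")
    public void kafkatest() {
        kafkaSenderService.send();
    }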

(6). Kafka test result

When reposting, please credit the source: this article is reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/751819.html