
Installing Kafka on CentOS 7 and Basic Usage


[Before starting Kafka, adjust one IP address in the configuration file /opt/kafka-2.1.0/config/server.properties.]

The virtual machine's IP address may change depending on the network it is on, so this setting may need adjusting after each restart; alternatively, give the VM a static IP so the configuration file never needs to change.
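Concretely, the line to change is the listeners entry in server.properties; the address below is the example used later in this article and should be replaced with your VM's current IP:

```
listeners=PLAINTEXT://192.168.43.196:9092
```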

Installation and usage

Installing Kafka on CentOS 7
1. Extract the archive
2. Edit the configuration files
3. Create a startup script
4. Make the startup script executable
5. Run the startup script
6. Quick command-line test

Installing the Kafka Tool client

Calling Kafka from a Java program

Producer: add the dependency to pom.xml; configure the Kafka settings in application.yml; create a bean that initializes the Kafka address for easy reuse; call it from a controller; test with Postman

Consumer

Installing Kafka on CentOS 7

Related reading: a predecessor's thoughts on preventing data loss.

1. Extract the archive

tar -xvf kafka_2.11-2.1.0.tgz
mv kafka_2.11-2.1.0 kafka-2.1.0

2. Edit the configuration files

cd /opt/kafka-2.1.0/config

[Edit the server.properties file]
vim server.properties
Change the following settings:
broker.id=0
# port number
port=9092
# for a single machine, localhost works
host.name=localhost
listeners=PLAINTEXT://192.168.43.196:9092
# log directory
log.dirs=/opt/kafka-2.1.0/log
# ZooKeeper address and port; for a single-node deployment, localhost:2181
zookeeper.connect=localhost:2181

[Edit the zookeeper.properties file]
vim zookeeper.properties
Change the following settings:
# ZooKeeper data directory
dataDir=/opt/zookeeper-3.4.8/data
# ZooKeeper log directory
dataLogDir=/opt/zookeeper-3.4.8/log
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10

3. Create a startup script

vim kafkaStart.sh

The script contents:
#!/bin/bash
# start ZooKeeper
/opt/kafka-2.1.0/bin/zookeeper-server-start.sh /opt/kafka-2.1.0/config/zookeeper.properties &
sleep 3  # wait 3 seconds for ZooKeeper to come up
# start Kafka
/opt/kafka-2.1.0/bin/kafka-server-start.sh /opt/kafka-2.1.0/config/server.properties &

4. Make the startup script executable

chmod +x kafkaStart.sh

5. Run the startup script to start Kafka

cd bin
./kafkaStart.sh

6. Quick command-line test

# create a topic named test
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# list topics
./kafka-topics.sh --list --zookeeper localhost:2181

# delete the topic named test
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

# open two terminals; in the first, start a producer
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

# in the second, start a consumer
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
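The topic commands above can also be run programmatically. As a hedged sketch (not part of the original article), the same create/list operations with Kafka's AdminClient API look roughly like this; the broker address is an assumption:

```java
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdminSketch {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // assumed broker address; replace with your own
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // create a topic named test with 1 partition and replication factor 1,
            // mirroring the kafka-topics.sh --create command above
            admin.createTopics(Collections.singleton(new NewTopic("test", 1, (short) 1))).all().get();
            // list topics, mirroring kafka-topics.sh --list
            Set<String> names = admin.listTopics().names().get();
            System.out.println(names);
        }
    }
}
```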

Installing the Kafka Tool client

The installer is a simple next-next-finish; connect to the broker by IP.

Calling Kafka from a Java program

Producer

Add the dependency to pom.xml


<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configure the Kafka settings in application.yml

# Kafka producer configuration
kafka:
  # broker address
  url: 192.168.124.22:9092
  # topic name
  topic: testKafka
  # number of resend attempts after an error
  retries: 0
  # which acknowledgement mode the broker uses for data from the producer
  acks: 1

Create a bean that initializes the Kafka address for easy reuse

MetricsCallBack: a callback that reports the result of sending a message

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;


@Slf4j
public abstract class MetricsCallBack implements Callback {

    private final long startTime;
    private final String key;
    private final String message;

    
    // capture the send start time, message key, and payload for reporting
    public MetricsCallBack(long startTime,String key,String message){
        this.startTime=startTime;
        this.key=key;
        this.message=message;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long cost=System.currentTimeMillis()-startTime;
        if(metadata!=null){
            onSendFinish(this.key,this.message,true,cost,metadata.topic(),metadata.partition(),metadata.offset());
        }else{
            log.error("send message to kafka failed!",exception);
            onSendFinish(this.key,this.message,false,cost,null,-1,-1);
        }
    }

    
    // called with the outcome of the send: success flag, elapsed time, and record coordinates
    public abstract void onSendFinish(String key,String message,boolean success,long useTime,String topic,int partition,long offset);

}

KafkaProducerHelper: a utility class that sends messages

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;


@Slf4j
public class KafkaProducerHelper {

    private final KafkaProducer<String, String> producer;

    public KafkaProducerHelper(String servers,String acks,int retries){
        log.info("creating KafkaProducerHelper...");
        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG,"clientId");
        properties.put(ProducerConfig.ACKS_CONFIG,acks);
        properties.put(ProducerConfig.RETRIES_CONFIG,retries);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,100*1024);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer=new KafkaProducer<>(properties);
    }

    
    // send without a key and wait synchronously for the result
    public void send(String topic,String message,boolean printLog){
        if(printLog){
            log.info("topic:{},message:{}",topic,message);
        }
        try {
            this.producer.send(new ProducerRecord<>(topic,message)).get();
        } catch (InterruptedException |ExecutionException e) {
            log.error("get result error!",e);
        }
    }

    
    // send with a key and wait synchronously for the result
    public void send(String topic,String key,String message,boolean printLog){
        if(printLog){
            log.info("topic:{},key:{},message:{}",topic,key,message);
        }
        try {
            this.producer.send(new ProducerRecord<>(topic,key,message)).get();
        } catch (InterruptedException |ExecutionException e) {
            log.error("get result error!",e);
        }
    }

    
    // send without a key asynchronously, reporting the result through the callback
    public void send(String topic,String message,MetricsCallBack metricsCallBack,boolean printLog){
        if(printLog){
            log.info("topic:{},message:{}",topic,message);
        }
        this.producer.send(new ProducerRecord<>(topic,message),metricsCallBack);
    }
    
    // send with a key asynchronously, reporting the result through the callback
    public void send(String topic,String key,String message,MetricsCallBack metricsCallBack,boolean printLog){
        if(printLog){
            log.info("topic:{},key:{},message:{}",topic,key,message);
        }
        this.producer.send(new ProducerRecord<>(topic,key,message),metricsCallBack);
    }
}

KafkaConfig: loads the Kafka settings into the container as a bean

import com.bigdata.bigdata.util.kafka.KafkaProducerHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class KafkaConfig {

    @Value("${kafka.url}")
    private String bootStrapServers;
    @Value("${kafka.retries}")
    private int retries;
    @Value("${kafka.acks}")
    private String acks;

    @Bean
    public KafkaProducerHelper getKafkaProducerHelper(){
        return new KafkaProducerHelper(bootStrapServers,acks,retries);
    }
}

KakfaProducerComponent: wraps message sending for other components to call

import com.alibaba.fastjson.JSON;
import com.bigdata.bigdata.util.kafka.KafkaProducerHelper;
import com.bigdata.bigdata.util.kafka.MetricsCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;


@Component
@Slf4j
public class KakfaProducerComponent {

    @Value("${kafka.topic}")
    private String topicKey;

    @Autowired
    private KafkaProducerHelper kafkaProducerHelper;

    @Async
    public void sendMessage(String orderId, int status) {
        try {
            Map<String, Object> map = new HashMap<>();
            map.put("orderId", orderId);
            map.put("status", status);
            String message = JSON.toJSONString(map);

            // report partition and offset once the broker acknowledges the record
            MetricsCallBack metricsCallBack = new MetricsCallBack(System.currentTimeMillis(), "", message) {

                @Override
                public void onSendFinish(String key, String message, boolean success, long useTime, String topic, int partition, long offset) {
                    log.info("message sent: {}, partition: {}, offset: {}", message, partition, offset);
                }
            };
            kafkaProducerHelper.send(topicKey, message, metricsCallBack, true);
        } catch (Exception e) {
            log.error("kafka send failed", e);
        }
    }
}

Call it from a controller
import com.bigdata.bigdata.component.KakfaProducerComponent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private KakfaProducerComponent kafkaProducer;

    @GetMapping("/sendMessage")
    public void sendMessage(String orderId,int status){
        kafkaProducer.sendMessage(orderId,status);
    }
}

Test with Postman

Call the /kafka/sendMessage endpoint with orderId and status query parameters, then check the data in the Kafka Tool client.

Consumer

To be completed.
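The consumer side is left unfinished in the article. As a hedged placeholder, here is a minimal sketch using the plain KafkaConsumer API in the same style as the producer helper above; the broker address, group id, and topic name are assumptions taken from the earlier configuration:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerSketch {

    // build the consumer configuration; the address and group id are placeholders
    static Properties consumerProps(String servers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // start from the earliest offset when the group has no committed offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProps("192.168.124.22:9092", "test-group"))) {
            consumer.subscribe(Collections.singletonList("testKafka"));
            while (true) {
                // poll blocks for up to one second waiting for records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d, offset=%d, value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```

Run this while sending messages through the controller above to see the JSON payloads arrive.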
