栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

3步教会你部署使用Kafka

3步教会你部署使用Kafka

前言
  • 话不多说,直接开干,具体一些概念,迟一些分享
准备阶段
  • kafka下载安装地址

    Apache Kafka

  •  

开始部署
  • 上传文件到,服务器,具体路径可以自己定,我使用的是/export/software

    • cd /export/software/

    • rz 选择window下刚才下载的kafka文件(如果没有rz命令,自行百度下载)

    • tar -xvzf kafka_2.13-2.4.1.tgz -C ../server/ (解压)

  • 修改 kafka的配置文件(server.properties)

    • cd /export/server/kafka_2.13-2.4.1/

    • vim server.properties

      #此处定义该服务器的id值,用在集群

      broker.id=0

      #指定kafka的数据保存位置

      log.dirs=/export/server/kafka_2.13-2.4.1/data

      #配置zk的节点(目前用的是单节点,所以只配置单节点,这里取本机地址了)

      zookeeper.connect=localhost:2181

      #多节点的配置如下:zookeeper.connect=node1:2181,node2:2181,node3:2181

  • 配置环境变量

    • vim /etc/profile

      export KAFKA_HOME=/export/server/kafka_2.13-2.4.1

      export PATH=PATH:PATH:PATH:{KAFKA_HOME}

      source /etc/profile

  • 启动zookeeper(因为kafka的启动是依赖于zookeeper的)

    • cd /export/server/kafka_2.13-2.4.1/bin

    • nohup zookeeper-server-start.sh ../config/zookeeper.properties &

    • 检查是否启动成功,jps发现出现QuorumPeerMain,则启动成功

      •  

  • 启动kafka

    nohup kafka-server-start.sh ../config/server.properties &

    检查是否启动成功,jps发现出现Kafka,则启动成功

  • 检查集群是否正常

    • kafka-topics.sh --bootstrap-server localhost:9092 --list

      此命令是查询集群里面的topic数量,如果能正常出现空行或内容,则成功

代码应用(springboot应用)
  • pom.xml引入依赖

  org.springframework.kafka
  spring-kafka

  • application.yml
spring:
  kafka:
    bootstrap-servers: 服务器ip:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  • logback.xml (选用,此处是关闭kafka打印的一些日志)

  
  


  • ProducerService.class (生产者类)
package com.producerService;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.List;


@Service
public class ProducerService {
  
  protected static Logger logger = LoggerFactory.getLogger(ProducerService.class);

  @Autowired
  private KafkaTemplate kafkaTemplate;

  public void send() {
    String msg = "hello";
    logger.info("sent message: {}", msg);
    kafkaTemplate.send("Topic1", msg);   //发送到消息中间件,主题为:Topic1
  }

  
}





  • CustomerService.class(消费者类)
package com.CustomerService;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.Optional;


@Service
public class CustomerService {

  protected static Logger logger = LoggerFactory.getLogger(CustomerService.class);

  @KafkaListener(topics = {"Topic1"})   //订阅主题为Topic1的消息,收到,则打印
  public void receive(ConsumerRecord record) {
    Optional message = Optional.ofNullable(record.value());
    message.ifPresent(msg -> logger.info("message: {}", msg));
  }
  
}
  • OpenController(发起api请求)
package com.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping(value = "/api/open")
public class OpenController {
    @Autowired
    private ProducerService producerService;

    @RequestMapping(value = "/helloKafka", method = { RequestMethod.GET })
    public String helloKafka() {
        producerService.send();
        return "welcome to Kafka";
    }
}

  • 发起请求:ip地址:端口/api/open/helloKafka

    可以看到控制台输出的 hello就是成功了

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/676972.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号