栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

5.1 Kafka入门和Spring整合

5.1 Kafka入门和Spring整合

阻塞队列

BlockingQueue是一个队列接口,ArrayBlockingQueue等是它的实现类,ArrayBlockingQueue是底层是数组;生产者和消费者公用一个队列,生产者使用put方法生产数据,消费者使用take方法取出数据;当队列已满的时候,生产者线程被阻塞;当队列为空的时候,消费者线程被阻塞;
Kafka入门

kafaka把数据存放到硬盘里 对硬盘顺序读写具有较高的性能,这也是高吞吐量的基础

消息队列的两种模式(Kafka采用发布订阅模式)

**点对点模式:**生产者把数据生产到队列里,多个消费者从队列里取值,值被取出就从队列里消失,多个消费者取到的值不重复,每个数据只被一个消费者消费;

**发布订阅模式:**生产者把数据放到指定位置,多个消费者可以从指定位置读取数据,多个消费者读到的数据可以重复,可以同时先后读取数据;

Kafka术语

Broker:Kafka中的每一台服务器; Zookeeper:用来管理集群;

Topic:主题,存取数据的指定位置; Partition:对主题位置的分区,增强服务器处理,并发能力;

Offset:消息在分区里存放的索引; Replica:数据副本,每一个分区有多个副本;

Leader Replica:主数据副本,可以处理请求,提供数据; Follower Replica:从副本,只负责从主副本备份数据,不做相应;

分布式的时候如果主数据副本挂了,Zookeeper会从从数据副本里选择一个作为主数据副本;

linux命令:.sh Windows:.bat

Kafka下载安装

官网下载地址:

https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz

先配置

1、数据存放的位置 打开zookeeper.properties 配置 dataDir=d:/work/data/zookeeper

2、日志文件存放的位置 打开server.properties 配置 log.dirs=d:/work/data/kafka-logs

官网快速入门:

https://kafka.apache.org/quickstart

先开启zookeeper集群,在开启Kafka

注意:需要cd 到Kafaka的安装目录下,不能直接cd到windows目录下,不然启动的时候会出现找不到Zookeeper配置文件的错误!

在windows的命令行里启动kafka之后,当关闭命令行窗口时,就会强制关闭kafka。这种关闭方式为暴力关闭,很可能会导致kafka无法完成对日志文件的解锁。届时,再次启动kafka的时候,就会提示日志文件被锁,无法成功启动。因此需要使用相应的命令进行关闭!

#进入到Kafaka目录

 cd D:workkafka_2.13-3.1.0

# 启动服务器 (先启动zookeeper服务器,再启动kafka) 不要直接点击关闭,使用关闭命令,

binwindowszookeeper-server-start.bat configzookeeper.properties
binwindowskafka-server-start.bat configserver.properties

# 创建主题

binwindowskafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

# 查看当前服务器的主题

binwindowskafka-topics.bat --list --bootstrap-server localhost:9092

# 创建生产者,往指定主题上发消息

binwindowskafka-console-producer.bat --broker-list localhost:9092 --topic test

# 在开启一个终端来创建消费者

binwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

# 关闭zookeeper服务器

binwindowszookeeper-server-stop.bat

# 关闭kafka服务器

binwindowskafka-server-stop.bat
Spring整合Kafka 1、引入依赖 - spring-kafka

为了避免版本冲突,因此删除版本 父pom中已经声明相应版本


    org.springframework.kafka
    spring-kafka

2、配置Kafka

配置server、consumer

是否自动提交消费者读取的偏移量 自动提交的频率/ms

# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
3、 访问Kafka

kafkaTemplate是Spirng整合的类

-生产者 kafkaTemplate.send(topic, data);

-消费者 通过注解监听器实现,一个或者多个需要监听的主题;如果主题上有消息就会调用 handleMessage取处理,把消息包装成ConsumerRecord record传入方法

@KafkaListener(topics = {“test”})

public void handleMessage(ConsumerRecord record) {}

生产者发消息需要主动调用 消费者处理消息是被动的,略有延迟

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {

    @Autowired(required = false)
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() {
        kafkaProducer.sendMessage("test", "你好");
        kafkaProducer.sendMessage("test", "在吗");

        try {
            Thread.sleep(1000 * 2);
            kafkaProducer.sendMessage("test", "hallo");
            Thread.sleep(1000 * 2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }

}

@Component
class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }


}
);
    }

}

@Component
class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }


}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758536.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号