BlockingQueue是一个队列接口,ArrayBlockingQueue等是它的实现类,ArrayBlockingQueue是底层是数组;生产者和消费者公用一个队列,生产者使用put方法生产数据,消费者使用take方法取出数据;当队列已满的时候,生产者线程被阻塞;当队列为空的时候,消费者线程被阻塞;
Kafka入门
kafaka把数据存放到硬盘里 对硬盘顺序读写具有较高的性能,这也是高吞吐量的基础
消息队列的两种模式(Kafka采用发布订阅模式)
**点对点模式:**生产者把数据生产到队列里,多个消费者从队列里取值,值被取出就从队列里消失,多个消费者取到的值不重复,每个数据只被一个消费者消费;
**发布订阅模式:**生产者把数据放到指定位置,多个消费者可以从指定位置读取数据,多个消费者读到的数据可以重复,可以同时先后读取数据;
Kafka术语Broker:Kafka中的每一台服务器; Zookeeper:用来管理集群;
Topic:主题,存取数据的指定位置; Partition:对主题位置的分区,增强服务器处理,并发能力;
Offset:消息在分区里存放的索引; Replica:数据副本,每一个分区有多个副本;
Leader Replica:主数据副本,可以处理请求,提供数据; Follower Replica:从副本,只负责从主副本备份数据,不做相应;
分布式的时候如果主数据副本挂了,Zookeeper会从从数据副本里选择一个作为主数据副本;
linux命令:.sh Windows:.bat
Kafka下载安装官网下载地址:
https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz
先配置
1、数据存放的位置 打开zookeeper.properties 配置 dataDir=d:/work/data/zookeeper
2、日志文件存放的位置 打开server.properties 配置 log.dirs=d:/work/data/kafka-logs
官网快速入门:
https://kafka.apache.org/quickstart
先开启zookeeper集群,在开启Kafka
注意:需要cd 到Kafaka的安装目录下,不能直接cd到windows目录下,不然启动的时候会出现找不到Zookeeper配置文件的错误!
在windows的命令行里启动kafka之后,当关闭命令行窗口时,就会强制关闭kafka。这种关闭方式为暴力关闭,很可能会导致kafka无法完成对日志文件的解锁。届时,再次启动kafka的时候,就会提示日志文件被锁,无法成功启动。因此需要使用相应的命令进行关闭!
#进入到Kafaka目录
cd D:workkafka_2.13-3.1.0
# 启动服务器 (先启动zookeeper服务器,再启动kafka) 不要直接点击关闭,使用关闭命令,
binwindowszookeeper-server-start.bat configzookeeper.properties binwindowskafka-server-start.bat configserver.properties
# 创建主题
binwindowskafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# 查看当前服务器的主题
binwindowskafka-topics.bat --list --bootstrap-server localhost:9092
# 创建生产者,往指定主题上发消息
binwindowskafka-console-producer.bat --broker-list localhost:9092 --topic test
# 在开启一个终端来创建消费者
binwindowskafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
# 关闭zookeeper服务器
binwindowszookeeper-server-stop.bat
# 关闭kafka服务器
binwindowskafka-server-stop.batSpring整合Kafka 1、引入依赖 - spring-kafka
为了避免版本冲突,因此删除版本 父pom中已经声明相应版本
2、配置Kafkaorg.springframework.kafka spring-kafka
配置server、consumer
是否自动提交消费者读取的偏移量 自动提交的频率/ms
# KafkaProperties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=community-consumer-group spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=30003、 访问Kafka
kafkaTemplate是Spirng整合的类
-生产者 kafkaTemplate.send(topic, data);
-消费者 通过注解监听器实现,一个或者多个需要监听的主题;如果主题上有消息就会调用 handleMessage取处理,把消息包装成ConsumerRecord record传入方法
@KafkaListener(topics = {“test”})
public void handleMessage(ConsumerRecord record) {}
生产者发消息需要主动调用 消费者处理消息是被动的,略有延迟
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired(required = false)
private KafkaProducer kafkaProducer;
@Test
public void testKafka() {
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");
try {
Thread.sleep(1000 * 2);
kafkaProducer.sendMessage("test", "hallo");
Thread.sleep(1000 * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Component
class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
}
@Component
class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}
);
}
}
@Component
class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}



