1.认识消息队列MQ2.为何使用消息队列3. 为何使用kafka4.阻塞队列示例
生产者线程定义
`queue.put(i)` 消费者线程定义
`queue.take()`
主线程中模拟测试结果
参考牛客网高级项目教程
尚硅谷kafka教学笔记
1.认识消息队列MQ博客链接
MQ(message queue),本质是个阻塞队列
FIFO 先入先出,存放的内容是message一种跨进程的通信机制,用于上下游传递消息
消息队列有两种模式:
点对点模式发布订阅者模式
消息队列三个核心功能:
解耦异步消峰 2.为何使用消息队列
社区项目中,对帖子的点赞、私信、评论等操作频繁,系统会记录这些操作并向指定用户发送消息1.涉及的模块比较多,
为了方便维护,解耦,需要使用到消息队列 2.访问频繁、访问数据库较多,
为了提升性能,异步入库,使用消息队列,将数据先写入消息队列中,再异步入库 3.有可能在一些特殊时刻,例如晚上访问量剧增,
需要消峰处理,防止服务器崩溃,需要用到消息队列消峰功能 3. 为何使用kafka
Kafka , 主要特点是基于Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集传输, 适合产生大量数据的互联网服务的数据收集业务。
本项目中有日志采集,故首选 kafka 4.阻塞队列示例
生产者线程定义 queue.put(i)一共生产100个数间隔为20ms记录每次生产后, 队列中的元素个数
class Producer implements Runnable {
// 每个线程都初始化一个队列来接收传过来的阻塞队列
private BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName() + "生产了:" + queue.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者线程定义
queue.take()
消费间隔随机,但比生产慢记录每次消费后,队列中剩下的元素个数
class Consumer implements Runnable {
// 每个线程都初始化一个队列来接收传过来的阻塞队列
private BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true) {
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
主线程中模拟
public static void main(String[] args) {
// 实例化一共阻塞队列-使用ArrayBlockingQueue实现
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue), "生产者-1线程:").start();
new Thread(new Consumer(queue), "消费者-1线程:").start();
new Thread(new Consumer(queue), "消费者-2线程:").start();
new Thread(new Consumer(queue), "消费者-3线程:").start();
}
测试结果
队列中最多放10个数据生产满了,等待消费消费完了,等待生产



