栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

项目1在线交流平台-5.Kafka构建异步消息系统-1.认识消息队列MQ

项目1在线交流平台-5.Kafka构建异步消息系统-1.认识消息队列MQ

文章目录

1.认识消息队列MQ2.为何使用消息队列3. 为何使用kafka4.阻塞队列示例

生产者线程定义

`queue.put(i)` 消费者线程定义

`queue.take()` 主线程中模拟测试结果
参考牛客网高级项目教程

尚硅谷kafka教学笔记

1.认识消息队列MQ

博客链接

MQ(message queue),本质是个阻塞队列

FIFO 先入先出,存放的内容是message一种跨进程的通信机制,用于上下游传递消息

消息队列有两种模式:

点对点模式发布订阅者模式

消息队列三个核心功能:

解耦异步消峰 2.为何使用消息队列

社区项目中,对帖子的点赞、私信、评论等操作频繁,系统会记录这些操作并向指定用户发送消息1.涉及的模块比较多

为了方便维护,解耦,需要使用到消息队列 2.访问频繁、访问数据库较多

为了提升性能,异步入库,使用消息队列,将数据先写入消息队列中,再异步入库 3.有可能在一些特殊时刻,例如晚上访问量剧增

需要消峰处理,防止服务器崩溃,需要用到消息队列消峰功能 3. 为何使用kafka

Kafka , 主要特点是基于Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集传输, 适合产生大量数据的互联网服务的数据收集业务。

本项目中有日志采集,故首选 kafka 4.阻塞队列示例

生产者线程定义 queue.put(i)

一共生产100个数间隔为20ms记录每次生产后, 队列中的元素个数

class Producer implements Runnable {
    // 每个线程都初始化一个队列来接收传过来的阻塞队列
    private BlockingQueue queue;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName() + "生产了:" + queue.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
消费者线程定义 queue.take()

消费间隔随机,但比生产慢记录每次消费后,队列中剩下的元素个数

class Consumer implements Runnable {
    // 每个线程都初始化一个队列来接收传过来的阻塞队列
    private BlockingQueue queue;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while(true) {
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
主线程中模拟
public static void main(String[] args) {
    // 实例化一共阻塞队列-使用ArrayBlockingQueue实现
    BlockingQueue queue = new ArrayBlockingQueue(10);
    new Thread(new Producer(queue), "生产者-1线程:").start();
    new Thread(new Consumer(queue), "消费者-1线程:").start();
    new Thread(new Consumer(queue), "消费者-2线程:").start();
    new Thread(new Consumer(queue), "消费者-3线程:").start();
}
测试结果

队列中最多放10个数据生产满了,等待消费消费完了,等待生产

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784855.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号