栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

异步模式-生产者/消费者

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

异步模式-生产者/消费者

异步模式-生产者/消费者

文章目录
  • 异步模式-生产者/消费者
    • 1.概念
    • 2.代码实现生产者-消费者模式代码

1.概念
  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应;
  • 消费队列可以用来平衡生产和消费的线程资源;
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据;
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据;
  • JDK 中各种阻塞队列,采用的就是这种模式;


消息队列有延时,不会被立刻消费,异步的。

2.代码实现生产者-消费者模式代码

消息类POJO

package com.concurrent.p3.message_queue;

import lombok.extern.slf4j.Slf4j;


@Slf4j(topic = "c.Message")
public class Message {
    private static Integer id = 1;
    private String message;

    private static Integer generateId() {
        return id++;
    }

    public Message(String message) {
        generateId();
        this.message = message;
    }

    public Integer getId() {
        return id;
    }

    public String getMessage() {
        return message;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id + "t" +
                "message='" + message + ''' +
                '}';
    }
}

消息队列

package com.concurrent.p3.message_queue;

import lombok.extern.slf4j.Slf4j;

import java.util.linkedList;
import java.util.Random;

@Slf4j(topic = "c.MessageQueue")
public class MessageQueue {
    //消息队列
    private linkedList list = new linkedList<>();
    //队列容量
    private Integer capacity;

    //构造方法,初始化容量
    public MessageQueue(Integer capacity) {
        this.capacity = capacity;
    }

    //获取消息
    public Message get() {
        synchronized (this) {
            //如果队列为空,消费者就一直等待
            while (list.size() == 0) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //如果队列不为空,则取出队头消息,并唤醒生产者
            Message message = list.removeFirst();
            this.notifyAll();
            return message;
        }
    }

    //产生消息
    public void complete() {
        synchronized (this) {
            //如果队列满,生产者就等待
            while (list.size() == capacity) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //如果队列不满,生产者就一直生产消息
            Message message = new Message("hello" + (new Random().nextInt(100) + 1));
            //将产生的消息放到队列尾
            list.addLast(message);
            //唤醒消费者
            this.notifyAll();
        }
    }

    //查看当前队列的情况
    public void showList() {
        synchronized (this) {
            list.forEach((s) -> {
                log.debug(s.getMessage());
            });
            log.debug("=================");
        }
    }
}

测试代码,1个生成者,1个消费者

package com.concurrent.p3.message_queue;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

@Slf4j(topic = "c.TestMessageQueue")
public class TestMessageQueue {

    @Test
    public void testMessageQueue() {
        MessageQueue mq = new MessageQueue(5);
        //生产者线程
        Thread procedure = new Thread(() -> {
            while (true) {
                mq.complete();
            }
        }, "procedure");
        procedure.start();

        //消费者线程
        Thread customer = new Thread(() -> {
            while (true) {
                //每个1秒,消费1次
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Message message = mq.get();
                log.debug(message.toString());
            }
        }, "customer");
        customer.start();

        //在主线程查看队列的情况
        while (true) {
            //每个2秒查看消息队列的情况
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            mq.showList();
        }
    }
}

测试代码,3个生产者,1个消费者

package com.concurrent.p3.message_queue;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

@Slf4j(topic = "c.TestMessageQueue")
public class TestMessageQueue {

    
    @Test
    public void testMessageQueue1() {
        MessageQueue mq = new MessageQueue(5);
        //生产者线程
        Thread procedure = new Thread(() -> {
            while (true) {
                mq.complete();
            }
        }, "procedure");
        procedure.start();

        //消费者线程
        Thread customer = new Thread(() -> {
            while (true) {
                //每个1秒,消费1次
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Message message = mq.get();
                log.debug(message.toString());
            }
        }, "customer");
        customer.start();

        //在主线程查看队列的情况
        while (true) {
            //每个2秒查看消息队列的情况
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            mq.showList();
        }
    }

    
    @Test
    public void testMessageQueue2() {
        //3个生产者线程
        List procedureThreadList = new ArrayList<>();
        //消息队列
        MessageQueue mq = new MessageQueue(10);
        for (int i = 0; i < 3; i++) {
            Thread p = new Thread(() -> {
                while (true) {
                    mq.complete();
                }
            }, "p" + i + 1);
            procedureThreadList.add(p);
        }
        for (Thread i : procedureThreadList) {
            i.start();
        }

        //1个消费者
        Thread c1 = new Thread(()->{
            while (true) {
                Message message = mq.get();
                //log.debug(message.toString());
            }
        },"c1");
        c1.start();

        //在主线程查看队列的情况
        while (true) {
            //每个2秒查看消息队列的情况
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            mq.showList();
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/336529.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号