栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

异步模式之生产者/消费者

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

异步模式之生产者/消费者

要点

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

代码实现

import com.example.demo.hmjuc.Sleep;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;

import java.util.LinkedList;


public class Test1 {
    public static void main(String[] args) {

        MessageQueue messageQueue = new MessageQueue(5);


        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    Message message = new Message(j, "hello world" + j);
                    System.out.println("生成了一个消息 = " + message);
                    messageQueue.put(message);
                    Sleep.sleep(1000);
                }
            }).start();
        }


        new Thread(() -> {
            while (true) {
                Message message = messageQueue.take();
                System.out.println("消费了一个消息 = " + message);
                Sleep.sleep(500);
            }

        }).start();

    }

}


class MessageQueue {
    
    private final LinkedList list = new LinkedList<>();
    
    private final int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }

    
    public Message take() {
        synchronized (list) {
            while (list.isEmpty()) {
                try {
                    System.out.println("队列为空,等待中...");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = list.removeFirst();
            list.notifyAll();
            // 获取第一个消息 并且移除
            return message;
        }
    }


    
    public void put(Message message) {
        synchronized (list) {
            // 如果队列满了 则等待
            while (list.size() == capacity) {
                try {
                    System.out.println("队列已满,等待中...");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 添加消息
            list.addLast(message);
            list.notifyAll();
        }
    }
}


@ToString
@Getter
@AllArgsConstructor
class Message {
    
    private int id;
    
    private Object value;
}



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/858059.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号