栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Java 多进程】 006 生产者和消费者

【Java 多进程】 006 生产者和消费者

问题描述

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况:

存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,生产者等着消费者消费产品,从而向空间中添加产品。互相等待,从而发生死锁。

解决方法

采用某种机制保护生产者和消费者之间的同步。有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。

在生产者和消费者之间建立一个管道。管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

核心:保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。

Java能实现的几种方法

wait() / notify()方法

当缓冲区已满时,生产者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行;

当缓冲区已空时,消费者线程停止执行,放弃锁,使自己处于等状态,让其他线程执行。

当生产者向缓冲区放入一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态;

当消费者从缓冲区取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

public class ProducerConsumer {
    public static void main(String[] args) {
        PublicBox box = new PublicBox();
        Consumer con = new Consumer(box);
        Producer pro = new Producer(box);
        Thread t1 = new Thread(pro,"A");
        Thread t2 = new Thread(pro,"B");
        Thread t3 = new Thread(con);
        t1.start();
        t2.start();
        t3.start();
    }
}

class Producer implements Runnable {

    private PublicBox box;

    public Producer(PublicBox box) {
        this.box = box;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("Put Number:" + Thread.currentThread().getName());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            box.put();
        }
    }
}

class Consumer implements Runnable {

    private PublicBox box;

    public Consumer(PublicBox box) {
        this.box = box;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // TODO: handle exception
                e.printStackTrace();
            }

            box.get();
        }
    }
}

class PublicBox {

    private int product = 0;

    public synchronized void put() {
        while (product == 5) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
        product++;
        System.out.println("Put SUCCESS!Product Nums:" + product);
        notify();
    }

    public synchronized void get() {
        while (product == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        product--;
        System.out.println("Get SUCCESS!Product Nums:" + product);
        notify();
    }
}

notifyAll()方法可使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。此时,优先级最高的哪个线程最先执行,但也有可能是随机执行的,这要取决于JVM虚拟机的实现。即最终也只有一个线程能被运行。

await() / signal()方法

在JDK5.0以后,JAVA提供了新的更加健壮的线程处理机制,包括了同步、锁定、线程池等等,可以实现更小粒度上的控制。await()和signal()就是其中用来同步的两种方法,功能基本上和wait()/notify()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。

public class ProducerConsumer {
    public static void main(String[] args) {
        PublicBox box = new PublicBox();
        Consumer con = new Consumer(box);
        Producer pro = new Producer(box);
        Thread t1 = new Thread(pro,"A");
        Thread t2 = new Thread(pro,"B");
        Thread t3 = new Thread(con);
        t1.start();
        t3.start();
    }
}

class Producer implements Runnable {

    private PublicBox box;

    public Producer(PublicBox box) {
        this.box = box;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("Put Number:" + Thread.currentThread().getName());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            box.put();
        }
    }
}

class Consumer implements Runnable {

    private PublicBox box;

    public Consumer(PublicBox box) {
        this.box = box;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // TODO: handle exception
                e.printStackTrace();
            }

            box.get();
        }
    }
}

class PublicBox {

    private int product = 0;
      // 锁
    private final Lock lock = new ReentrantLock();
    // 仓库满的条件变量
    private final Condition full = lock.newCondition();
    // 仓库空的条件变量
    private final Condition empty = lock.newCondition();

    public void put() {
        lock.lock();
        try {
            while (product == 5) {
                try {
                    full.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
            }
            product++;
            System.out.println("Put SUCCESS!Product Nums:" + product);
            empty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void get() {
        lock.lock();
        try {
            while (product == 0) {
                try {
                    empty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            product--;
            System.out.println("Get SUCCESS!Product Nums:" + product);
            full.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

BlockingQueue阻塞队列方法

BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是await() / signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

put()方法: 类似于我们上面的生产者线程,容量达到最大时,自动阻塞。

take()方法: 类似于我们上面的消费者线程,容量为0时,自动阻塞。

public class ProducerConsumer {
    public static void main(String[] args) {
        PublicBox box = new PublicBox();
        Consumer con = new Consumer(box);
        Producer pro = new Producer(box);
        Thread t1 = new Thread(pro,"A");
        Thread t2 = new Thread(pro,"B");
        Thread t3 = new Thread(con);
        t1.start();
        t2.start();
        t3.start();
    }
}

class Producer implements Runnable {

    private PublicBox box;

    public Producer(PublicBox box) {
        this.box = box;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("Put Number:" + Thread.currentThread().getName());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            box.put();
        }
    }
}

class Consumer implements Runnable {

    private PublicBox box;

    public Consumer(PublicBox box) {
        this.box = box;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // TODO: handle exception
                e.printStackTrace();
            }

            box.get();
        }
    }
}

class PublicBox {

    private linkedBlockingQueue list = new linkedBlockingQueue<>(10);

    public void put() {
        try {
            list.put(new Object());
            System.out.println("Put SUCCESS!Product Nums:" + list.size());
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    public void get() {
        try {
            list.take();
            System.out.println("Get SUCCESS!Product Nums:" + list.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}
 

信号量

Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。

Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。计数为0的Semaphore是可以release的,然后就可以acquire(即一开始使线程阻塞从而完成其他执行)。

public class TestSemaphore {

    static WareHouse wareHouse = new WareHouse();

    //生产者
    static class Producer implements Runnable {

        static int num = 1;

        @Override
        public void run() {
            while (true) {
                try {
                    wareHouse.insert(num);
                    System.out.println("生产物品" + num);
                    num++;
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }

    }

    //消费者
    static class Consumer implements Runnable {

        @Override
        public void run() {
            // TODO Auto-generated method stub
            while (true) {
                try {
                    System.out.println("消费物品" + wareHouse.remove());
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }

    }

    //仓库,可以放置和拿走物品
    static class WareHouse {

        private final int capacity = 10;
        private final Semaphore full = new Semaphore(0); //仓库中被占用的槽的信号量
        private final Semaphore empty = new Semaphore(capacity); //仓库中空的槽的信号量
        private final Semaphore mutex = new Semaphore(1); //互斥信号量
        private int insertIndex = 0; //仓库中当前可以放置物品的位置
        private int removeIndex = 0; //仓库中当前可以拿走物品的位置
        private final Object[] items = new Object[capacity]; //仓库中的所有物品
        int count = 0; //仓库中的现有物品数

        //向仓库中放置物品
        public void insert(Object item) throws InterruptedException {
            empty.acquire();
            mutex.acquire();
            items[insertIndex++] = item;
            if (insertIndex == capacity) {
                insertIndex = 0;
            }
            count++;
            mutex.release();
            full.release();
        }

        //从仓库中拿走物品
        public Object remove() throws InterruptedException {
            full.acquire();
            mutex.acquire();
            Object item = items[removeIndex++];
            if (removeIndex == capacity) {
                removeIndex = 0;
            }
            count--;
            mutex.release();
            empty.release();
            return item;
        }
    }

    public static void main(String[] args) {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }
}

管道

一种特殊的流,用于不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读数据。

inputStream.connect(outputStream)或outputStream.connect(inputStream)作用是使两个Stream之间产生通信链接,这样才可以将数据进行输出与输入。

这种方式只适用于两个线程之间通信,不适合多个线程之间通信。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号