生产者消费者实例
1.ReentrantLock实现
/**
 * Entry point for the lock-based producer/consumer example: starts two
 * producer threads and two consumer threads that all share one {@code Data}.
 */
public class Demo {
    public static void main(String[] args) {
        Data shared = new Data();

        // Each producer thread performs a single increment on the shared resource.
        Runnable produceOnce = () -> {
            try {
                shared.increment();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        };

        // Each consumer thread performs a single decrement on the shared resource.
        Runnable consumeOnce = () -> {
            try {
                shared.decrement();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        };

        // Two producer threads, named with ids 0 and 1.
        for (int id = 0; id < 2; id++) {
            Thread producer = new Thread(produceOnce, "生产者" + id);
            producer.start();
        }

        // Two consumer threads, named with ids 2 and 3.
        for (int id = 2; id < 4; id++) {
            Thread consumer = new Thread(consumeOnce, "消费者" + id);
            consumer.start();
        }
    }
}
/**
 * Shared counter guarded by a {@link ReentrantLock} and a single
 * {@link Condition}.
 *
 * <p>{@link #increment()} waits until the counter is 0 and then raises it to 1;
 * {@link #decrement()} waits until the counter is non-zero and then lowers it,
 * so the counter only ever holds 0 or 1.
 */
class Data {
    // Shared resource that producers and consumers hand back and forth.
    private int number = 0;
    final Lock lock = new ReentrantLock();
    final Condition condition = lock.newCondition();

    /**
     * Produces one unit: blocks while the counter is non-zero, then increments
     * it, prints the new value and wakes every waiting thread.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *     waiting; the lock is released before the exception propagates
     */
    public void increment() throws InterruptedException {
        lock.lock();
        try {
            // Re-check in a loop rather than with `if` to guard against spurious wakeups.
            while (number != 0) {
                condition.await();
            }
            number++;
            // NOTE(review): the "t" in the literal looks like a tab escape that lost
            // its backslash — confirm before changing the output format.
            System.out.println(Thread.currentThread().getName() + "线程t" + number);
            // Producers and consumers share one condition, so wake everyone.
            condition.signalAll();
        } finally {
            // Always release the lock, including when await() is interrupted.
            lock.unlock();
        }
    }

    /**
     * Consumes one unit: blocks while the counter is 0, then decrements it,
     * prints the new value and wakes every waiting thread.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *     waiting; the lock is released before the exception propagates
     */
    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            // Re-check in a loop rather than with `if` to guard against spurious wakeups.
            while (number == 0) {
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "线程t" + number);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
2.阻塞队列实现
/**
 * Entry point for the blocking-queue producer/consumer example: starts one
 * producer thread and one consumer thread on a shared {@code MyResource},
 * lets them run for five seconds and then asks the resource to stop.
 */
public class 阻塞队列版 {
    public static void main(String[] args) {
        // Bounded queue with capacity 10 shared by the producer and the consumer.
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));

        new Thread(() -> {
            System.out.println("生产线程启动");
            try {
                myResource.myProd();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Prod").start();

        new Thread(() -> {
            System.out.println("消费线程启动");
            try {
                myResource.myConsumer();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Consumer").start();

        try {
            Thread.sleep(5000);
            System.out.println();
            System.out.println();
            System.out.println();
            System.out.println();
            myResource.stop();
            System.out.println("5秒时间到,老板main线程叫停");
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag and still stop the
            // workers, otherwise the non-daemon worker threads keep running.
            Thread.currentThread().interrupt();
            myResource.stop();
        }
    }
}
/**
 * Producer/consumer resource built on a {@link BlockingQueue} of strings.
 *
 * <p>A single volatile flag controls both loops: {@link #myProd()} and
 * {@link #myConsumer()} keep running while it is true. {@link #stop()} clears
 * it, and so does the consumer when a poll times out.
 */
class MyResource {
    // On by default: production and consumption run while this is true.
    private volatile boolean running = true;
    // Source of the sequential values that the producer puts on the queue.
    private final AtomicInteger atomicInteger = new AtomicInteger();
    // Parameterized (not raw) so that poll() yields a String.
    final BlockingQueue<String> blockingQueue;

    /**
     * Creates a resource on top of the given queue and prints the queue's
     * concrete class name.
     *
     * @param blockingQueue1 queue shared by the producer and the consumer
     */
    public MyResource(BlockingQueue<String> blockingQueue1) {
        blockingQueue = blockingQueue1;
        System.out.println(blockingQueue1.getClass().getName());
    }

    /**
     * Producer loop: while running, offers the next counter value to the queue
     * (waiting up to two seconds for space), reports the outcome and then
     * sleeps for one second. Prints a closing message once the flag is cleared.
     *
     * @throws InterruptedException if interrupted while offering or sleeping
     */
    public void myProd() throws InterruptedException {
        while (running) {
            String data = atomicInteger.incrementAndGet() + "";
            boolean inserted = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            // NOTE(review): the leading "t" in these literals looks like a tab escape
            // that lost its backslash — confirm before changing the output format.
            if (inserted) {
                System.out.println(Thread.currentThread().getName() + "t 插入队列" + data + "成功");
            } else {
                System.out.println(Thread.currentThread().getName() + "t 插入队列" + data + "失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println("生产结束");
    }

    /**
     * Consumer loop: while running, polls the queue for up to two seconds. A
     * timeout (or an empty string) clears the running flag, which also ends the
     * producer loop, and returns; otherwise the consumed value is reported.
     *
     * @throws InterruptedException if interrupted while polling
     */
    public void myConsumer() throws InterruptedException {
        while (running) {
            String rs = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if (rs == null || rs.isEmpty()) {
                // Nothing arrived within the timeout: shut the whole pipeline down.
                running = false;
                System.out.println(Thread.currentThread().getName() + "超过两秒钟没取到商品,退出");
                System.out.println();
                System.out.println();
                System.out.println();
                return;
            }
            System.out.println(Thread.currentThread().getName() + "t 消费队列" + rs + "成功");
        }
    }

    /** Asks both the producer loop and the consumer loop to finish. */
    public void stop() {
        this.running = false;
    }
}