使用传统JUC锁实现
public class ProducerAndConsumerTraditionalDemo {
class Data {
private int number = 0; // 资源
private Lock lock = new ReentrantLock(); // 可重入锁
private Condition condition = lock.newCondition();
public void increment() throws Exception{ //生产者
lock.lock();
try {
// 判断,防止虚假唤醒用 while 不是 if
while (number != 0) {
// 等待,不能生产
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "线程t" + this.number);
// 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception{ //消费者
lock.lock();
try {
// 判断,防止虚假唤醒用 while 不是 if
while (number == 0) {
// 等待,不能消费
condition.await();
}
// 消费
number--;
System.out.println(Thread.currentThread().getName() + "线程t" + this.number);
// 通知唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
Data data = new Data();
// 要求写一个初始变量为0,两个线程交替操作,一个加一,一个减一,进行五轮,五轮后的结果得是0
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
data.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
阻塞队列实现
public class BlockQueueDome {
private volatile boolean FLAG = true; //控制启动
private AtomicInteger atomicInteger = new AtomicInteger();
private BlockingQueue blockingQueue = null;
public BlockQueueDome(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void prod () throws InterruptedException {
System.out.println("prod启动成功");
while (FLAG){
String data = atomicInteger.incrementAndGet()+""; //原子的自增并返回
try{ TimeUnit.MICROSECONDS.sleep( 1 ); } catch(Exception e){}
boolean offer = blockingQueue.offer(data,2L,TimeUnit.SECONDS); //超过两秒队列满则会插入失败
//线程睡眠
if (offer){
System.out.println(Thread.currentThread().getName()+"t "+data +"插入成功" );
}else {
System.out.println(Thread.currentThread().getName()+"t "+data +"插入失败" );
}
}
System.out.println("生产者被叫停");
}
public void consumer() throws InterruptedException {
System.out.println("consumer启动成功");
//线程睡眠
try{ TimeUnit.SECONDS.sleep( 3 ); } catch(Exception e){}
while (FLAG){
String poll = blockingQueue.poll(2L, TimeUnit.SECONDS); //超过2秒队列空会获取失败
if(poll==null||poll.equalsIgnoreCase("")){
System.out.println("超过两秒获取失败");
continue;
}
System.out.println(poll+"消费成功");
}
System.out.println("消费者被叫停");
}
public void stop(){
FLAG = false;
}
public static void main(String[] args) {
BlockQueueDome blockQueueDome = new BlockQueueDome(new ArrayBlockingQueue(10));
new Thread(() -> {
try {
blockQueueDome.prod();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "prod1").start();
new Thread(() -> {
try {
blockQueueDome.consumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "consumer1").start();
new Thread(() -> {
try {
blockQueueDome.prod();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "prod2").start();
new Thread(() -> {
try {
blockQueueDome.consumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "consumer2").start();
//线程睡眠
try{ TimeUnit.SECONDS.sleep( 5 ); } catch(Exception e){}
blockQueueDome.stop();
}
}