栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

多线程-消息中心-生产者-消费者

多线程-消息中心-生产者-消费者

源代码:

package com.example.a1;

import java.util.ArrayList;
import java.util.linkedList;
import java.util.List;

public class DemoA {
    public static void main(String[] args) throws InterruptedException {
//        DemoA demoA = new DemoA();
//        demoA.test2();
        Queue queue = new Queue();
        queue.push(new Message(String.valueOf(System.currentTimeMillis()), "good"));
        queue.pop();
        queue.push(new Message(String.valueOf(System.currentTimeMillis()), "good"));
    }

    private void test2() throws InterruptedException {
        Queue queue = new Queue();

        //消费者先启动
        Consumer consumer = new Consumer(queue);
        ConsumerThread consumerThread = new ConsumerThread(consumer);
        consumerThread.start();

        Consumer consumer2 = new Consumer(queue);
        ConsumerThread consumerThread2 = new ConsumerThread(consumer2);
        consumerThread2.start();

        //生产者
        Producer producer = new Producer(queue);
        ProducerThread producerThread = new ProducerThread(producer);
        producerThread.start();
    }

    private void test1() throws InterruptedException {
        Queue queue = new Queue();
        Producer producer = new Producer(queue);

        Message message = new Message(String.valueOf(System.currentTimeMillis()), "good");
        producer.push(message);

        Consumer consumer = new Consumer(queue);
        Message message1 = consumer.pop();

        System.out.println(message1);
    }
}


class Message {
    private String key;
    private String value;

    public Message(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public String toString() {
        return key + ":" + value;
    }
}


class Queue {
    private Object lock;
    List messageList;

    public Queue() {
        this.lock = new Object();
        messageList = new linkedList<>();
    }

    
    public void push(Message message) {

        synchronized (lock) {
            messageList.add(message);
            System.out.println(Thread.currentThread().getId() + ":push:"+message);
            //放消息后,通知消费
            lock.notifyAll();
        }
    }


    
    public Message pop() throws InterruptedException {
        System.out.println(Thread.currentThread().getId() + "pop>>>>>>>>>>>");

        //循环消费
        while (true) {
            synchronized (lock) {
                //无消息等待
                while (messageList.size() == 0) {
                    System.out.println(Thread.currentThread().getId() + ":消费者等待");
                    lock.wait();
                }
                System.out.println(Thread.currentThread().getId() + ":开始消费");
                for (Message message : messageList) {
                    System.out.println(Thread.currentThread().getId() + ":消费:message:" + message);
                }
                messageList.clear();
            }
        }
    }
}


class Producer {
    public Queue queue;

    public Producer(Queue queue) {
        this.queue = queue;
    }

    public void push(Message message) {
        System.out.println("Producer push:" + message);
        queue.push(message);
    }
}


class Consumer {
    public Queue queue;

    public Consumer(Queue queue) {
        this.queue = queue;
    }

    public Message pop() throws InterruptedException {
        Message message = queue.pop();
        System.out.println("Consumer pop:" + message);
        return message;
    }
}


class ProducerThread extends Thread {
    private Producer producer;

    ProducerThread(Producer producer) {
        this.producer = producer;
    }

    @Override
    public void run() {
        System.out.println("ProducerThread");
        for (int i = 0; i < 10; i++) {
            Message message = new Message(String.valueOf(System.currentTimeMillis()), "good");
            producer.push(message);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class ConsumerThread extends Thread {
    private Consumer consumer;

    public ConsumerThread(Consumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        System.out.println("ConsumerThread");
        try {
            consumer.pop();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/706759.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号