栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

多线程设计模式之Master-worker

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

多线程设计模式之Master-worker

多线程设计模式之Master-worker

怎么写-断点调试-hashmap-concurrentlinkedqueue-Thread类-waitnotify
1、先理解:教程上的图是这样的

从教程上一堆话中提取关键字:并行、两个进程、子任务。
Master:作为一个进程,接收和分配任务给worker子进程,归纳总结子进程返回的子结果;
Worker:接收任务并处理返回给Master进程。
获得的效果就是,分解任务并行执行,提高了系统的吞吐量。
2、接下来就是如何实现一个案例,原代码中的实现逻辑图是这样的:

这里其实就有一些注意点了:
(1)为啥代码中使用了ConcurrentlinkedQueue这个线程安全队列和HashMap这个非线程安全类;
(2)如何实现这个多线程模式(或者是如何写出代码);
(3)实现Thread或者是Runnable接口需要注意的点;
然后本着想要多维度去了解这个多线程模式的想法,小挖了一下基础:
(1)ConcurrentlinkedQueue:直接来说应用场景吧,它是并发类队列的一种,没有锁但是线程安全,可以处理高并发场景,性能比BlockingQueue好点(为啥好,知识盲区了,下次莽它),队列嘛,先进先出,进的方法是add()和offer()(在这里没区别),出来是poll()和peek()()(前者取完就删除,后者没有从队列中删除,peek的意思是偷窥,没有占有的意思),用在这个模式里就是需要保证任务在多线程取的时候不会出现线程安全问题(多线程的读写都要考虑这个安全问题)。
(2)HashMap:之前学的半吊子的集合就是因为没有接触到多线程,没有应用到实际中去自然印象不深刻,现在终于可以从一个线程不安全问题去挖原理了,在这里特别感谢一个叫安琪拉的博主一篇 名叫《一个HashMap跟面试官扯了半个小时》的博客,一篇读下来有种意犹未尽的效果(跑去看了一下源码,有种欲仙欲死的感觉,又回来看别人的总结了)。
3、代码实现:总的来讲也是有步骤的,我个人觉得必须先定义好接口、类、方法(可以先写文字步骤(本身就是一个理清逻辑的步骤)),尤其是master这个具有一个承前启后功能的类,可以通过这个类的成员变量和方法去延申完善其它方法的定义(虽然方法的定义我是根据于原代码的步骤来的):
肯定先把任务类搞好:

public class Task {

    private Integer id;
    private String name;
    private Integer price;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getPrice() {
        return price;
    }

    public void setPrice(Integer price) {
        this.price = price;
    }

}

master类:

public class Master {
    //1、承装任务workerqueue,考虑用队列
    private ConcurrentlinkedQueue workQueue=new ConcurrentlinkedQueue<>();
    //2、承装worker对象:workers,考虑用线程安全的hashmap,concurrenthashmap(细粒度的)
    private ConcurrentHashMap workers=new ConcurrentHashMap<>();
    //3、承装结果集,我继续用线程安全的concurrentHashmap
    private ConcurrentHashMap result=new ConcurrentHashMap<>();
    //4、构造函数传参,线程放在workers(workers.put,了解这个方法),这里需要通过构造来确定多少个worker,并且需要worker去设置引用
    //注意不是频繁创建worker,因为这里的worker是继承Runnable接口,业务和控制是分离的,因此一个就好,主要是创建多个线程
    Master(int workerCount){
        Worker worker=new Worker(workQueue,result);
        for(int i=0;i startAll:workers.entrySet()){
            startAll.getValue().start();
        }
    }
    //7、判断所有线程是否运行结束
    public boolean isStop(){
        for (Map.Entry me : workers.entrySet()) {
            if (me.getValue().getState() != Thread.State.TERMINATED)//原来有个表示线程状态的
                return false;
        }
        return true;
    }
    //8、计算结果:将所有的线程的结果汇总
    public int getResult(){
        int allPrice=0;
        for(Map.Entry a:result.entrySet()){
            allPrice+=a.getValue();
        }
        return allPrice;
    }
}

worker类:这里我用的是Runnable接口,因此在后面创建的时候只需要一个worker对象即可,线程的控制和业务逻辑的实现是分开的,也就是说多线程来实现这么一个Runnable接口实现类对象方法的逻辑:

public class Worker implements Runnable {
    //1、结果集和队列的引用
    private ConcurrentlinkedQueue workQueue;
    private ConcurrentHashMap result;
    //2、构造传入
    Worker(ConcurrentlinkedQueue workQueue,ConcurrentHashMap result){
        this.result=result;
        this.workQueue=workQueue;
    }
    //3、run方法:领取任务、处理任务(方法)
    @Override
    public void run() {
           if(!workQueue.isEmpty()){
               Task task=workQueue.poll();//注意取出任务就删除这个任务,用pull,那么万一是null,下面就会报错
               doResult(task);
           }
    }
    //4、处理任务:模拟查询数据库价格,返回结果
    public void doResult(Task task){
        try {
            TimeUnit.MILLISECONDS.sleep(500);//模拟处理数据的时间
            result.put(Thread.currentThread().getName(),task.getPrice());//这里线程安全
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

最后是一个测试类:测试的逻辑是原代码上的步骤复现:

public class Main {
    public static void main(String[] args) {
        //初始化
        Master master=new Master(40);
        //创建任务
        for (int i=0;i<30;i++){
            Task task=new Task();
            task.setId(i);
            task.setPrice(i);
            task.setName("任务"+i);
            //提交任务
            master.submit(task);
        }
        //执行任务,返回结果
        master.execution();
        //注意,一定要等线程执行完再返回结果,要不让返回的直接是0
        while (true){
            if(master.isStop()){
                System.out.println(master.getResult());
                break;
            }
        }
    }
}

运行结果:

对原代码我做了一些改动:
(1)HashMap替换成了ConcurrentHashMap(并发类容器),不过后来想了想应该没什么用,HashMap是主线程里面的,不存在多线程情况下的安全问题;
(2)worker的创建放到了master的构造方法中,因为感觉让用户去显示创建一个worker没有层次感。
还有一些理解:
(1)对于master的一个方法isStop:判断线程是否全部停止,刚开始我是想不到有什么作用的,后来发现测试类中取结果的时候,如果没有等待线程停止就去取结果,那么就不是一个最后的结果,可以说这是一个必须考虑的问题;Thread.State.TERMINATED这个枚举常量着实解决了这个问题。
(2)对于多线程,这么一想,之前写的一个航空检查案例,是不是可以用这种模式改写一下?再仔细想想…

感概这个模式就是,知识储备良多,多线程式处理,为了啥,更好的吞吐量啊。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/270987.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号