栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Nacos源码分析(一)之线程池的巧妙设计,可以薅到自己的项目里

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Nacos源码分析(一)之线程池的巧妙设计,可以薅到自己的项目里

大家好,给大家先做个自我介绍
我是码上代码,大家可以叫我码哥
我也是一个普通本科毕业的最普通学生,我相信大部分程序员或者想从事程序员行业的都是普通家庭的孩子,所以我也是靠自己的努力,从毕业入职到一家传统企业,到跳槽未尝败绩,现在在一家某互联网行业巨头公司工作,希望可以通过自己的分享对大家有一些帮助
跟随我的专栏学习,可以省去你很多去培训的费用或者网上找资料的时间,节省你的大部分时间成本,让你更加快速成为面试收割机,年度最佳员工

我为大家准备了16个技术专栏带领大家一起学习

《亿级流量分布式系统实战》

《BAT大厂面试必问系列》

《技术杂谈》

《零基础带你学java教程专栏》

《带你学springCloud专栏》

《带你学SpringCloud源码专栏》

《带你学分布式系统专栏》

《带你学云原生专栏》

《带你学springboot源码》

《带你学netty原理与实战专栏》

《带你学Elasticsearch专栏》

《带你学mysql专栏》

《带你学JVM原理专栏》

《带你学Redis原理专栏》

《带你学java进阶专栏》

《带你学大数据专栏》

Nacos 简介

Nacos 在阿里巴巴起源于 2008 年五彩石项目(完成微服务拆分和业务中台建设),成长于十年双十⼀的洪峰考验,沉淀了简单易用、稳定可靠、性能卓越的核心竞争力。 随着云计算兴起,2018年我们深刻感受到开源软件行业的影响,因此决定将 Nacos(阿里内部 Configserver/Diamond/Vipserver 内核) 开源,输出阿里十年的沉淀,推动微服务行业发展,加速企业数字化转型!

nacos实践

五分钟学会 Spring Cloud Alibaba:Nacos 作为注册中心和配置中心使用(小白必看,一看就会教程)

架构图

整体架构分为用户层、业务层、内核层和插件,用户层主要解决用户使用的易用性问题,业务层主要解决服务发现和配置管理的功能问题,内核层解决分布式系统⼀致性、存储、高可用等核心问题,插件解决扩展性问题。

本篇源码分析 nacos中线程池的使用

nacos中大量使用了线程池,比如协议管理,服务订阅等,nacos的代码为了实现高可用和可扩展性,对于一些公共的功能做了统一处理,代码更加简洁

首先定义了一个线程池工厂,里面定义一些公有的线程方法

public final class ExecutorFactory {
    
    public static ExecutorService newSingleExecutorService() {
        return Executors.newFixedThreadPool(1);
    }
    
    public static ExecutorService newSingleExecutorService(final ThreadFactory threadFactory) {
        return Executors.newFixedThreadPool(1, threadFactory);
    }
    
    public static ExecutorService newFixedExecutorService(final int nThreads) {
        return Executors.newFixedThreadPool(nThreads);
    }
    
    public static ExecutorService newFixedExecutorService(final int nThreads, final ThreadFactory threadFactory) {
        return Executors.newFixedThreadPool(nThreads, threadFactory);
    }
    
    public static ScheduledExecutorService newSingleScheduledExecutorService(final ThreadFactory threadFactory) {
        return Executors.newScheduledThreadPool(1, threadFactory);
    }
    
    public static ScheduledExecutorService newScheduledExecutorService(final int nThreads,
            final ThreadFactory threadFactory) {
        return Executors.newScheduledThreadPool(nThreads, threadFactory);
    }
    
    public static ThreadPoolExecutor newCustomerThreadExecutor(final int coreThreads, final int maxThreads,
            final long keepAliveTimeMs, final ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTimeMs, TimeUnit.MILLISECONDS,
                new linkedBlockingQueue(), threadFactory);
    }

其次比较重要的一个类,ThreadPoolManager,这个类主要实现了线程池资源注册,销毁的方法供其他业务使用

public final class ThreadPoolManager {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolManager.class);
    
    private Map>> resourcesManager;
    
    private Map lockers = new ConcurrentHashMap(8);
    
    private static final ThreadPoolManager INSTANCE = new ThreadPoolManager();
    
    private static final AtomicBoolean CLOSED = new AtomicBoolean(false);

    
    static {
        //初始化存放线程池资源的Map,这里使用懒加载的思想
        INSTANCE.init();
        //这个是线程池工具类提供的一个钩子函数,用户结束线程池,addShutdownHook方法的作用是
        //当JVM关闭时,需要完成线程池中的任务,再关闭线程池,防止任务丢失
        ThreadUtils.addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                LOGGER.warn("[ThreadPoolManager] Start destroying ThreadPool");
                shutdown();
                LOGGER.warn("[ThreadPoolManager] Destruction of the end");
            }
        }));
    }
    
    //获取当前类
    public static ThreadPoolManager getInstance() {
        return INSTANCE;
    }
    
    //实现私有构造器,不能继承
    private ThreadPoolManager() {
    }
    
    private void init() {
        //用的时候在进行初始化
        resourcesManager = new ConcurrentHashMap>>(8);
    }
    
    
    public void register(String namespace, String group, ExecutorService executor) {
        if (!resourcesManager.containsKey(namespace)) {
            //这里使用了synchronized,防止出现线程安全问题
            synchronized (this) {
                lockers.put(namespace, new Object());
            }
        }
        final Object monitor = lockers.get(namespace);
        synchronized (monitor) {
            Map> map = resourcesManager.get(namespace);
            if (map == null) {
                map = new HashMap>(8);
                map.put(group, new HashSet());
                map.get(group).add(executor);
                resourcesManager.put(namespace, map);
                return;
            }
            if (!map.containsKey(group)) {
                map.put(group, new HashSet());
            }
            map.get(group).add(executor);
        }
    }
    
    
    public void deregister(String namespace, String group) {
        if (resourcesManager.containsKey(namespace)) {
            final Object monitor = lockers.get(namespace);
            synchronized (monitor) {
                resourcesManager.get(namespace).remove(group);
            }
        }
    }
    
    
    public void deregister(String namespace, String group, ExecutorService executor) {
        if (resourcesManager.containsKey(namespace)) {
            final Object monitor = lockers.get(namespace);
            synchronized (monitor) {
                final Map> subResourceMap = resourcesManager.get(namespace);
                if (subResourceMap.containsKey(group)) {
                    subResourceMap.get(group).remove(executor);
                }
            }
        }
    }
    
    
    public void destroy(final String namespace) {
        final Object monitor = lockers.get(namespace);
        if (monitor == null) {
            return;
        }
        synchronized (monitor) {
            Map> subResource = resourcesManager.get(namespace);
            if (subResource == null) {
                return;
            }
            for (Map.Entry> entry : subResource.entrySet()) {
                for (ExecutorService executor : entry.getValue()) {
                    ThreadUtils.shutdownThreadPool(executor);
                }
            }
            resourcesManager.get(namespace).clear();
            resourcesManager.remove(namespace);
        }
    }
    
    
    public void destroy(final String namespace, final String group) {
        final Object monitor = lockers.get(namespace);
        if (monitor == null) {
            return;
        }
        synchronized (monitor) {
            Map> subResource = resourcesManager.get(namespace);
            if (subResource == null) {
                return;
            }
            Set waitDestroy = subResource.get(group);
            for (ExecutorService executor : waitDestroy) {
                ThreadUtils.shutdownThreadPool(executor);
            }
            resourcesManager.get(namespace).remove(group);
        }
    }
    
    
    public static void shutdown() {
    //使用乐观锁判断是否可以终止
        if (!CLOSED.compareAndSet(false, true)) {
            return;
        }
        Set namespaces = INSTANCE.resourcesManager.keySet();
        for (String namespace : namespaces) {
            INSTANCE.destroy(namespace);
        }
    }
    
}

下面是我们在源码里可以拿来即用的工具类ThreadUtils

package com.alibaba.nacos.common.utils;

import org.slf4j.Logger;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;


public final class ThreadUtils {
    
    
    public static void objectWait(Object object) {
        try {
            object.wait();
        } catch (InterruptedException ignore) {
            Thread.interrupted();
        }
    }
    
    
    public static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    public static void countDown(CountDownLatch latch) {
        Objects.requireNonNull(latch, "latch");
        latch.countDown();
    }
    
    
    public static void latchAwait(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    
    public static void latchAwait(CountDownLatch latch, long time, TimeUnit unit) {
        try {
            latch.await(time, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    
    public static int getSuitableThreadCount() {
        return getSuitableThreadCount(THREAD_MULTIPLER);
    }
    
    
    public static int getSuitableThreadCount(int threadMultiple) {
        final int coreCount = Runtime.getRuntime().availableProcessors();
        int workerCount = 1;
        while (workerCount < coreCount * threadMultiple) {
            workerCount <<= 1;
        }
        return workerCount;
    }
    
    public static void shutdownThreadPool(ExecutorService executor) {
        shutdownThreadPool(executor, null);
    }
    
    
    public static void shutdownThreadPool(ExecutorService executor, Logger logger) {
        executor.shutdown();
        //重试策略
        int retry = 3;
        while (retry > 0) {
            retry--;
            try {
                if (executor.awaitTermination(1, TimeUnit.SECONDS)) {
                    return;
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.interrupted();
            } catch (Throwable ex) {
                if (logger != null) {
                    logger.error("ThreadPoolManager shutdown executor has error : {}", ex);
                }
            }
        }
        executor.shutdownNow();
    }
    
    public static void addShutdownHook(Runnable runnable) {
        Runtime.getRuntime().addShutdownHook(new Thread(runnable));
    }
    
    private static final int THREAD_MULTIPLER = 2;
    
}

大家一定记得点赞,收藏,关注

防止下次找不到了

你们的支持是我持续创作的动力!!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/768477.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号