栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

okhttp3 url轮询负载均衡和重试切换失效url拦截器

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

okhttp3 url轮询负载均衡和重试切换失效url拦截器

okhttp3添加拦截器
new OkHttpClient.Builder().addInterceptor(new PollAndRetryInterceptor(urls)).build()
自定义轮询和重试切换Url拦截器

拦截器实现思路:
1、需要一个简单的算法来切换url,直接使用RoundRobinRule的算法
2、请求失败后需要重试,即捕获异常然后递归切换url重试
3、不能一直使用到失效的url,url重新恢复使用后需要加入到切换url队列中,因此我们需要一个定时任务来定时的检测url的可用性

主要代码

1、将url设置为有状态的对象,切换时取可用url,多次切换防止取到无用url
2、url不可用即可使用重试来兜底
3、新建ping任务来维护url的状态

定义url对象

public class Server {

    // url 地址
    private String url;
    // 状态 是否可用
    private Boolean isAlive;

    public String getUrl() {
        return url;
    }

    public Boolean isAlive() {
        return isAlive;
    }

    public void setAlive(Boolean alive) {
        isAlive = alive;
    }

    
    public Server(String url, Boolean isAlive) {
        this.url = url;
        this.isAlive = isAlive;
    }
}

ShutdownEnabledTimer :

public class ShutdownEnabledTimer extends Timer {

    private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownEnabledTimer.class);

    private final Thread cancelThread;
    private final String name;

    public ShutdownEnabledTimer(String name, boolean daemon) {
        super(name, daemon);

        this.name = name;

        this.cancelThread = new Thread(new Runnable() {
            public void run() {
                ShutdownEnabledTimer.super.cancel();
            }
        });

        LOGGER.info("Shutdown hook installed for: {}", this.name);

        Runtime.getRuntime().addShutdownHook(this.cancelThread);
    }

    @Override
    public void cancel() {
        super.cancel();

        LOGGER.info("Shutdown hook removed for: {}", this.name);

        try {
            Runtime.getRuntime().removeShutdownHook(this.cancelThread);
        } catch (IllegalStateException ise) {
            LOGGER.info("Exception caught (might be ok if at shutdown)", ise);
        }
    }
}

拦截器代码:

public class PollAndRetryInterceptor implements Interceptor {

    private static final Logger logger = LoggerFactory.getLogger(PollAndRetryInterceptor.class);

    private static final String DEFAULT_NAME = "default";
    // url
    private final List allServerList = new ArrayList<>();
    // url size
    private final int size;
    // 计数:下次轮询服务下标
    private final AtomicInteger nextUrlCyclicCounter;
    // 定时器执行ping服务
    protected Timer lbTimer = null;
    // 定时器执行时间
    private static final int PING_INTERVAL_SEConDS = 10;
    // 是否正在执行ping
    protected AtomicBoolean pingInProgress = new AtomicBoolean(false);
    // ping 超时时间
    private static final int PING_TIME_OUT = 3000;

    
    public PollAndRetryInterceptor(String[] urls) {
        for (String url : urls) {
            allServerList.add(new Server(url, true));
        }
        this.size = this.allServerList.size();
        this.nextUrlCyclicCounter = new AtomicInteger(0);
        setPingTask();
    }

    
    void setPingTask() {
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("PollAndRetryInterceptor-PingTimer-" + DEFAULT_NAME, true);
        lbTimer.schedule(new PingTask(), 0, PING_INTERVAL_SEConDS * 1000);
        forceQuickPing();
    }

    
    public void forceQuickPing() {
        logger.debug("PollAndRetryInterceptor [{}]:  forceQuickPing invoking", DEFAULT_NAME);
        try {
            new Pinger().runPinger();
        } catch (Exception e) {
            logger.error("PollAndRetryInterceptor [{}]: Error running forceQuickPing()", DEFAULT_NAME, e);
        }
    }

    
    @Override
    public Response intercept(Chain chain) throws IOException {
        // 获取原始的originalRequest
        Request originalRequest = chain.request();
        // 不需要负载均衡
        if (size < 2) {
            return chain.proceed(originalRequest);
        }
        // 获取老的url
        HttpUrl oldUrl = originalRequest.url();
        // 获取originalRequest的创建者builder
        Request.Builder builder = originalRequest.newBuilder();
        // 重建新的HttpUrl,需要重新设置的url部分
        HttpUrl newHttpUrl = getHttpUrl(oldUrl);
        // 没有可用地址
        if (newHttpUrl == null) {
            return chain.proceed(originalRequest);
        }
        // 获取新newRequest
        Request newRequest = builder.url(newHttpUrl).build();
        // 请求
        return proceedRequest(chain, newRequest, 0);
    }

    
    public Response proceedRequest(Chain chain, Request request, int retryCount) throws IOException {

        try {
            return chain.proceed(request);
        } catch (IOException e) {
            // 防止切换到无效的地址、防止同一请求轮询到同一无效地址
            if (retryCount++ < 10) {
                HttpUrl oldUrl = request.url();
                // 切换url
                HttpUrl httpUrl = getHttpUrl(oldUrl);
                // 没有可用服务地址
                if (httpUrl == null) {
                    throw new ConnectException("no useful address");
                }
                // 新建request
                Request newRequest = chain.request().newBuilder().url(httpUrl).build();
                return proceedRequest(chain, newRequest, retryCount);
            } else {
                throw new ConnectException("no useful address");
            }
        }
    }

    
    public HttpUrl getHttpUrl(HttpUrl oldUrl) {
        // 获取server
        Server server = chooseServer();
        if (server == null) {
            return null;
        }
        // 获取新的url
        HttpUrl baseURL = HttpUrl.parse(server.getUrl());
        // 重建新的HttpUrl,需要重新设置的url部分
        assert baseURL != null;
        return oldUrl.newBuilder()
                .scheme(baseURL.scheme())// http协议如:http或者https
                .host(baseURL.host())// 主机地址
                .port(baseURL.port())// 端口
                .build();
    }

    
    public Server chooseServer() {
        int count = 0;
        // 选10次,防止一个请求执行过长时间在选server上,快速失败
        while (count++ < 10) {
            int nextServerIndex = incrementAndGetModulo();
            Server server = allServerList.get(nextServerIndex);
            if (server.isAlive()) {
                return server;
            }
        }
        if (count >= 10) {
            logger.error("PollAndRetryInterceptor [{}] chooseServer: no service available", DEFAULT_NAME);
        }
        return null;
    }

    
    public int incrementAndGetModulo() {
        for (; ; ) {
            int current = nextUrlCyclicCounter.get();
            int next = (current + 1) % size;
            if (nextUrlCyclicCounter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    
    class PingTask extends TimerTask {
        public void run() {
            try {
                new Pinger().runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", DEFAULT_NAME, e);
            }
        }
    }

    
    class Pinger {

        public void runPinger() {
            if (!pingInProgress.compareAndSet(false, true)) {
                // 正在执行ping直接返回
                return;
            }
            // 执行ping任务
            try {
                Server[] allServers = allServerList.toArray(new Server[0]);

                int numCandidates = allServers.length;
                boolean[] results = pingServers(allServers);

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    svr.setAlive(isAlive);
                }

            } finally {
                pingInProgress.set(false);
            }
        }

        
        public boolean[] pingServers(Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];
            for (int i = 0; i < numCandidates; i++) {
                // serially
                Server server = servers[i];
                results[i] = pingURL(server.getUrl(), PING_TIME_OUT);
            }
            return results;
        }

        
        public boolean pingURL(String url, int timeout) {
            try {
                HttpUrl baseURL = HttpUrl.parse(url);
                assert baseURL != null;
                Socket socket = new Socket();
                socket.connect(new InetSocketAddress(baseURL.host(), baseURL.port()), timeout);
                return true;
            } catch (IOException exception) {
                return false;
            }
        }
    }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734167.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号