栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

使用Flink Rpc模拟TaskManager向JobManager注册

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用Flink Rpc模拟TaskManager向JobManager注册

开篇

我们这次文章主要和大家分享一下,如何使用Flink 原生的RPC 也就是经过Flink封装Akka之后的RPC程序来进行完成一个需求。

需求概述

1、两个进程JobManager、TaskManager

2、当TaskManager启动的时候,向JobManager发送注册信息,报告本地的内存、CPU

3、当JobManager收到注册消息的时候,返回给TaskManager注册成功的消息

4、TaskManager每间隔三秒向JobManager发送心跳消息

5、JobManager每间隔3秒扫描一下,有哪些TaskManager下线

我们先来实现注册的需求。

我们需要一个接口来实现注册:

我们可以看到我们编写的接口继承了RpcGateway,因为在Flink中所有的RPC功能都需要继承这个基类,也就是老祖宗。

那么既然接口有了,我们就实现一个Endpoint,因为在Flink RPC中 Endpoint是对RPC框架中提供具体服务的实体的抽象,只要提供RPC接口服务的话,也就是可以让远程调用你的方法,那么就必须要实现这个类。

既然我们的Endpoint也有了,我们就可以启动JobManager了

public class JobManager {
    private static final ConcurrentSkipListSet taskManagerIds = new ConcurrentSkipListSet<>();

    public static void main(String[] args) throws Exception {
        
        ActorSystem jobManagerDefaultSystem = AkkaUtils.createDefaultActorSystem();
        
        AkkaRpcService taskManagerRpcService =
                new AkkaRpcService(jobManagerDefaultSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
        
        RegisterRpcEndpoint jobManagerRegisterRpcEndpoint = new RegisterRpcEndpoint(taskManagerRpcService);
        jobManagerRegisterRpcEndpoint.start();
        
        String registerRpcEndpointAddress = jobManagerRegisterRpcEndpoint.getAddress();
        writeAddress(registerRpcEndpointAddress);
        
    }

    private static void writeAddress(String address) throws Exception {
        File file = new File("./jobmanager-info.txt");
        BufferedWriter writer = new BufferedWriter(new FileWriter(file, false));
        writer.write(address);
        writer.close();
    }
}

既然JobManager已经起来了,我们就看下TaskManager应该怎么做。

public class TaskManager {
    public static void main(String[] args) throws Exception{
        
        ActorSystem taskManagerDefaultSystem = AkkaUtils.createDefaultActorSystem();
        
        AkkaRpcService taskManagerRpcService =
                new AkkaRpcService(taskManagerDefaultSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
        
        String jobMangerAddress = getJobMangerAddress();
        JobManager.RegisterGateWay registerGateWay = taskManagerRpcService.connect(jobMangerAddress, JobManager.RegisterGateWay.class).get();
		
        boolean registerReturn = registerGateWay.register(UUID.randomUUID().toString());
        if (registerReturn){
            System.out.println("注册成功!!!");
        }
    }
    private static String getJobMangerAddress() throws Exception{
        while(true){
            File file = new File("./jobmanager-info.txt");
            BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
            String s = bufferedReader.readLine();
            if (s != null){
                return s;
            }
        }
    }
}

我们分别启动JobManager和TaskManager,我们看一下效果

JobManager:

TaskManager

此时我们可以看到我们的注册功能正常运行,那么还剩下一个心跳功能,我相信很简单,我这里就不实现了,有兴趣的大佬们,可以进行一下code 。

这些知识都是我学习来的一些东西,我也是一个菜鸡,只是想把自己学到的东西记录一下,生成自己的一些知识做一些记录以后找的时候方便,现在免费分享给大家,谢谢大家的观看。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775645.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号