需求概述开篇
我们这次文章主要和大家分享一下,如何使用Flink 原生的RPC 也就是经过Flink封装Akka之后的RPC程序来进行完成一个需求。
1、两个进程JobManager、TaskManager
2、当TaskManager启动的时候,向JobManager发送注册信息,报告本地的内存、CPU
3、当JobManager收到注册消息的时候,返回给TaskManager注册成功的消息
4、TaskManager每间隔三秒向JobManager发送心跳消息
5、JobManager每间隔3秒扫描一下,有哪些TaskManager下线
我们先来实现注册的需求。
我们需要一个接口来实现注册:
我们可以看到我们编写的接口继承了RpcGateway,因为在Flink中所有的RPC功能都需要继承这个基类,也就是老祖宗。
那么既然接口有了,我们就实现一个Endpoint,因为在Flink RPC中 Endpoint是对RPC框架中提供具体服务的实体的抽象,只要提供RPC接口服务的话,也就是可以让远程调用你的方法,那么就必须要实现这个类。
既然我们的Endpoint也有了,我们就可以启动JobManager了
public class JobManager {
private static final ConcurrentSkipListSet taskManagerIds = new ConcurrentSkipListSet<>();
public static void main(String[] args) throws Exception {
ActorSystem jobManagerDefaultSystem = AkkaUtils.createDefaultActorSystem();
AkkaRpcService taskManagerRpcService =
new AkkaRpcService(jobManagerDefaultSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
RegisterRpcEndpoint jobManagerRegisterRpcEndpoint = new RegisterRpcEndpoint(taskManagerRpcService);
jobManagerRegisterRpcEndpoint.start();
String registerRpcEndpointAddress = jobManagerRegisterRpcEndpoint.getAddress();
writeAddress(registerRpcEndpointAddress);
}
private static void writeAddress(String address) throws Exception {
File file = new File("./jobmanager-info.txt");
BufferedWriter writer = new BufferedWriter(new FileWriter(file, false));
writer.write(address);
writer.close();
}
}
既然JobManager已经起来了,我们就看下TaskManager应该怎么做。
public class TaskManager {
public static void main(String[] args) throws Exception{
ActorSystem taskManagerDefaultSystem = AkkaUtils.createDefaultActorSystem();
AkkaRpcService taskManagerRpcService =
new AkkaRpcService(taskManagerDefaultSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
String jobMangerAddress = getJobMangerAddress();
JobManager.RegisterGateWay registerGateWay = taskManagerRpcService.connect(jobMangerAddress, JobManager.RegisterGateWay.class).get();
boolean registerReturn = registerGateWay.register(UUID.randomUUID().toString());
if (registerReturn){
System.out.println("注册成功!!!");
}
}
private static String getJobMangerAddress() throws Exception{
while(true){
File file = new File("./jobmanager-info.txt");
BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
String s = bufferedReader.readLine();
if (s != null){
return s;
}
}
}
}
我们分别启动JobManager和TaskManager,我们看一下效果
JobManager:
TaskManager
此时我们可以看到我们的注册功能正常运行,那么还剩下一个心跳功能,我相信很简单,我这里就不实现了,有兴趣的大佬们,可以进行一下code 。
这些知识都是我学习来的一些东西,我也是一个菜鸡,只是想把自己学到的东西记录一下,生成自己的一些知识做一些记录以后找的时候方便,现在免费分享给大家,谢谢大家的观看。



