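1 How it works
I recently skimmed the RPC-related source code in Flink, which is built on Akka. To make that code easier to read, I also went over the relevant Akka basics. This post records how to implement RPC communication with Akka, touching on akka actor and akka remote along the way.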
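The principle is simple and closely resembles the hand-written native Java RPC example from my earlier RPC framework series. The difference is that with Akka we neither do low-level network programming nor worry about serializing and deserializing the data ourselves. The client creates a dynamic proxy for the service interface that the user wants to call remotely, and underneath the proxy exchanges data through an Akka actor. The remote server uses an Akka actor to receive the data, invokes the method on the server side, and sends the result back to the client's Akka actor. The accompanying figure illustrates this flow.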
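The flow described above can be sketched without Akka at all. The following is a minimal in-process illustration, assuming a hypothetical `Greeter` interface and a hypothetical `Call` holder; the "transport" is just a `Function`, whereas the implementation in this post uses an Akka ask/tell exchange in its place.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Function;

// All names here are hypothetical; this is a sketch of the idea, not the code from this post.
class ProxySketch {

    /** Hypothetical service interface used only for this illustration. */
    interface Greeter {
        String greet(String name);
    }

    /** A captured call: what the client-side proxy hands to the transport layer. */
    static final class Call {
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] args;

        Call(String methodName, Class<?>[] parameterTypes, Object[] args) {
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.args = args;
        }
    }

    /** Client side: builds a proxy that turns every interface call into a Call. */
    @SuppressWarnings("unchecked")
    static <T> T proxyFor(Class<T> iface, Function<Call, Object> transport) {
        InvocationHandler handler = (proxy, method, args) ->
                transport.apply(new Call(method.getName(), method.getParameterTypes(), args));
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    /** Server side: looks the method up by name and parameter types, then invokes it. */
    static Object dispatch(Object target, Class<?> iface, Call call) {
        try {
            Method method = iface.getMethod(call.methodName, call.parameterTypes);
            return method.invoke(target, call.args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Greeter impl = name -> "Hello " + name;
        // The transport is a direct method call here; with Akka it would be a message exchange.
        Greeter remoteLike = proxyFor(Greeter.class, call -> dispatch(impl, Greeter.class, call));
        System.out.println(remoteLike.greet("akka")); // prints "Hello akka"
    }
}
```

Keeping the transport behind a single function is what lets the rest of the post swap in actors without touching the proxy logic.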
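2 Code implementation
First, add the Akka dependencies:
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.5.32</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-remote_2.12</artifactId>
    <version>2.5.32</version>
</dependency>
2.1 Protocol definition and utility classes
Define the communication protocol, which mainly means defining the request and response structures.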
2.1.1 RpcRequest
This class holds the name of the method to call, the argument list, and the list of parameter types. With these three pieces of information the server can invoke the matching method dynamically through reflection.
package learn.demo.akka.remote;

import java.io.Serializable;
import java.util.Arrays;

public class RpcRequest implements Serializable {

    private static final long serialVersionUID = 4932007273709224551L;

    private String methodName;
    private Object[] parameters;
    private Class<?>[] parameterTypes;

    public String getMethodName() {
        return methodName;
    }

    public RpcRequest setMethodName(String methodName) {
        this.methodName = methodName;
        return this;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public RpcRequest setParameters(Object[] parameters) {
        this.parameters = parameters;
        return this;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public RpcRequest setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
        return this;
    }

    @Override
    public String toString() {
        return "RpcRequest{" +
                "methodName='" + methodName + '\'' +
                ", parameters=" + Arrays.toString(parameters) +
                ", parameterTypes=" + Arrays.toString(parameterTypes) +
                '}';
    }
}
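It may help to see why the request carries the parameter types in addition to the arguments. A small sketch, assuming a hypothetical `Describer` class with two overloads: the method name alone is ambiguous, and the runtime class of a boxed argument (`Integer`) is not the declared parameter type (`int`), so the lookup needs the declared types.

```java
import java.lang.reflect.Method;

// Hypothetical names; a sketch of reflective lookup by name plus parameter types.
class OverloadLookupSketch {

    /** Hypothetical target with two overloads that share one name. */
    public static class Describer {
        public String describe(int value) {
            return "int:" + value;
        }

        public String describe(String value) {
            return "String:" + value;
        }
    }

    /** Resolves the method from its name and declared parameter types, then invokes it. */
    static Object call(Object target, String methodName, Class<?>[] parameterTypes, Object[] parameters) {
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(target, parameters);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Describer describer = new Describer();
        // The argument travels boxed as an Integer; int.class is what selects describe(int).
        System.out.println(call(describer, "describe", new Class<?>[]{int.class}, new Object[]{42}));        // int:42
        System.out.println(call(describer, "describe", new Class<?>[]{String.class}, new Object[]{"akka"})); // String:akka
    }
}
```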
2.1.2 RpcResponse
This class defines the response: status is the response status, message carries the error message when a call fails, and data is the return value of the remotely invoked method.
package learn.demo.akka.remote;

import java.io.Serializable;

public class RpcResponse implements Serializable {

    public final static String SUCCEED = "succeed";
    public final static String FAILED = "failed";

    private static final long serialVersionUID = 6595683424889346485L;

    private String status = RpcResponse.SUCCEED;
    private String message;
    private Object data;

    public String getStatus() {
        return status;
    }

    public RpcResponse setStatus(String status) {
        this.status = status;
        return this;
    }

    public String getMessage() {
        return message;
    }

    public RpcResponse setMessage(String message) {
        this.message = message;
        return this;
    }

    public Object getData() {
        return data;
    }

    public RpcResponse setData(Object data) {
        this.data = data;
        return this;
    }

    @Override
    public String toString() {
        return "RpcResponse{" +
                "status='" + status + '\'' +
                ", message='" + message + '\'' +
                ", data=" + data +
                '}';
    }
}
2.1.3 FutureUtils
This utility class is taken from the Flink source code. It converts a Scala Future into a Java CompletableFuture.
package learn.demo.akka.util;

import akka.dispatch.OnComplete;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;

import java.util.concurrent.CompletableFuture;

public class FutureUtils {

    public static <T, U extends T> CompletableFuture<T> toJava(Future<U> scalaFuture) {
        final CompletableFuture<T> result = new CompletableFuture<>();
        scalaFuture.onComplete(new OnComplete<U>() {
            @Override
            public void onComplete(Throwable failure, U success) {
                if (failure != null) {
                    result.completeExceptionally(failure);
                } else {
                    result.complete(success);
                }
            }
        }, DirectExecutionContext.INSTANCE);
        return result;
    }

    // Runs the completion callback directly on the thread that completes the Scala future.
    private static class DirectExecutionContext implements ExecutionContext {

        static final DirectExecutionContext INSTANCE = new DirectExecutionContext();

        private DirectExecutionContext() {
        }

        @Override
        public void execute(Runnable runnable) {
            runnable.run();
        }

        @Override
        public void reportFailure(Throwable cause) {
            throw new IllegalStateException("Error in direct execution context.", cause);
        }

        @Override
        public ExecutionContext prepare() {
            return this;
        }
    }
}
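The bridging pattern used by FutureUtils is not specific to Scala futures. Below is a standard-library-only sketch of the same idea, assuming a hypothetical `CallbackSource` interface that stands in for `scala.concurrent.Future#onComplete`: register one callback and let it complete a `CompletableFuture` either normally or exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical names; CallbackSource is a stand-in, not part of Akka or Scala.
class CallbackBridgeSketch {

    /** Hypothetical callback-style API: invokes the callback with (failure, success). */
    interface CallbackSource<T> {
        void onComplete(BiConsumer<Throwable, T> callback);
    }

    /** Adapts a callback-style source to a CompletableFuture. */
    static <T> CompletableFuture<T> toJava(CallbackSource<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.onComplete((failure, success) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
            } else {
                result.complete(success);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = toJava(cb -> cb.accept(null, "pong"));
        System.out.println(ok.join()); // pong

        CompletableFuture<String> failed = toJava(cb -> cb.accept(new IllegalStateException("boom"), null));
        System.out.println(failed.isCompletedExceptionally()); // true
    }
}
```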
2.1.4 AkkaUtils
This utility class creates an ActorSystem that is able to provide remote services.
package learn.demo.akka.remote;

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class AkkaUtils {

    public static ActorSystem createRemoteActorSystem(String name, int port) {
        // Enable remoting over the netty TCP transport on the given port.
        String systemConfigStr = "akka.actor.provider = \"akka.remote.RemoteActorRefProvider\"\r\n" +
                "akka.remote.enabled-transports=[\"akka.remote.netty.tcp\"]\r\n" +
                "akka.remote.netty.tcp.hostname = \"0.0.0.0\"\r\n" +
                "akka.remote.netty.tcp.port = \"" + port + "\"";
        Config systemConfig = ConfigFactory.parseString(systemConfigStr);
        return ActorSystem.create(name, systemConfig);
    }
}
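The same settings can also be written as a nested HOCON document, which some readers find easier to scan than dotted keys. The sketch below only builds the string; `remoteConfig` and its `hostname` parameter are hypothetical additions, and the result would be handed to `ConfigFactory.parseString` exactly as in AkkaUtils.

```java
// Hypothetical helper; it only assembles a HOCON string and does not start an ActorSystem.
class RemoteConfigSketch {

    /** Returns the remoting settings as a nested HOCON document. */
    static String remoteConfig(String hostname, int port) {
        return String.join("\n",
                "akka {",
                "  actor.provider = \"akka.remote.RemoteActorRefProvider\"",
                "  remote {",
                "    enabled-transports = [\"akka.remote.netty.tcp\"]",
                "    netty.tcp {",
                "      hostname = \"" + hostname + "\"",
                "      port = " + port,
                "    }",
                "  }",
                "}");
    }

    public static void main(String[] args) {
        System.out.println(remoteConfig("127.0.0.1", 10086));
    }
}
```

As far as I understand classic Akka remoting, the configured hostname is also the address other systems must use to reach this one, so for calls between machines a reachable host name or IP is probably a better choice than 0.0.0.0.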
2.2 Server
2.2.1 AkkaRpcServerActor
A server built on an Akka actor. It provides the remote service: it receives remote requests and invokes the requested method through reflection.
package learn.demo.akka.remote;

import akka.actor.UntypedAbstractActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class AkkaRpcServerActor<T> extends UntypedAbstractActor {

    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcServerActor.class);

    private final T ref;
    private final Class<?> interfaceClass;

    public AkkaRpcServerActor(T ref, Class<?> interfaceClass) {
        this.ref = ref;
        this.interfaceClass = interfaceClass;
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof RpcRequest) {
            RpcRequest request = (RpcRequest) message;
            LOG.info("Received request:{}", request);
            // Handle the request
            RpcResponse response = handleRequest(request);
            // Send the result back to the client
            LOG.info("Send response to client.{}", response);
            getSender().tell(response, getSelf());
        }
    }

    private RpcResponse handleRequest(RpcRequest request) {
        RpcResponse response = new RpcResponse();
        try {
            LOG.info("The server is handling request.");
            Method method = interfaceClass.getMethod(request.getMethodName(), request.getParameterTypes());
            Object data = method.invoke(ref, request.getParameters());
            response.setData(data);
        } catch (InvocationTargetException e) {
            // The service method itself threw. Report its exception rather than the
            // reflective wrapper, whose own message is null.
            response.setStatus(RpcResponse.FAILED).setMessage(String.valueOf(e.getCause()));
        } catch (Exception e) {
            response.setStatus(RpcResponse.FAILED).setMessage(e.getMessage());
        }
        return response;
    }
}
2.2.2 AkkaRpcServerProvider
Creates the AkkaRpcServerActor instance and starts the Akka service.
package learn.demo.akka.remote;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class AkkaRpcServerProvider<T> {

    private T ref;
    private int port;
    private String name;
    private Class<T> interfaceClass;

    public AkkaRpcServerProvider<T> setRef(T ref) {
        this.ref = ref;
        return this;
    }

    public AkkaRpcServerProvider<T> setPort(int port) {
        this.port = port;
        return this;
    }

    public AkkaRpcServerProvider<T> setName(String name) {
        this.name = name;
        return this;
    }

    public AkkaRpcServerProvider<T> setInterfaceClass(Class<T> interfaceClass) {
        this.interfaceClass = interfaceClass;
        return this;
    }

    public ActorRef get() {
        ActorSystem system = AkkaUtils.createRemoteActorSystem("rpcSys", port);
        return system.actorOf(Props.create(AkkaRpcServerActor.class, ref, interfaceClass), name);
    }
}
2.3 Client
2.3.1 AkkaRpcClient
Creates an ActorSystem and obtains the remote ActorRef.
package learn.demo.akka.remote;

import akka.actor.*;
import akka.pattern.Patterns;
import akka.util.Timeout;
import learn.demo.akka.util.FutureUtils;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.reflect.ClassTag$;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class AkkaRpcClient {

    private ActorRef actorRef;

    public void connect(String address) throws ExecutionException, InterruptedException {
        ActorSystem localActorSystem = AkkaUtils.createRemoteActorSystem("rpcClientSystem", 10087);
        ActorSelection actorSel = localActorSystem.actorSelection(address);
        Timeout timeout = new Timeout(Duration.create(2, "seconds"));
        // Ask the remote actor to identify itself so that we can obtain its ActorRef.
        final Future<ActorIdentity> identityFuture = Patterns.ask(actorSel, new Identify(42), timeout)
                .<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class));
        final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identityFuture);
        final CompletableFuture<ActorRef> actorRefFuture = identifyFuture.thenApply(
                (ActorIdentity ai) -> {
                    if (ai.getRef() == null) {
                        throw new CompletionException(new RuntimeException("Could not connect to rpc endpoint under address " + address + '.'));
                    } else {
                        return ai.getRef();
                    }
                }
        );
        this.actorRef = actorRefFuture.get();
    }

    public Object ask(Object message) throws ExecutionException, InterruptedException {
        Timeout timeout = new Timeout(Duration.create(2, "seconds"));
        // The source listing is cut off after "CompletableFuture". The rest of this method is a
        // reconstruction: send the message to the remote actor and block until the reply arrives.
        CompletableFuture<Object> resultFuture = FutureUtils.toJava(Patterns.ask(actorRef, message, timeout));
        return resultFuture.get();
    }
}
2.3.2 AkkaRpcInvocationHandler
The invocation handler behind the dynamic proxy of the user's RPC service.
package learn.demo.akka.remote;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class AkkaRpcInvocationHandler implements InvocationHandler {

    private final AkkaRpcClient client;

    public AkkaRpcInvocationHandler(AkkaRpcClient client) {
        this.client = client;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Build the request object
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setMethodName(method.getName())
                .setParameterTypes(method.getParameterTypes())
                .setParameters(args);
        // Send the request through the client
        RpcResponse response = (RpcResponse) client.ask(rpcRequest);
        // Return the result if the call succeeded
        if (RpcResponse.SUCCEED.equals(response.getStatus())) {
            return response.getData();
        }
        throw new RuntimeException(response.getMessage());
    }
}
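One detail the handler above leaves open: calls to toString, hashCode and equals on a proxy are also routed to the InvocationHandler, so they would be sent to the server like any other call. A possible refinement, sketched here with a hypothetical `Greeter` interface and a list standing in for `client.ask(...)`, is to answer those three methods locally.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical names; remoteCalls only records what would have gone over the wire.
class LocalObjectMethodsSketch {

    /** Hypothetical service interface used only for this illustration. */
    interface Greeter {
        String greet(String name);
    }

    static Greeter newProxy(List<String> remoteCalls) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object are answered locally instead of remotely.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString":
                        return "RpcProxy(" + Greeter.class.getSimpleName() + ")";
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    case "equals":
                        return proxy == args[0];
                    default:
                        throw new UnsupportedOperationException(method.getName());
                }
            }
            remoteCalls.add(method.getName()); // stand-in for client.ask(rpcRequest)
            return "Hello " + args[0];
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[]{Greeter.class}, handler);
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        Greeter greeter = newProxy(calls);
        System.out.println(greeter);               // RpcProxy(Greeter)
        System.out.println(greeter.greet("akka")); // Hello akka
        System.out.println(calls);                 // [greet]
    }
}
```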
2.3.3 AkkaRpcClientProvider
Creates the AkkaRpcClient and provides the dynamic proxy for the user's RPC service.
package learn.demo.akka.remote;

import java.lang.reflect.Proxy;

public class AkkaRpcClientProvider<T> {

    private String address;
    private Class<T> interfaceClass;

    public AkkaRpcClientProvider<T> setInterfaceClass(Class<T> interfaceClass) {
        this.interfaceClass = interfaceClass;
        return this;
    }

    public AkkaRpcClientProvider<T> setAddress(String address) {
        this.address = address;
        return this;
    }

    @SuppressWarnings("unchecked")
    public T get() {
        AkkaRpcClient client = new AkkaRpcClient();
        try {
            client.connect(this.address);
        } catch (Exception e) {
            e.printStackTrace();
        }
        AkkaRpcInvocationHandler handler = new AkkaRpcInvocationHandler(client);
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, handler);
    }
}
2.4 Verification
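The code above gives us a simple RPC framework. To verify it, we need to do the following:
- Create the RPC service interface
- Implement the RPC service
- Write the client example
- Write the server example
- Run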
2.4.1 Create the RPC service interface
package learn.demo.akka.remote;

public interface DemoService {

    String sayHello(String name);

    String sayGoodbye(String name);
}
2.4.2 Implement the RPC service
package learn.demo.akka.remote;

public class DemoServiceImpl implements DemoService {

    @Override
    public String sayHello(String name) {
        return "This is akka RPC service.\nHello " + name;
    }

    @Override
    public String sayGoodbye(String name) {
        return "This is akka RPC service.\nGoodbye " + name;
    }
}
2.4.3 Write the client example
package learn.demo.akka.remote;

public class AkkaRpcClientExamples {

    public static void main(String[] args) {
        AkkaRpcClientProvider<DemoService> clientProvider = new AkkaRpcClientProvider<>();
        clientProvider.setAddress("akka.tcp://rpcSys@0.0.0.0:10086/user/akkaRpcServer");
        clientProvider.setInterfaceClass(DemoService.class);
        DemoService demoService = clientProvider.get();
        String result = demoService.sayHello("akka");
        System.out.println(result);
    }
}
2.4.4 Write the server example
package learn.demo.akka.remote;

import akka.actor.ActorRef;

public class AkkaRpcServerExamples {

    public static void main(String[] args) {
        DemoServiceImpl demoService = new DemoServiceImpl();
        AkkaRpcServerProvider<DemoService> provider = new AkkaRpcServerProvider<>();
        provider.setPort(10086);
        provider.setName("akkaRpcServer");
        provider.setRef(demoService);
        provider.setInterfaceClass(DemoService.class);
        ActorRef actorRef = provider.get();
        System.out.println(actorRef.path());
    }
}
2.4.5 Run
Start the server example:
Start the client example:
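3 Summary
Implementing an RPC framework on Akka gave me a better understanding of the basic Akka Actor API: creating an ActorSystem, creating an ActorRef, looking up a remote ActorRef, and communicating between actors with ask and tell. Besides network communication, RPC also involves serializing and deserializing data. By default Akka uses Java serialization; it can be configured to use protobuf, and a custom serializer can be implemented as well. For more on serialization, see: http://doc.yonyoucloud.com/doc/akka-doc-cn/2.3.6/scala/book/chapter5/04_serialization.html. A mature RPC framework would not require users to enter the remote address by hand; it could use middleware for service registration and discovery. An earlier post of mine covered service discovery with zookeeper and may be useful as a reference. With the basic principle of Akka RPC in hand, it is worth re-reading Flink's RPC implementation along the same lines; the next post will walk through that code.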
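To make the serialization point concrete, here is a small standard-library sketch of the round trip that Java serialization performs on a request-shaped object. The `Message` class and the `roundTrip` helper are hypothetical; they mirror the shape of RpcRequest but are not part of the framework above, and the sketch says nothing about how Akka frames the bytes on the wire.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical names; a sketch of a plain Java serialization round trip.
class SerializationRoundTripSketch {

    /** Hypothetical message with the same shape as RpcRequest. */
    static final class Message implements Serializable {
        private static final long serialVersionUID = 1L;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] parameters;

        Message(String methodName, Class<?>[] parameterTypes, Object[] parameters) {
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.parameters = parameters;
        }
    }

    /** Serializes the value to bytes and reads it back as a new object. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Message sent = new Message("sayHello", new Class<?>[]{String.class}, new Object[]{"akka"});
        Message received = roundTrip(sent);
        System.out.println(received.methodName + Arrays.toString(received.parameters)); // sayHello[akka]
    }
}
```

The round trip only works when every argument in the parameter array is itself Serializable, which is worth keeping in mind when designing service interfaces for this kind of framework.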



