偶然间看到一个RPC的框架项目,感觉与其他的项目与众不同,所以准备跟着实现一下。由于教程比较少,因此写下这边记录,慢慢踩坑。既然是重复造轮子,自然是以学习框架与组件的运用为主。
1 涉及的知识点- RPC的概念及运作流程。
- RPC协议及RPC框架的概念。
- Netty的基本使用,修复高并发情况下,netty导致的内存泄漏问题。
- Java序列化以及反序列化,protobuf和kryo序列化协议,配置即用。
- Zookeeper作为注册中心的基本使用。
- 自定义注释实现特殊业务逻辑
- Java动态代理
- 自定义Spring Boot Starter
- 多种负载均衡算法(随机、轮询、加权轮询、平滑加权轮询),配置即用。
- 客户端增加本地服务列表缓存,提高性能。
- 由原来的每个请求建立一次连接,改为建立TCP长连接,并多次复用。
- 服务端增加线程池提高消息处理能力
Remote Procedure Call(RPC):远程过程调用。
RPC采用Client-Server结构,通过Request-Response消息模式实现。
- 客户端处理过程中调用Client stub(就像调用本地方法一样),传递参数;
- Client stub将参数编组为消息,然后通过系统调用向服务端发送消息;
- 客户端本地操作系统将消息从客户端机器发送到服务端机器;
- 服务端操作系统将接收到的数据包传递给Server stub;
- Server stub解组消息为参数;
- Server stub再调用服务端的过程,过程执行结果以反方向的相同步骤响应给客户端。
- Client stub、Server stub的开发;
- 参数如何编组为消息,以及解组消息;
- 消息如何发送;
- 过程结果如何表示、异常情况如何处理;
- 如何实现安全的访问控制。
RPC调用过程中采用的消息协议称为RPC协议
2.5 RPC框架是什么?RPC协议规定请求、响应消息的格式在TCP(网络传输控制协议)上可选用或自定义消息协议来完成RPC消息交互。我们可以选用通用的标准协议(如:http、https),也可以根据自身的需要定义自己的消息协议。
封装好参数编组、消息解组、底层网络通信的RPC程序开发框架,带来的便捷是可以直接在其基础上只需要专注于过程代码编写。
Java领域:
- 传统的webservice框架:Apache CXF、Apache Axis2、Java自带的JAX-WS等。webservice框架大多基于标准的SOAP协议。
- 新兴的微服务框架:Dubbo、spring cloud、Apache Thrift等。
我们将会写一个简易的RPC框架,暂且叫它zarlic-rpc-spring-boot-starter,通过在项目中引入该starter,并简单的配置一下,项目即拥有提供远程服务的能力。
编写自定义注解@Service,被它注解的类将会提供远程服务。
编写自定义注解@InjectService,使用它可注入远程服务。
3.2 项目整体结构- 注册中心 :注册中心负责服务地址的注册与查找,相当于目录服务。
- 网络传输 :既然我们要调用远程的方法,就要发送网络请求来传递目标类和方法的信息以及方法的参数等数据到服务提供端。
- 序列化和反序列化 :要在网络传输数据就要涉及到序列化。
- 动态代理 :屏蔽程方法调用的底层细节。
- 负载均衡 : 避免单个服务器响应同一请求,容易造成服务器宕机、崩溃等问题。
- 传输协议 :这个协议是客户端(服务消费方)和服务端(服务提供方)交流的基础。
客户端想要调用远程服务,必须具备服务发现的能力;在知道有哪些服务过后,还必须有服务代理来执行服务调用;客户端想要与服务端通信,必须要有相同的消息协议;客户端想要调用远程服务,那么必须具备网络请求的能力,即网络层功能。
当然,这是客户端所需的最基本的能力,其实还可以扩展的能力,例如负载均衡。
3.3.2 具体实现基于面向接口编程的理念,不同角色都实现了定义了相应规范的接口。这里面我们没有发现消息协议相关内容,那是因为服务端也需要消息协议,因此抽离了出来,放在公共层。
客户端的代码结构:
public interface ServiceDiscoverer {
List getService(String name);
}
public class ZookeeperServiceDiscoverer implements ServiceDiscoverer{
private ZkClient zkClient;
public ZookeeperServiceDiscoverer(String zkAddress) {
zkClient = new ZkClient(zkAddress); //配置zk中心地址
zkClient.setZkSerializer(new ZookeeperSerializer()); //自定义序列化,在common中定义ZookeeperSerializer
}
@Override
public List getService(String name) {
String servicePath = ZarlicConstant.ZK_SERVICE_PATH + ZarlicConstant.PATH_DELIMITER + name + "/service";
List children = zkClient.getChildren(servicePath);
//ofNullable 如果children不为空就将其赋值ArrayList,为空创建一个空对象集合赋值给newList,也就避免了空指针异常。
//使用decode解码出服务名字,再根据服务名字,转换成对应的服务
return Optional.ofNullable(children).orElse(new ArrayList<>()).stream().map(str -> {
String deCh = null;
try {
deCh = URLDecoder.decode(str,ZarlicConstant.UTF_8);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return JSON.parseObject(deCh,Service.class);
}).collect(Collectors.toList());
}
}
服务发现者使用Zookeeper来实现,通过ZkClient我们很容易发现已经注册在ZK上的服务。当然我们也可以使用其他组件作为注册中心,例如Redis。
3.3.2.2 网络客户端
public interface NetClient {
byte[] sendRequest(byte[] data, Service service) throws InterruptedException;
}
public class NettyNetClient implements NetClient{
private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);
@Override
public byte[] sendRequest(byte[] data, Service service) throws InterruptedException {
String[] addInfoArray = service.getAddress().split(":");
String serverAddress = addInfoArray[0];
String serverPort = addInfoArray[1];
SendHandler sendHandler = new SendHandler(data);
byte[] respData;
// 配置客户端
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
//group设置线程池,channel设置nio类型的channel,option设置通道选项,handler装配流水线
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer() {
//有连接到达时会创建一个channel
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
// pipeline管理channel中的Handler
// 在channel队列中添加一个handler来处理业务
channelPipeline.addLast(sendHandler);
}
});
// 启动客户端连接
// 开始绑定server
// 通过调用sync同步方法阻塞直到绑定成功
bootstrap.connect(serverAddress,Integer.parseInt(serverPort)).sync();
respData = (byte[]) sendHandler.rspData();
logger.info("SendRequest get reply:{}",respData);
} finally {
// 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
group.shutdownGracefully();
}
return respData;
}
}
public class SendHandler extends ChannelInboundHandlerAdapter {
private static Logger logger = LoggerFactory.getLogger(SendHandler.class);
private CountDownLatch countDownLatch;
private Object readMsg = null;
private byte[] data;
public SendHandler(byte[] data) {
countDownLatch = new CountDownLatch(1);
this.data = data;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("Successful connection to server:{}",ctx);
ByteBuf reqBuf = Unpooled.buffer(data.length);
reqBuf.writeBytes(data);
logger.info("Client send message:{}",reqBuf);
ctx.writeAndFlush(reqBuf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("Client reads message: {}", msg);
ByteBuf msgBuf = (ByteBuf) msg;
byte[] resp = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(resp);
readMsg = resp;
countDownLatch.countDown();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Close the connection when an exception is raised.
cause.printStackTrace();
logger.error("Exception occurred:{}", cause.getMessage());
ctx.close();
}
public Object rspData() throws InterruptedException {
countDownLatch.await();
return readMsg;
}
}
在这里我们使用Netty来实现网络请求客户端,当然也可以使用Mina。网络请求客户端能连接远程服务端,并将编组好的请求数据发送给服务端,待服务端处理好后,又将服务端的响应数据返回给客户端。
3.3.2.3 服务代理computeIfAbsent
// 方法定义
default V computeIfAbsent(K key, Function super K, ? extends V> mappingFunction) {
...
}
// java8之前。从map中根据key获取value操作可能会有下面的操作
Object key = map.get("key");
if (key == null) {
key = new Object();
map.put("key", key);
}
// java8之后。上面的操作可以简化为一行,若key对应的value为空,会将第二个参数的返回值存入并返回
Object key2 = map.computeIfAbsent("key", k -> new Object());
动态代理(dynamic proxy)
利用Java的反射技术(Java Reflection),在运行时创建一个实现某些给定接口的新类(也称“动态代理类”)及其实例(对象),代理的是接口(Interfaces),不是类(Class),也不是抽象类。在运行时才知道具体的实现,spring aop就是此原理。
public static Object newProxyInstance(ClassLoader loader,
Class>[] interfaces,
InvocationHandler h)
throws IllegalArgumentException
newProxyInstance,方法有三个参数:
loader: 用哪个类加载器去加载代理对象
interfaces:动态代理类需要实现的接口
h:动态代理方法在执行时,会调用h里面的invoke方法去执行
public class ClientProxyFactory {
private ServiceDiscoverer serviceDiscoverer;
private Map supportMessageProtocols;
private NetClient netClient;
@SuppressWarnings("unchecked") //该批注的作用是给编译器一条指令,告诉它对被批注的代码元素内部的某些警告保持静默。
private Map, Object> objectCache = new HashMap<>();
//获取代理类,若没有则使用java动态代理Proxy.newProxyInstance
public T getProxy(Class clazz){
return (T) this.objectCache.computeIfAbsent(clazz,
cls -> newProxyInstance(cls.getClassLoader(),new Class>[]{cls},new ClientInvocationHandler(cls)));
}
private class ClientInvocationHandler implements InvocationHandler{
private Class> clazz;
private Random random = new Random();
public ClientInvocationHandler(Class> clazz){
super();
this.clazz = clazz;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName().equals("toString")) {
return proxy.getClass().toString();
}
if (method.getName().equals("hashCode")) {
return 0;
}
//1、获得服务信息
String serviceName = this.clazz.getName();
List services = serviceDiscoverer.getService(serviceName);
if(services == null || services.isEmpty()){
throw new ZarlicException("No provider available!");
}
//随机选择一个服务提供者
Service service = services.get(random.nextInt(services.size()));
//2、构造request对象
ZarlicRequest req = new ZarlicRequest();
req.setServiceName(service.getName());
req.setMethod(method.getName());
req.setParameterTypes(method.getParameterTypes());
req.setParameters(args);
//3、协议层编组
//获得该方法对应的协议
MessageProtocol protocol = supportMessageProtocols.get(service.getProtocol());
//编组请求
byte[] data = protocol.marshallingRequest(req);
//4、调用网络层发送请求
byte[] repData = netClient.sendRequest(data, service);
//5、解组相应消息
ZarlicResponse rsp = protocol.unmarshallingResponse(repData);
//6、结果处理
if(rsp.getException() != null){
throw rsp.getException();
}
return rsp.getReturnValue();
}
}
}
服务代理类由客户端代理工厂类产生,代理方式是基于Java的动态代理。在处理类ClientInvocationHandler的invoke函数中,定义了一系列的操作,包括获取服务、选择服务提供者、构造请求对象、编组请求对象、网络请求客户端发送请求、解组响应消息、异常处理等。
3.3.2.4 消息协议
public interface MessageProtocol {
byte[] marshallingRequest(ZarlicRequest req) throws Exception;
ZarlicRequest unmarshallingRequest(byte[] data) throws Exception;
byte[] marshallingResponse(ZarlicResponse rsp) throws Exception;
ZarlicResponse unmarshallingResponse(byte[] data) throws Exception;
}
public class JavaSerializeMessageProtocol implements MessageProtocol {
private byte[] serialize(Object obj) throws Exception {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bout);
out.writeObject(obj);
return bout.toByteArray();
}
@Override
public byte[] marshallingRequest(ZarlicRequest req) throws Exception {
return this.serialize(req);
}
@Override
public ZarlicRequest unmarshallingRequest(byte[] data) throws Exception {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
return (ZarlicRequest) in.readObject();
}
@Override
public byte[] marshallingResponse(ZarlicResponse rsp) throws Exception {
return this.serialize(rsp);
}
@Override
public ZarlicResponse unmarshallingResponse(byte[] data) throws Exception {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
return (ZarlicResponse) in.readObject();
}
}
消息协议主要是定义了客户端如何编组请求、解组响应,服务端如何解组请求、编组响应这四个操作规范。
3.4 服务端编写 3.4.1 服务端需要做什么首先,服务端要提供远程服务,必须具备服务注册及暴露的能力;在这之后,还需要开启网络服务,供客户端连接。有些项目可能既是服务提供者,又是服务消费者,那什么时候开启服务,什么时候注入服务呢?这里我们引入一个RPC处理者的概念,由它来帮我们开启服务,以及注入服务。
3.4.2 具体实现服务端的代码结构
服务端做的事情也很简单,注册服务并暴露服务,然后开启网络服务;如果服务端也是消费者,则注入远程服务。
服务注册和服务注入依赖两个自定义注解来实现:
- @Service:注册服务
- @InjectService:注入服务
下面是他们的实现代码:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@documented //这个注解只是用来标注生成javadoc的时候是否会被记录。
@Component
public @interface Service {
String value() default "";
}
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@documented
public @interface InjectService {
}
3.4.2.1 服务注册(暴露)
public interface ServiceRegister {
void register(ServiceObject so) throws Exception;
ServiceObject getServiceObject(String name) throws Exception;
}
public class DefaultServiceRegister implements ServiceRegister{
private Map serviceMap = new HashMap<>();
protected String protocol;
protected Integer port;
@Override
public void register(ServiceObject so) throws Exception {
if(so == null){
throw new IllegalArgumentException("Parameter cannot be empty.");
}
this.serviceMap.put(so.getName,so);
}
@Override
public ServiceObject getServiceObject(String name) throws Exception {
return serviceMap.get(name);
}
}
public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister{
private ZkClient client;
public ZookeeperExportServiceRegister(String zkAddress, Integer port, String protocol) {
client = new ZkClient(zkAddress);
client.setZkSerializer(new ZookeeperSerializer());
this.port = port;
this.protocol = protocol;
}
@Override
public void register(ServiceObject so) throws Exception {
super.register(so);
Service service = new Service();
String host = InetAddress.getLocalHost().getHostAddress();
String address = host + ":" + port;
service.setAddress(address);
service.setName(so.getClass().getName());
service.setProtocol(protocol);
this.exportService(service); //暴露服务
}
private void exportService(Service serviceResource) {
String serviceName = serviceResource.getName();
String uri = JSON.toJSONString(serviceResource);
try {
uri = URLEncoder.encode(uri, ZarlicConstant.UTF_8);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
String servicePath = ZarlicConstant.ZK_SERVICE_PATH + ZarlicConstant.PATH_DELIMITER + serviceName + "/service";
if(!client.exists(servicePath)){
client.createPersistent(servicePath,true);
}
String uriPath = servicePath + ZarlicConstant.PATH_DELIMITER + uri;
if (client.exists(uriPath)) {
client.delete(uriPath);
}
client.createEphemeral(uriPath); //创建临时节点
}
}
将指定ServiceObject对象序列化后保存到ZK上,供客户端发现。同时会将服务对象缓存起来,在客户端调用服务时,通过缓存的ServiceObject对象反射指定服务,调用方法。
3.4.2.2 网络服务
public abstract class RpcServer {
protected int port;
protected String protocol;
protected RequestHandler handler;
public RpcServer(int port, String protocol, RequestHandler handler) {
super();
this.port = port;
this.protocol = protocol;
this.handler = handler;
}
public abstract void start();
public abstract void stop();
}
public class NettyRpcServer extends RpcServer{
private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);
private Channel channel;
public NettyRpcServer(int port, String protocol, RequestHandler handler) {
super(port, protocol, handler);
}
@Override
public void start() {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,100)
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChannelRequestHandler());
}
});
// 启动服务
ChannelFuture f = b.bind(port).sync();
logger.info("Server started successfully.");
channel = f.channel();
// 等待服务通道关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放线程组资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@Override
public void stop() {
this.channel.close();
}
private class ChannelRequestHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("Channel active:{}", ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("The server receives a message: {}", msg);
ByteBuf msgBuf = (ByteBuf) msg;
byte[] req = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(req);
byte[] res = handler.handleRequest(req);
logger.info("Send response:{}", msg);
ByteBuf respBuf = Unpooled.buffer(res.length);
respBuf.writeBytes(res);
ctx.write(respBuf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
logger.error("Exception occurred:{}", cause.getMessage());
ctx.close();
}
}
}
public class RequestHandler {
private MessageProtocol protocol;
private ServiceRegister serviceRegister;
public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) {
super();
this.protocol = protocol;
this.serviceRegister = serviceRegister;
}
public byte[] handleRequest(byte[] data) throws Exception {
// 1、解组消息
ZarlicRequest req = this.protocol.unmarshallingRequest(data);
// 2、查找服务对象
ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName());
ZarlicResponse rsp = null;
if(so == null){
rsp = new ZarlicResponse(ZarlicStatus.NOT_FOUND);
}else {
// 3、反射调用对应的过程方法
try {
Method method = so.getClazz().getMethod(req.getMethod(), req.getParameterTypes());
Object returnValue = method.invoke(so.getObj(), req.getParameters());
rsp = new ZarlicResponse(ZarlicStatus.SUCCESS);
rsp.setReturnValue(returnValue);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
rsp = new ZarlicResponse(ZarlicStatus.ERROR);
rsp.setException(e);
}
}
// 4、编组响应消息
return this.protocol.marshallingResponse(rsp);
}
}
3.4.2.3 RPC处理者
public class DefaultRpcProcessor implements ApplicationListener{ @Resource private ClientProxyFactory clientProxyFactory; @Resource private ServiceRegister serviceRegister; @Resource private RpcServer rpcServer; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if(Objects.isNull(event.getApplicationContext().getParent())){ ApplicationContext context = event.getApplicationContext(); // 开启服务 startServer(context); // 注入Service injectService(context); } } private void injectService(ApplicationContext context) { String[] names = context.getBeanDefinitionNames(); for (String name : names) { Class> clazz = context.getType(name); if (Objects.isNull(clazz)) continue; Field[] fields = clazz.getDeclaredFields(); for (Field field : fields) { InjectService injectLeisure = field.getAnnotation(InjectService.class); if (Objects.isNull(injectLeisure)) continue; Class> fieldClass = field.getType(); Object object = context.getBean(name); field.setAccessible(true); try { field.set(object,clientProxyFactory.getProxy(fieldClass)); } catch (IllegalAccessException e) { e.printStackTrace(); } } } } private void startServer(ApplicationContext context) { Map beans = context.getBeansWithAnnotation(Service.class); if (beans.size() != 0) { boolean startServerFlag = true; for (Object obj : beans.values()) { try { Class> clazz = obj.getClass(); Class>[] interfaces = clazz.getInterfaces(); ServiceObject so; if(interfaces.length != 1){ Service service = clazz.getAnnotation(Service.class); String value = service.value(); if(value.equals("")){ startServerFlag = false; throw new UnsupportedOperationException("The exposed interface is not specific with '" + obj.getClass().getName() + "'"); } so = new ServiceObject(value,Class.forName(value),obj); }else { Class> superClass = interfaces[0]; so = new ServiceObject(superClass.getName(),superClass,obj); } serviceRegister.register(so); } catch (Exception e) { e.printStackTrace(); } } if (startServerFlag) { rpcServer.start(); } } } }
DefaultRpcProcessor实现了ApplicationListener,并监听了ContextRefreshedEvent事件,其效果就是在Spring启动完毕过后会收到一个事件通知,基于这个机制,就可以在这里开启服务,以及注入服务。因为一切已经准备就绪了,所需要的资源都是OK的。
3.5 配置类
@EnableConfigurationProperties(ZarlicRpcProperty.class)
@ConfigurationProperties("zarlic.rpc")
public class ZarlicRpcProperty {
private String registerAddress = "127.0.0.1:2181";
private Integer serverPort = 19000;
private String protocol = "zarlic";
public String getRegisterAddress() {
return registerAddress;
}
public void setRegisterAddress(String registerAddress) {
this.registerAddress = registerAddress;
}
public Integer getServerPort() {
return serverPort;
}
public void setServerPort(Integer serverPort) {
this.serverPort = serverPort;
}
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
}
@Configuration
public class AutoConfiguration {
@Bean
public DefaultServiceRegister defaultServiceRegister(){
return new DefaultServiceRegister();
}
@Bean
public ClientProxyFactory clientProxyFactory(@Autowired ZarlicRpcProperty zarlicRpcProperty){
ClientProxyFactory clientProxyFactory = new ClientProxyFactory();
//设置服务发现者
clientProxyFactory.setServiceDiscoverer(new ZookeeperServiceDiscoverer(zarlicRpcProperty.getRegisterAddress()));
//设置支持的协议
Map supportMessageProtocols = new HashMap<>();
supportMessageProtocols.put(zarlicRpcProperty.getProtocol(),new JavaSerializeMessageProtocol());
clientProxyFactory.setSupportMessageProtocols(supportMessageProtocols);
//设置网络层的实现
clientProxyFactory.setNetClient(new NettyNetClient());
return clientProxyFactory;
}
@Bean
public ServiceRegister serviceRegister(@Autowired ZarlicRpcProperty zarlicRpcProperty){
return new ZookeeperExportServiceRegister(zarlicRpcProperty.getRegisterAddress(), zarlicRpcProperty.getServerPort(),zarlicRpcProperty.getProtocol());
}
@Bean
public RequestHandler requestHandler(@Autowired ServiceRegister serviceRegister){
return new RequestHandler(new JavaSerializeMessageProtocol(),serviceRegister);
}
@Bean
public RpcServer rpcServer(@Autowired RequestHandler requestHandler,@Autowired ZarlicRpcProperty zarlicRpcProperty){
return new NettyRpcServer(zarlicRpcProperty.getServerPort(), zarlicRpcProperty.getProtocol(),requestHandler);
}
@Bean
public ZarlicRpcProperty zarlicRpcProperty(){
return new ZarlicRpcProperty();
}
}



