All the code for this project is available at: https://github.com/weiyu-zeng/SimpleRPC
Preface
In this iteration we introduce ZooKeeper as the RPC framework's registry: the server registers its services in ZooKeeper, and when a client invokes a service it looks up the provider's address in ZooKeeper by service name. This gives our RPC framework the ability to dispatch calls across a cluster.
Implementation
Installing and using ZooKeeper
For ZooKeeper installation, see:
[zookeeper] Installing and starting ZooKeeper on Windows, and the problems you may run into
Once installed, start the ZooKeeper server:
The server starts up like this:
Then open the ZooKeeper client:
Output like the following means it started successfully.
Press Enter:
Type ls / to list the root directory:
Leave this client open for now; we will come back to it after writing some code.
Project setup
Create a module named simpleRPC-06:
Create a package named com.rpc:
Dependency configuration
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>SimpleRPC</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>simpleRPC-06</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.51.Final</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.67</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>
</project>
Note that the Curator version must match your ZooKeeper version; if the Curator version is too new, the project will not run.
In the resources directory, add a log4j configuration file named log4j.properties:
log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

register
Create a package named register:
In it, create the registry's service-registration interface, ServiceRegister.java:
package com.rpc.register;
import java.net.InetSocketAddress;
public interface ServiceRegister {
void register(String serviceName, InetSocketAddress serverAddress);
InetSocketAddress serviceDiscovery(String serviceName);
}
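The contract is easy to see with a purely in-memory sketch before ZooKeeper enters the picture. The class below (`InMemoryServiceRegister`, a name of my own, not part of the project) mirrors the two methods of ServiceRegister without implementing the interface, so it stays self-contained:

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for ServiceRegister: same contract, no ZooKeeper.
public class InMemoryServiceRegister {
    private final Map<String, List<InetSocketAddress>> registry = new HashMap<>();

    // register: remember one provider address under the service name
    public void register(String serviceName, InetSocketAddress serverAddress) {
        registry.computeIfAbsent(serviceName, k -> new ArrayList<>()).add(serverAddress);
    }

    // discovery: return the first known address, as ZkServiceRegister also does for now
    public InetSocketAddress serviceDiscovery(String serviceName) {
        List<InetSocketAddress> addresses = registry.get(serviceName);
        return (addresses == null || addresses.isEmpty()) ? null : addresses.get(0);
    }
}
```

ZooKeeper replaces this map with a tree of nodes that survives across processes and removes entries automatically when a provider disconnects.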
Then create the implementation class ZkServiceRegister.java:
package com.rpc.register;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.net.InetSocketAddress;
import java.util.List;

public class ZkServiceRegister implements ServiceRegister {
    // ZooKeeper client provided by Curator
    private CuratorFramework client;

    // root node in ZooKeeper
    private static final String ROOT_PATH = "MyRPC";

    // The constructor initializes the Curator client and connects it to the ZooKeeper server.
    // Initialization covers the retry policy, the ZooKeeper address and port, the session
    // timeout, and the namespace; once configured, start() opens the connection.
    public ZkServiceRegister() {
        // retry policy: exponential backoff
        RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
        // The ZooKeeper address is fixed; providers and consumers alike connect to it.
        // sessionTimeoutMs relates to tickTime in zoo.cfg: ZooKeeper readjusts the final
        // timeout using minSessionTimeout and maxSessionTimeout, which default to 2x and
        // 20x tickTime respectively. Heartbeats are used to monitor the connection state.
        this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(40000)
                .retryPolicy(policy)
                .namespace(ROOT_PATH)
                .build();
        this.client.start();
        System.out.println("Connected to zookeeper");
    }
    // Register a service: the service name (String) plus the provider's host and port (InetSocketAddress)
    @Override
    public void register(String serviceName, InetSocketAddress serverAddress) {
        try {
            // serviceName is created as a persistent node: when a provider goes offline,
            // only its address is removed, never the service name itself
            Stat stat = client.checkExists().forPath("/" + serviceName);
            if (stat == null) {
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath("/" + serviceName);
            }
            // in the path, each "/" introduces one node
            String path = "/" + serviceName + "/" + getServiceAddress(serverAddress);
            // ephemeral node: deleted automatically when the provider goes offline
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
        } catch (Exception e) {
            System.out.println("This service is already registered");
        }
    }

    // look up an address by service name
    @Override
    public InetSocketAddress serviceDiscovery(String serviceName) {
        try {
            List<String> strings = client.getChildren().forPath("/" + serviceName);
            // for now we always pick the first child; load balancing comes later
            String string = strings.get(0);
            return parseAddress(string);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    // address -> "XXX.XXX.XXX.XXX:port" string
    private String getServiceAddress(InetSocketAddress serverAddress) {
        return serverAddress.getHostName() + ":" + serverAddress.getPort();
    }

    // parse a string back into an address: split on ":", host (String) first, then port (int)
    private InetSocketAddress parseAddress(String address) {
        String[] result = address.split(":");
        return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
    }
}
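The two private helpers are the whole "serialization format" of the registry: an address is stored as a `host:port` child node name and parsed back on discovery. That round trip can be checked in isolation (the class name `AddressCodec` is mine, for illustration; the method bodies mirror getServiceAddress and parseAddress above):

```java
import java.net.InetSocketAddress;

// Standalone sketch of ZkServiceRegister's address <-> string round trip.
public class AddressCodec {
    // address -> "host:port", the form used as the ephemeral node's name
    public static String toAddressString(InetSocketAddress address) {
        return address.getHostName() + ":" + address.getPort();
    }

    // "host:port" -> address, the inverse used by serviceDiscovery
    public static InetSocketAddress parseAddress(String s) {
        String[] parts = s.split(":");
        return new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
    }
}
```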
Next we update service, client, and server accordingly.
client
Make a small change to NettyRPCClient.java:
package com.rpc.client;
import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import com.rpc.register.ServiceRegister;
import com.rpc.register.ZkServiceRegister;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
public class NettyRPCClient implements RPCClient {
private static final Bootstrap bootstrap;
private static final EventLoopGroup eventLoopGroup;
private String host;
private int port;
private ServiceRegister serviceRegister; // registry client (ServiceRegister interface)
// constructor: initialize the zookeeper registry client
public NettyRPCClient() {
this.serviceRegister = new ZkServiceRegister();
}
// the netty client is initialized once and reused
static {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new NettyClientInitializer());
}
@Override
public RPCResponse sendRequest(RPCRequest request) {
InetSocketAddress address = serviceRegister.serviceDiscovery(request.getInterfaceName());
host = address.getHostName();
port = address.getPort();
try {
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
// send the request
channel.writeAndFlush(request);
channel.closeFuture().sync();
// Blocking read of the result: the handler attaches the response to the channel
// under a well-known name, and we read it back here (set in NettyClientHandler).
// AttributeKey is thread-isolated, so there are no thread-safety issues.
// Blocking is not ideal; a callback would be better, an optimization for later.
AttributeKey<RPCResponse> key = AttributeKey.valueOf("RPCResponse");
RPCResponse response = channel.attr(key).get();
System.out.println(response);
return response;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
TestClient.java gets the corresponding changes:
package com.rpc.client;
import com.rpc.common.Blog;
import com.rpc.common.User;
import com.rpc.service.BlogService;
import com.rpc.service.UserService;
public class TestClient {
public static void main(String[] args) {
// no need to pass host and port any more
RPCClient rpcClient = new NettyRPCClient();
// hand this client to the proxy client
RPCClientProxy rpcClientProxy = new RPCClientProxy(rpcClient);
// for each service, the proxy client produces a proxy whose methods are enhanced
// (build the request, send it over the network)
UserService userService = rpcClientProxy.getProxy(UserService.class);
// service method 1
User userByUserId = userService.getUserByUserId(10);
System.out.println("user received from the server: " + userByUserId);
// service method 2
User user = User.builder().userName("张三").id(100).sex(true).build();
Integer integer = userService.insertUserId(user);
System.out.println("inserted into the server, result: " + integer);
// service method 3
BlogService blogService = rpcClientProxy.getProxy(BlogService.class);
Blog blogById = blogService.getBlogById(10000);
System.out.println("blog received from the server: " + blogById);
}
}
The rest of the client code is identical to simpleRPC-05 and can be copied straight over; for completeness it is included below:
RPCClient.java
package com.rpc.client;
import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
public interface RPCClient {
RPCResponse sendRequest(RPCRequest request);
}
RPCClientProxy.java
package com.rpc.client;
import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import lombok.AllArgsConstructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
@AllArgsConstructor
public class RPCClientProxy implements InvocationHandler {
private RPCClient client;
// JDK dynamic proxy: every call on a proxy object passes through this method,
// which enhances it (build the request via reflection, send it to the server)
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// build the request; lombok's builder keeps this concise
RPCRequest request = RPCRequest.builder().interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args)
.paramsTypes(method.getParameterTypes())
.build();
// send it over the wire
RPCResponse response = client.sendRequest(request);
// System.out.println(response);
return response.getData();
}
<T> T getProxy(Class<T> clazz) {
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T) o;
}
}
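The proxy mechanics can be seen in isolation with a minimal sketch. The toy interface `Greeter` and the canned handler below are mine, for illustration only; instead of building an RPCRequest and sending it, the handler fakes the "remote" call, but every proxy method call lands in `invoke()` exactly as in RPCClientProxy:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyDemo {
    // a toy service interface standing in for UserService / BlogService
    interface Greeter {
        String greet(String name);
    }

    public static Greeter newGreeter() {
        // every call on the proxy passes through this handler, just like
        // RPCClientProxy.invoke; here we fabricate the result locally
        InvocationHandler handler = (proxy, method, args) ->
                method.getName() + "(" + args[0] + ") handled remotely";
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class[]{Greeter.class},
                handler);
    }
}
```

Calling `newGreeter().greet("x")` never executes a real greet implementation; the handler intercepts the call, which is exactly the hook RPCClientProxy uses to turn a method call into a network request.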
NettyClientInitializer.java
package com.rpc.client;

import com.rpc.codec.JsonSerializer;
import com.rpc.codec.MyDecode;
import com.rpc.codec.MyEncode;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // custom decoder
        pipeline.addLast(new MyDecode());
        // the encoder takes a serializer: JSON here; ObjectSerializer also works,
        // or you can implement your own
        pipeline.addLast(new MyEncode(new JsonSerializer()));
        pipeline.addLast(new NettyClientHandler());
    }
}
NettyClientHandler.java
package com.rpc.client;

import com.rpc.common.RPCResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;

public class NettyClientHandler extends SimpleChannelInboundHandler<RPCResponse> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RPCResponse msg) throws Exception {
        // on receiving the response, attach it to the channel under a well-known
        // name so that sendRequest can read it back
        AttributeKey<RPCResponse> key = AttributeKey.valueOf("RPCResponse");
        ctx.channel().attr(key).set(msg);
        ctx.channel().close();
    }

    // same as in NettyRPCServerHandler
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

service
The service-exposure class now also registers services; modify ServiceProvider.java accordingly:
package com.rpc.service;
import com.rpc.register.ServiceRegister;
import com.rpc.register.ZkServiceRegister;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
public class ServiceProvider {
private Map<String, Object> interfaceProvider;
private ServiceRegister serviceRegister;
private String host;
private int port;
public ServiceProvider(String host, int port){
// the server must pass in its own network address
this.host = host;
this.port = port;
this.interfaceProvider = new HashMap<>();
this.serviceRegister = new ZkServiceRegister();
}
public void provideServiceInterface(Object service) throws Exception {
    Class<?>[] interfaces = service.getClass().getInterfaces();
    for (Class<?> clazz : interfaces) {
        // local lookup table on this server
        interfaceProvider.put(clazz.getName(), service);
        // register the service with the registry
        serviceRegister.register(clazz.getName(), new InetSocketAddress(host, port));
    }
}
public Object getService(String interfaceName){
return interfaceProvider.get(interfaceName);
}
}
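provideServiceInterface registers one entry per interface the implementation class exposes; the service name in ZooKeeper is simply the interface's fully qualified name. That reflection step can be checked on its own (the nested toy types below are mine, standing in for the project's service classes):

```java
public class InterfaceScanDemo {
    // toy stand-ins for UserService / UserServiceImpl
    interface UserService { }
    static class UserServiceImpl implements UserService { }

    // the same reflection call ServiceProvider uses to find the names to register
    public static String[] interfaceNames(Object service) {
        Class<?>[] interfaces = service.getClass().getInterfaces();
        String[] names = new String[interfaces.length];
        for (int i = 0; i < interfaces.length; i++) {
            names[i] = interfaces[i].getName();
        }
        return names;
    }
}
```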
The rest of the service code is identical to simpleRPC-05 and can be copied straight over; for completeness it is included below:
BlogService.java
package com.rpc.service;
import com.rpc.common.Blog;
public interface BlogService {
Blog getBlogById(Integer id);
}
BlogServiceImpl.java
package com.rpc.service;
import com.rpc.common.Blog;
public class BlogServiceImpl implements BlogService {
@Override
public Blog getBlogById(Integer id) {
Blog blog = Blog.builder()
.id(id)
.title("My Blog")
.useId(22).build();
System.out.println("client queried blog " + id);
return blog;
}
}
UserService.java
package com.rpc.service;
import com.rpc.common.User;
public interface UserService {
// the client calls the server-side implementation through this interface
User getUserByUserId(Integer id);
// add one more capability to this service
Integer insertUserId(User user);
}
UserServiceImpl.java
package com.rpc.service;
import com.rpc.common.User;
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
// simulate fetching the user from a database
User user = User.builder()
.id(id)
.userName("he2121")
.sex(true).build();
System.out.println("client queried user " + id);
return user;
}
@Override
public Integer insertUserId(User user) {
System.out.println("insert succeeded: " + user);
return 1;
}
}
server
TestServer.java gets the corresponding changes:
package com.rpc.server;
import com.rpc.service.*;
public class TestServer {
public static void main(String[] args) throws Exception {
UserService userService = new UserServiceImpl();
BlogService blogService = new BlogServiceImpl();
// The service-exposure class is reused here to also register with the registry;
// ideally those would be separate classes, each with its own responsibility.
ServiceProvider serviceProvider = new ServiceProvider("127.0.0.1", 8899);
serviceProvider.provideServiceInterface(userService);
serviceProvider.provideServiceInterface(blogService);
RPCServer RPCServer = new NettyRPCServer(serviceProvider);
RPCServer.start(8899);
}
}
The rest of the server code is identical to simpleRPC-05 and can be copied straight over; for completeness it is included below:
RPCServer.java
package com.rpc.server;
public interface RPCServer {
void start(int port);
void stop();
}
NettyRPCServer.java
package com.rpc.server;
import com.rpc.service.ServiceProvider;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.AllArgsConstructor;
@AllArgsConstructor
public class NettyRPCServer implements RPCServer {
private ServiceProvider serviceProvider;
@Override
public void start(int port) {
// the boss group accepts TCP connections; the work group handles the actual requests
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
System.out.println("Netty server started");
try {
// boot the Netty server
ServerBootstrap serverBootstrap = new ServerBootstrap();
// configure the bootstrap
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new NettyServerInitializer(serviceProvider));
// bind, blocking until binding completes
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
// block here, serving requests, until the server channel closes
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
@Override
public void stop() {
}
}
NettyServerInitializer.java
package com.rpc.server;

import com.rpc.codec.JsonSerializer;
import com.rpc.codec.MyDecode;
import com.rpc.codec.MyEncode;
import com.rpc.service.ServiceProvider;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import lombok.AllArgsConstructor;

@AllArgsConstructor
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    private ServiceProvider serviceProvider;

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // custom decoder
        pipeline.addLast(new MyDecode());
        // custom encoder; it takes a serializer, JSON here -- ObjectSerializer also
        // works, or you can implement your own
        pipeline.addLast(new MyEncode(new JsonSerializer()));
        pipeline.addLast(new NettyRPCServerHandler(serviceProvider));
    }
}
NettyRPCServerHandler.java
package com.rpc.server;

import com.rpc.common.RPCRequest;
import com.rpc.common.RPCResponse;
import com.rpc.service.ServiceProvider;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AllArgsConstructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RPCRequest> {
    private ServiceProvider serviceProvider;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RPCRequest msg) throws Exception {
        // System.out.println(msg);
        RPCResponse response = getResponse(msg);
        ctx.writeAndFlush(response);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    // much like getResponse in the earlier WorkThread
    RPCResponse getResponse(RPCRequest request) {
        // the service (interface) name
        String interfaceName = request.getInterfaceName();
        // the matching implementation on this server
        Object service = serviceProvider.getService(interfaceName);
        // invoke the method reflectively
        Method method = null;
        try {
            method = service.getClass().getMethod(request.getMethodName(), request.getParamsTypes());
            Object invoke = method.invoke(service, request.getParams());
            return RPCResponse.success(invoke);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
            System.out.println("method invocation failed");
            return RPCResponse.fail();
        }
    }
}
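At its core, getResponse is a reflective dispatch: look the method up by name and parameter types, then invoke it on the implementation object. That core can be exercised in isolation (the toy `Echo` class is mine, standing in for a registered service):

```java
import java.lang.reflect.Method;

public class InvokeDemo {
    // a toy service implementation standing in for UserServiceImpl etc.
    public static class Echo {
        public String echo(String s) {
            return "echo:" + s;
        }
    }

    // the same name + parameter-types lookup that getResponse performs
    public static Object call(Object service, String methodName,
                              Class<?>[] paramTypes, Object[] params) throws Exception {
        Method method = service.getClass().getMethod(methodName, paramTypes);
        return method.invoke(service, params);
    }
}
```

This is why RPCRequest carries methodName and paramsTypes alongside the arguments: without the parameter types, overloaded methods could not be resolved unambiguously.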
common
Same as in simpleRPC-05; copy it over directly.
codec
Same as in simpleRPC-05; copy it over directly.
File structure
The file structure of simpleRPC-06 is as follows:
Running
Start TestServer.java:
Then start TestClient.java:
Now look back at the ZooKeeper client we opened at the beginning:
Type ls /
There is now an extra node, MyRPC:
Type ls /MyRPC:
All of our registered services are there. Success!



