要求完成改造版本:
1. 启动2个服务端,可以将IP及端口信息自动注册到Zookeeper
2. 客户端启动时,从Zookeeper中获取所有服务提供端节点信息,客户端与每一个服务端都建立连接
3. 某个服务端下线后,Zookeeper注册列表会自动剔除下线的服务端节点,客户端与下线的服务端断开连接
4. 服务端重新上线,客户端能感知到,并且与重新上线的服务端重新建立连接
---------------------------------------------------------------------------------------------------------------------------------
说明:实际开发中,使用curator作为连接zookeeper的框架。
1.先引入curator相关依赖
org.apache.curator
curator-framework
2.12.0
org.apache.curator
curator-recipes
2.12.0
2.因为提供者和消费者,均需要持有zookeeper客户端,所以把建立会话的方法放到公共模块中。
3.编写建立会话的方法
package com.lagou.zkutils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class GetCuratorClient {
// 在服务端创建节点时候,作为父节点,例如:/lg-rpc/ip:port
public static String path = "/lg-rpc";
public static Curatorframework getZkClient(){
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3); // 用接口接收返回值
// 使用 fluent 编程风格
Curatorframework client = CuratorframeworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(exponentialBackoffRetry)
.namespace("base") // 设置了独立的命名该空间 /base 当前这个客户端,对zookeeper数据节点的任何操作,都是相对于该目录进行的 有利于实现不同的zookeeper的业务隔离
.build();
client.start();
System.out.println("使用fluent编程风格会话建立");
return client;
}
}
4.提供者模块的操作
4.1,主启动类
package com.lagou.rpc;
import com.lagou.rpc.provider.server.RpcServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner {
@Autowired
private RpcServer rpcServer;
public static void main(String[] args) {
SpringApplication.run(ServerBootstrapApplication.class,args);
}
// 启动netty服务端 是在spring容器启动之后执行
@Override
public void run(String... args) throws Exception {
// 根据线程启动netty服务端
new Thread(new Runnable() {
@Override
public void run() {
rpcServer.startServer("127.0.0.1",8899);
}
}).start();
}
}
4.2将服务端的信息注册到zookeeper上,修改RpcServer类
具体代码:
package com.lagou.rpc.provider.server;
import com.lagou.rpc.provider.handler.RpcServerHandler;
import com.lagou.zkutils.GetCuratorClient;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.apache.curator.framework.Curatorframework;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RpcServer implements DisposableBean {
@Autowired
private RpcServerHandler rpcServerHandler;
// netty需要bossGroup和workerGroup,在容器销毁时,在destroy()方法中关闭线程组
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
public void startServer(String ip,int port){
try {
// 1. 创建线程组
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
// 2. 创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 3. 设置参数
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class) // 设置通道实现
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加针对 String的编解码器,因为服务提供者和消费者,是通过json字符串传递的
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 添加业务处理类 TODO
pipeline.addLast(rpcServerHandler);
}
});
// 4. 绑定端口
ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
System.out.println("------服务端启动成功-------");
// 注册服务端信息到zookeeper
connectZkAndRegister(ip,port);
// 5. 监听端口关闭的状态
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (bossGroup != null){
bossGroup.shutdownGracefully();
}
if (workerGroup != null){
workerGroup.shutdownGracefully();
}
}
}
private void connectZkAndRegister(String ip, int port) {
Curatorframework zkClient = GetCuratorClient.getZkClient();
String path = GetCuratorClient.path + "/" + ip + ":" + port;
try {
zkClient.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
// 服务器端,在zk上存储请求的响应时间,默认0s 当前请求时间 + 响应时间
.forPath(path,(System.currentTimeMillis()+":"+0).getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
// 实现DisposableBean 重写方法,在容器关销毁时候执行该方法,在该方法中关闭线程组
@Override
public void destroy() throws Exception {
if (bossGroup != null){
bossGroup.shutdownGracefully();
}
if (workerGroup != null){
workerGroup.shutdownGracefully();
}
}
}
4.3注意:要求要启动两个服务端,所以要再创建一个提供者的子模块,内容复制第一份即可。
5.消费者端代码实现
思路:1.先获取zookeeper实例
2.监听节点的变化
package com.lagou.rpc.consumer;
import com.lagou.rpc.api.IUserService;
import com.lagou.rpc.consumer.client.RpcClient;
import com.lagou.rpc.consumer.proxy.RpcClientProxy;
import com.lagou.rpc.consumer.serverinfo.ServerInfo;
import com.lagou.zkutils.GetCuratorClient;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
public class ClientBootStrap {
// 实现负载均衡,客户端
// 开启定时任务,定时将耗时比较长的服务端的响应时间设置为0
// 开启线程,选择客户端进行请求操作
public static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
// 创建map集合,用来缓存(维护)服务器节点和rpc客户端之间的关系
public static Map clientMap = new HashMap<>();
// 维护连接信息
public static Map serverMap = new HashMap<>();
public static void main(String[] args) throws Exception {
// 服务端和客户端是通过接口的方式沟通的
// IUserService userService = (IUserService) RpcClientProxy.createProxy(IUserService.class);
// User user = userService.getById(2);
// System.out.println(user);
//监听zk上指定路径的节点变化
// 1.先获取zookeeper实例
Curatorframework zkClient = GetCuratorClient.getZkClient();
// 2.监控节点的变化 (对当前zkClient实例进行监听,监听的节点路径,这里是父节点:/lg-rpc)
// 后续会判断当前节点的子节点列表状态:添加或者删除,代表有机器加入zookeeper,或者机器下线
watchNode(zkClient,GetCuratorClient.path);
// 作业一不让main方法结束,一直休眠
Thread.sleep(Integer.MAX_VALUE);
}
private static void watchNode(Curatorframework zkClient, String path) throws Exception {
TreeCache treeCache = new TreeCache(zkClient, path);
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(Curatorframework curatorframework, TreeCacheEvent treeCacheEvent) throws Exception {
ChildData data = treeCacheEvent.getData();
System.out.println("data.getPath==="+data.getPath());
String str = data.getPath().substring(data.getPath().lastIndexOf("/")+1);
System.out.println("str ==="+str);
TreeCacheEvent.Type type = treeCacheEvent.getType();
if (type == TreeCacheEvent.Type.NODE_ADDED){// 如果监听的节点变化类型是添加
String value = new String(data.getData());
System.out.println("value===="+value);
String[] split = str.split(":");
if (split.length != 2){
return;
}
if (!serverMap.containsKey(str)){
String[] valueSplit = value.split(":");
if (valueSplit.length != 2){
return;
}
RpcClient rpcClient = new RpcClient(split[0], Integer.parseInt(split[1]));
clientMap.put(data.getPath(), rpcClient);
// 已经建立好了连接
IUserService iUserService = (IUserService) RpcClientProxy.createProxy(IUserService.class, rpcClient);
ServerInfo serverInfo = new ServerInfo();
serverInfo.setLastTime(Long.valueOf(valueSplit[0]));
serverInfo.setUsedTime(Long.valueOf(valueSplit[1]));
serverInfo.setiUserService(iUserService);
serverMap.put(str,serverInfo);
}
}else if (type == TreeCacheEvent.Type.NODE_REMOVED){ // 节点删除
System.out.println("监听到有节点删除:"+str);
clientMap.remove(str);
}
System.out.println("------现有连接列表-----");
Iterator> iterator = clientMap.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry next = iterator.next();
System.out.println(next.getKey()+":"+next.getValue());
}
}
});
treeCache.start();
}
}
至此,第一部分已经完成。
--------------------------------------------------------------------------------------------------------------------------------
在“编程题一”的基础上,实现基于Zookeeper的简易版负载均衡策略
Zookeeper记录每个服务端的最后一次响应时间,有效时间为5秒,5s内如果该服务端没有新的请求,响应时间清零或失效。(如下图所示)
当客户端发起调用,每次都选择最后一次响应时间短的服务端进行服务调用,如果时间一致,随机选取一个服务端进行调用,从而实现负载均衡
--------------------------------------------------------------------------------------------------------------------------------
实现:
服务器端:
1.在zookeeper节点上注册信息,当前请求时间以及服务响应时间,在创建节点时,就向节点中添加了数据信息。(因为后续的请求,需要根据响应时间小的服务器进行交互)
2.实现负载均衡的操作主要在客户端 :
2.1封装服务器连接信息,为了维护服务器节点以及其连接信息
package com.lagou.rpc.consumer.serverinfo;
import com.lagou.rpc.api.IUserService;
public class ServerInfo {
// 用来封装连接信息
// 最后一次执行时间
private long lastTime;
// 响应时间
private long usedTime;
private IUserService iUserService;
public long getLastTime() {
return lastTime;
}
public void setLastTime(long lastTime) {
this.lastTime = lastTime;
}
public long getUsedTime() {
return usedTime;
}
public void setUsedTime(long usedTime) {
this.usedTime = usedTime;
}
public IUserService getiUserService() {
return iUserService;
}
public void setiUserService(IUserService iUserService) {
this.iUserService = iUserService;
}
}
2.2根据服务响应时间进行排序,选择响应时间小的服务器作为交互,并且更新zookeeper上服务器执行时间以及响应时间的信息
package com.lagou.rpc.consumer;
import com.lagou.rpc.api.IUserService;
import com.lagou.rpc.consumer.serverinfo.ServerInfo;
import com.lagou.rpc.pojo.User;
import com.lagou.zkutils.GetCuratorClient;
import org.apache.curator.framework.Curatorframework;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
public class SimpleTask implements Runnable {
private Curatorframework zkClient;
public SimpleTask(Curatorframework zkClient) {
this.zkClient = zkClient;
}
@Override
public void run() {
while (true){
if (ClientBootStrap.serverMap.isEmpty()){
// 如果没有服务提供者(服务端),继续 下一次循环
continue;
}
// TODO 这种方式直接把 Map 转成 list
// 1.获取到服务提供者列表
ArrayList> entries = new ArrayList<>(ClientBootStrap.serverMap.entrySet());
// 排序 服务器列表,根据响应时间
Collections.sort(entries, new Comparator>() {
@Override
public int compare(Map.Entry o1, Map.Entry o2) {
return (int) (o1.getValue().getUsedTime() - o2.getValue().getUsedTime());
}
});
System.out.println("--------排序完毕-------");
for (Map.Entry entry : entries) {
// 一个 entry 就代表一个服务端节点
System.out.println(entry.getKey()+":"+entry.getValue());
}
// 2.选择响应时间最小的服务器交互,也就是排序中第一个服务器
ServerInfo serverInfo = entries.get(0).getValue();
System.out.println("使用的服务端:"+entries.get(0).getKey());
IUserService iUserService = serverInfo.getiUserService();
// 当前系统时间
long startTime = System.currentTimeMillis();
// 执行请求
User user = iUserService.getById(1);
System.out.println("user==="+user);
// 执行请求结束的系统时间
long endTime = System.currentTimeMillis();
// 响应时间
long usedTime = endTime - startTime;
serverInfo.setLastTime(endTime);
serverInfo.setUsedTime(usedTime);
// 更新在zookeeper节点上的最后一次请求时间和响应时间(刚开始在服务端创建节点的时候就已经初始化节点上的值,系统时间:0)
try {
zkClient.setData().forPath(GetCuratorClient.path+"/"+entries.get(0).getKey(),
(endTime+":"+usedTime).getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
2.3编写定时任务,如果响应时间超过5秒,那么把响应时间清零
package com.lagou.rpc.consumer;
import com.lagou.zkutils.GetCuratorClient;
import org.apache.curator.framework.Curatorframework;
import java.util.List;
public class ScheduleTask implements Runnable {
private Curatorframework zkClient;
public ScheduleTask(Curatorframework zkClient) {
this.zkClient = zkClient;
}
@Override
public void run() {
try {
// 查询 /lg-rpc 下的所有子节点
List stringPaths = zkClient.getChildren().forPath(GetCuratorClient.path);
for (String stringPath : stringPaths) {// 一个stringPath代表 一个服务器节点
// 获取到每一个服务器对应的节点上的内容,这里存储的是执行时间和响应时间 例如:执行时间:响应时间
byte[] bytes = zkClient.getData().forPath(GetCuratorClient.path + "/" + stringPath);
String data = new String(bytes);
String[] split = data.split(":");
if (split.length != 2){
continue;
}
// 最后一次执行时间
long lastTime = Long.parseLong(split[0]);
if (System.currentTimeMillis() - lastTime > 5000){// 如果当前时间-上一次执行时间 >5s
// 把响应时间重置为0
ClientBootStrap.serverMap.get(stringPath).setUsedTime(0);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.4编写主启动类,创建定时任务线程池,每隔5秒执行一次定时任务,创建一个线程,选择客户端,执行请求操作。
package com.lagou.rpc.consumer;
import com.lagou.rpc.api.IUserService;
import com.lagou.rpc.consumer.client.RpcClient;
import com.lagou.rpc.consumer.proxy.RpcClientProxy;
import com.lagou.rpc.consumer.serverinfo.ServerInfo;
import com.lagou.zkutils.GetCuratorClient;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ClientBootStrap {
// 实现负载均衡,客户端
// 开启定时任务,定时将耗时比较长的服务端的响应时间设置为0
// 开启线程,选择客户端进行请求操作
public static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
// 创建map集合,用来缓存(维护)服务器节点和rpc客户端之间的关系
public static Map clientMap = new HashMap<>();
// 维护服务器节点和连接信息的关系 例如:127.0.0.1:8899,lastTime:XXX(上次执行时的系统时间),UsedTime:XXX(响应时间)
public static Map serverMap = new HashMap<>();
public static void main(String[] args) throws Exception {
// 服务端和客户端是通过接口的方式沟通的
// IUserService userService = (IUserService) RpcClientProxy.createProxy(IUserService.class);
// User user = userService.getById(2);
// System.out.println(user);
//监听zk上指定路径的节点变化
// 1.先获取zookeeper实例
Curatorframework zkClient = GetCuratorClient.getZkClient();
// 2.监控节点的变化 (对哪个zkClient监听,监听的节点路径,这里是父节点)
watchNode(zkClient,GetCuratorClient.path);
// 3.定时任务,参数一:要执行的定时任务; 参数二:系统运行后延迟5s执行; 参数三:该任务执行完毕后延迟5S再次执行; 参数四:时间单位
// 消费者在启动时创建定时线程池,每隔5s自动上报,更新zookeeper临时节点的值
executorService.scheduleWithFixedDelay(new ScheduleTask(zkClient),5,5, TimeUnit.SECONDS);
// 4.选择一个客户端执行任务
new Thread(new SimpleTask(zkClient)).start();
// 作业一不让main方法结束,一直休眠
// Thread.sleep(Integer.MAX_VALUE);
}
private static void watchNode(Curatorframework zkClient, String path) throws Exception {
TreeCache treeCache = new TreeCache(zkClient, path);
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(Curatorframework curatorframework, TreeCacheEvent treeCacheEvent) throws Exception {
ChildData data = treeCacheEvent.getData();
System.out.println("data.getPath==="+data.getPath());
String str = data.getPath().substring(data.getPath().lastIndexOf("/")+1); //例如:127.0.0.1:8000(一个服务器节点)
System.out.println("str ==="+str);
TreeCacheEvent.Type type = treeCacheEvent.getType();
if (type == TreeCacheEvent.Type.NODE_ADDED){// 如果监听的节点变化类型是添加
// 获取到的就是当服务器在zookeeper上创建节点时候写入的时间信息,格式: 系统执行时间:响应时间
String value = new String(data.getData());
System.out.println("value===="+value);
String[] split = str.split(":");
if (split.length != 2){
return;
}
if (!serverMap.containsKey(str)){ // 如果容器中没有该服务器节点信息
String[] valueSplit = value.split(":");
if (valueSplit.length != 2){
return;
}
RpcClient rpcClient = new RpcClient(split[0], Integer.parseInt(split[1]));
clientMap.put(data.getPath(), rpcClient);
// 已经建立好了连接
IUserService iUserService = (IUserService) RpcClientProxy.createProxy(IUserService.class, rpcClient);
ServerInfo serverInfo = new ServerInfo();
// 设置上一次任务执行的系统时间
serverInfo.setLastTime(Long.parseLong(valueSplit[0]));
// 设置响应时间
serverInfo.setUsedTime(Long.parseLong(valueSplit[1]));
serverInfo.setiUserService(iUserService);
// 把连接信息放入容器中
serverMap.put(str,serverInfo);
}
}else if (type == TreeCacheEvent.Type.NODE_REMOVED){ // 节点删除
System.out.println("监听到有节点删除:"+str);
clientMap.remove(str);
}
System.out.println("------现有连接列表-----");
Iterator> iterator = clientMap.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry next = iterator.next();
System.out.println(next.getKey()+":"+next.getValue());
}
}
});
treeCache.start();
}
}
至此,实现负载均衡功能完毕。
测试效果,需要在提供者端通过debug的方式,因为在编写SimpleTask类时,run方法中用了死循环,循环请求,方便测试。
项目地址:下载地址



