本项目所有代码可见:https://github.com/weiyu-zeng/SimpleRPC
前言在simpleRPC-07中,我们为注册的服务接口提供一个负载均衡模块,
实现 zookeeper在simpleRPC-06时我们安装了zookeeper
这里我们还要把zookeeper打开:
启动成功之后,我们再去开客户端:
点回车,输入 ls /:
我们之前的MyRPC还在,ok的,继续写代码
创建名为simpleRPC-07的module,然后在java下创建com.rpc的package
然后我们回到simpleRPC-06,把com.rpc中的所有内容,复制一下:
然后回到simpleRPC-07中的com.rpc,粘贴过来:
然后记得把log4j的配置文件也复制过来:
依赖配置跟simpleRPC-06也是一样的:
pom.xml
SimpleRPC org.example 1.0-SNAPSHOT 4.0.0 simpleRPC-078 8 org.projectlombok lombok1.18.12 provided io.netty netty-all4.1.51.Final com.alibaba fastjson1.2.67 org.apache.curator curator-recipes2.13.0 org.slf4j slf4j-nop1.7.30
到此为止,simpleRPC-07的内容和simpleRPC-06是一样的了。
记得reload一下maven,我一般喜欢这样:
loadbalance创建一个loadbalance 的package:
我们定义负载均衡接口:LoadBalance.java:
package com.rpc.loadbalance;
import java.util.List;
public interface LoadBalance {
String balance(List addressList);
}
我们编写随机负载均衡:
RandomLoadBalance.java
package com.rpc.loadbalance;
import java.util.List;
import java.util.Random;
public class RandomLoadBalance implements LoadBalance {
@Override
public String balance(List addressList) {
Random random = new Random();
int choose = random.nextInt(addressList.size());
System.out.println("负载均衡选择了" + choose + "服务器");
return addressList.get(choose);
}
}
我们编写轮询负载均衡:
RoundLoadBalance.java
package com.rpc.loadbalance;
import java.util.List;
public class RoundLoadBalance implements LoadBalance{
private int choose = -1;
@Override
public String balance(List addressList) {
choose++;
choose = choose % addressList.size(); // 索引
System.out.println("负载均衡选择了" + choose + "服务器");
return addressList.get(choose); //
}
}
register
ZkServiceRegister.java
package com.rpc.register;
import com.rpc.loadbalance.LoadBalance;
import com.rpc.loadbalance.RoundLoadBalance;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.net.InetSocketAddress;
import java.util.List;
public class ZkServiceRegister implements ServiceRegister {
// curator 提供的zookeeper客户端
private Curatorframework client;
// zookeeper根路径结点
private static final String ROOT_PATH = "MyRPC";
// 初始化负载均衡器, 这里用的是随机, 一般通过构造函数传入
private LoadBalance loadBalance = new RoundLoadBalance();
// 构造方法
// 这里负责zookeeper客户端的初始化,并与zookeeper服务端建立连接。
// 初始化包括指定重连策略,指定连接zookeeper的端口,指定超时时间,指定命名空间
// 初始化完成之后start()开启zookeeper客户端。
public ZkServiceRegister() {
// 重连策略:指数时间重试
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
// zookeeper的地址固定,不管是服务提供者还是消费者,都要与之建立连接
// sessionTimeoutMs 与 zoo.cfg中的tickTime 有关系,
// zk还会根据minSessionTimeout与maxSessionTimeout两个参数重新调整最后的超时值。默认分别为tickTime 的2倍和20倍
// 使用心跳监听状态
this.client = CuratorframeworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(40000)
.retryPolicy(policy)
.namespace(ROOT_PATH)
.build();
this.client.start();
System.out.println("zookeeper 连接成功");
}
// 注册:传入服务方法名(String),传入主机名和端口号的套接字地址(InetSocketAddress)
@Override
public void register(String serviceName, InetSocketAddress serverAddress) {
try {
// serviceName创建成永久节点,服务提供者下线时,不删服务名,只删地址
Stat stat = client.checkExists().forPath("/" + serviceName);
if (stat == null) {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/" + serviceName);
}
// 路径地址,一个/代表一个节点
String path = "/" + serviceName + "/" + getServiceAddress(serverAddress);
// 临时节点,服务器下线就删除节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (Exception e) {
System.out.println("此服务已存在");
}
}
// 根据服务名返回地址
@Override
public InetSocketAddress serviceDiscovery(String serviceName) {
try {
List strings = client.getChildren().forPath("/" + serviceName);
// 负载均衡选择器,选择一个
String string = loadBalance.balance(strings);
// String string = strings.get(0);
return parseAddress(string);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
// 地址 -> XXX.XXX.XXX.XXX:port 字符串
private String getServiceAddress(InetSocketAddress serverAddress) {
return serverAddress.getHostName() + ":" + serverAddress.getPort();
}
// 字符串解析为地址:按照":"切分开,前半是host(String),后半是port(int)
private InetSocketAddress parseAddress(String address) {
String[] result = address.split(":");
return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
}
}
client
和simpleRPC-06 一样
codec和simpleRPC-06 一样
common和simpleRPC-06 一样
service和simpleRPC-06 一样
server和simpleRPC-06 一样
文件结构文件结构如下:
运行先确认一下zookeeper server还在开启,否则可能卡在这个地方
zookeeper 连接成功
先运行TestServer.java:
我们在simpleRPC-06中已经注册了服务,所以显示User 和 Blog的两个服务已存在。
然后运行TestClient.java:



