- Java 客户端操作
- Java 客户端 API
- 服务器的动态感知
- 服务注册
- 服务发现
- 分布式锁业务处理
- 单机环境(一个虚拟机中)
- 分布式环境_同名节点
- 分布式环境_顺序节点
Java 从 0 到架构师目录:【Java从0到架构师】学习记录
Zookeeper 连接状态:
Java 客户端操作:
- 自带的 zkclient
org.apache.zookeeper zookeeper 3.6.0
- Apache 开源的 Curator
org.apache.curator curator-framework 4.3.0 org.apache.curator curator-client 4.3.0
- Apache 开源的 ZkClient (com.101tec)
Java 客户端 APIcom.101tec zkclient 0.11
我们使用 Zookeeper 自带的 zkclient 来操作:
创建会话 :
@SpringBootApplication
public class ZookeeperApiDemoApplication {
private static String ZK_SERVER_ADDR="192.168.48.100:2181,192.168.48.100:2182,192.168.48.100:2183";
private static int SESSION_TIMEOUT=30000;
public static void main(String[] args) {
SpringApplication.run(ZookeeperApiDemoApplication.class, args);
}
//创建一个zookeeper的连接
@Bean
public ZooKeeper zooKeeper() throws Exception{
// 第一个参数: 连接地址和端口 第二个参数: 会话超时时间, 第三个参数: 事件监听程序
ZooKeeper zooKeeper = new ZooKeeper(ZK_SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("event = " + event);
if(event.getState()== Event.KeeperState.SyncConnected){
System.out.println("zookeeper客户端连接成功");
}
}
});
return zooKeeper;
}
}
创建节点 :
@RequestMapping("createNode")
public String createNode(String path, String data, String type) throws Exception{
String result = zooKeeper.create(path,
data.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.valueOf(type));
return result;
}
获取节点中的数据 :
// 同步获取数据
@RequestMapping("getData")
public String getData(String path) throws Exception {
// 1 先去查询版本信息 如果没有, 返回的是一个null
Stat stat = zooKeeper.exists(path, false);
// 同步获取数据
byte[] data = zooKeeper.getData(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("event = " + event);
}
}, stat);
System.out.println("new String(data) = " + new String(data));
return new String(data);
}
// 异步数据处理
@RequestMapping("getDataAsync")
public String getDataAsync(String path) throws Exception{
// 1 先去查询版本信息
Stat stat = zooKeeper.exists(path, false);
zooKeeper.getData(path, false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("异步处理回调数据");
System.out.println("收到的数据:"+new String(data));
System.out.println("ctx = " + ctx);
}
}, "测试数据");
return "异步获取数据";
}
// 获取子节点列表
@RequestMapping("getChildren")
public List getChildren(String path) throws Exception{
List children = zooKeeper.getChildren(path, false);
return children;
}
判断节点是否存在 :
@RequestMapping("exists")
public boolean exists(String path) throws Exception{
Stat stat = zooKeeper.exists(path, false);
return stat != null ;
}
删除节点 :
@RequestMapping("delete")
public boolean delete(String path) throws Exception{
Stat stat = zooKeeper.exists(path, false);
if (stat != null){
zooKeeper.delete(path,stat.getVersion());
}
return true;
}
更新数据 :
@RequestMapping("update")
public boolean update(String path,String data) throws Exception{
Stat stat = zooKeeper.exists(path, false);
if (stat != null){
zooKeeper.setData(path, data.getBytes(), stat.getVersion());
}
return true;
}
事件处理 :
绑定一次事件:
@RequestMapping("addWatch1")
public String addWatch1(String path) throws Exception{
Stat stat = zooKeeper.exists(path, false);
// 定义一个监视器对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) { // 数据改变事件,而且还是一次性
System.out.println("事件类型:" + event.getType());
System.out.println("数据发生改变, 请及时更新");
try {
byte[] data = zooKeeper.getData(path, this, stat);
System.out.println("更新后的数据:" + new String(data));
} catch (Exception e) {
e.printStackTrace();
}
}
};
zooKeeper.getData(path, watcher, stat);
return "success";
}
绑定永久事件:
@RequestMapping("addWatch2")
public String addWatch2(String path) throws Exception{
Stat stat = zooKeeper.exists(path, false);
// 只是获取数据, 没有绑定事件
byte[] data = zooKeeper.getData(path, null, stat);
System.out.println("获取到数据:" + new String(data));
// 绑定永久的事件 --> 1 数据变化事件 2 子节点改变事件
zooKeeper.addWatch(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("event = " + event);
// 数据改变事件
if (event.getType() == Event.EventType.NodeDataChanged){
try {
// 重新获取数据
Stat stat = zooKeeper.exists(path, false);
// 只是获取数据, 没有绑定事件
byte[] data = zooKeeper.getData(path, null, stat);
System.out.println("更新的数据:" + new String(data));
} catch (Exception e) {
e.printStackTrace();
}
// 子节点改变事件
} else if (event.getType() == Event.EventType.NodeChildrenChanged){
// 重新获取子节点列表
System.out.println("子节点数据发生改变");
}
}
}, AddWatchMode.PERSISTENT);
return "success";
}
递归绑定事件:对于创建的节点以及子节点都绑定事件
@RequestMapping("addWatch")
public List addWatch(String path) throws Exception{
// 1 先获取所有的子节点
List children = zooKeeper.getChildren(path, false);
// 2 绑定一个监听事件
zooKeeper.addWatch(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged){
System.out.println("子节点数据发送改变");
System.out.println("重新获取子节点数据");
try {
List children1 = zooKeeper.getChildren(path, false);
System.out.println("children1 = " + children1);
System.out.println("=========================");
} catch (Exception e) {
e.printStackTrace();
}
} else if (event.getType() == Event.EventType.NodeDataChanged){
System.out.println("节点数据发生改变");
try {
byte[] data = zooKeeper.getData(path, false, new Stat());
System.out.println("获取到的数据是:" + new String(data));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, AddWatchMode.PERSISTENT_RECURSIVE);
return children;
}
服务器的动态感知
服务注册
创建一个 SpringBoot 项目
- 导入对应的依赖包
org.apache.zookeeper zookeeper 3.6.0
-
实例化一个 Zookeeper 的客户端连接
-
在连接成功以后,开始创建一个对应的临时顺序节点,注册自己的 ip 和端口
启动项目,连接zk,并且注册地址和端口信息
- 配置文件
server.port=8888 server.host=192.168.48.1
- 实现代码
// 注册服务
@SpringBootApplication
public class ZookeeperSeckillServerApplication{
private static String ZK_SERVER_ADDR = "192.168.48.100:2181,192.168.48.100:2182,192.168.48.100:2183";
private static int SESSION_TIMEOUT = 30000;
private static String PATH = "/servers";
private static String SUB_PATH = "/seckillServer";
@Value("${server.host}")
private String host;
@Value("${server.port}")
private String port;
private ZooKeeper zooKeeper;
public static void main(String[] args) {
SpringApplication.run(ZookeeperSeckillServerApplication.class, args);
}
@Bean
public ZooKeeper zooKeeper() throws Exception {
// 参数1: 连接地址和端口, 参数2: 会话超过事件, 参数3:事件监听程序
zooKeeper = new ZooKeeper(SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("event = " + event);
if(event.getState() == Event.KeeperState.SyncConnected){
System.out.println("zookeeper客户端连接成功");
//注册对应的信息
try {
zooKeeper.create(PATH + SUB_PATH, (host + ":" + port).getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
return zooKeeper;
}
}
服务发现
创建一个 SpringBoot 项目
- 导入依赖
org.apache.zookeeper zookeeper 3.6.0
- 创建一个 Zookeeper 客户端连接
- 连接成功以后,获取地址列表
- 绑定子节点改变事件(每次改变获取最新的的服务地址)
启动项目,连接 zk,并且获取服务地址列表
注册永久的事件监听:
@SpringBootApplication
public class ZookeeperOrderServerApplication {
private static String ZK_SERVER_ADDR = "192.168.48.100:2181,192.168.48.100:2182,192.168.48.100:2183";
private static int SESSION_TIMEOUT = 30000;
private static String PATH = "/servers";
public static List addrList;
// volatile: 保证在多线程之间的变量的可见性
private volatile ZooKeeper zooKeeper;
public static void main(String[] args) {
SpringApplication.run(ZookeeperOrderServerApplication.class, args);
}
@Bean
public ZooKeeper zooKeeper() throws Exception{
// 第一个参数: 连接地址和端口 第二个参数: 会话超时时间, 第三个参数: 事件监听程序
zooKeeper = new ZooKeeper(ZK_SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("event = " + event);
if(event.getState()== Event.KeeperState.SyncConnected){
System.out.println("zookeeper客户端连接成功");
try {
//1 获取对应的地址列表
getData();
//2 绑定永久的事件监听
zooKeeper.addWatch(PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {// 开启另外的线程处理
try {
getData();
} catch (Exception e) {
e.printStackTrace();
}
}
}, AddWatchMode.PERSISTENT);
} catch (Exception e) {
e.printStackTrace();
}
}
}
//获取数据
private void getData() throws KeeperException, InterruptedException {
List serverAddr = zooKeeper.getChildren(PATH, null);
List tempList = new ArrayList<>();
for (String path : serverAddr) {
//获取节点路径数据
byte[] data = zooKeeper.getData(PATH + "/" + path, null, new Stat());
String addrInfo = new String(data);
// 把数据添加到临时列表
tempList.add(addrInfo);
}
addrList = tempList;
System.out.println("获取到秒杀服务的最新地址n" + addrList);
}
});
return zooKeeper;
}
}
分布式锁业务处理
为什么程序中需要锁?
- 多任务环境:多个任务同时执行,可以是多线程,也可以是多进程
- 多个任务的资源共享操作:所有的任务都需要对同一资源进行写操作
- 对资源的访问是互斥的:对于资源的访问,多个任务同时执行,同一时间只能一个任务访问资源,其他的任务处于阻塞状态
锁的基本概念:
- 竞争锁:任务通过竞争获取锁才能对该资源进行操作
公平竞争:按照一定的顺序,先来先执行
非公平竞争:没有顺序,不管先后顺序执行 - 占有锁:当有一个任务在对资源进行更新时,其他任务都不可以对这个资源进行操作
- 任务阻塞
- 释放锁:直到该任务完成更新,释放资源
锁的应用场景:
- 单机环境(一个虚拟机中)
- 分布式环境_同名节点
- 分布式环境_顺序节点
业务实现:
// 订单ID生成器
public class OrderIDGenerator {
private int count = 0;
public synchronized String getId(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-dd-mm");
String format = sdf.format(new Date());
try {
// 模拟网络延迟
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return format + "-" + (++count);
}
}
测试实现:
public class OrderService implements Runnable {
private OrderIDGenerator orderIDGenerator = null;
private static CountDownLatch countDownLatch = new CountDownLatch(50);
private static Set result = new HashSet<>(50);
public OrderService(OrderIDGenerator orderIDGenerator) {
this.orderIDGenerator = orderIDGenerator;
}
public static void main(String[] args) throws Exception{
OrderIDGenerator idGenerator = new OrderIDGenerator();
System.out.println("开始模拟多线程生成订单号");
for(int i = 0; i < 50; i++){
new Thread(new OrderService(idGenerator)).start();
}
countDownLatch.await();
System.out.println("生成的订单号个数:" + result.size());
System.out.println("======================");
for (String order : result) {
System.out.println(order);
}
}
@Override
public void run() {
result.add(orderIDGenerator.getId());
countDownLatch.countDown();
}
}
分布式环境_同名节点
分布式锁业务流程分析:
分布式锁流程图:
业务实现:
@RestController
public class OrderController {
private RestTemplate restTemplate = new RestTemplate();
@Autowired
private ZooKeeper zooKeeper;
private String path = "/locks";
private String node = "/orderIdLock";
@RequestMapping("createOrder")
public String createOrder() throws Exception{
// 获取id
if (tryLock()) {
// 调用业务方法
String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);
System.out.println("获取到的id:" + id);
// 释放锁
unlock();
} else {
waitLock();
}
return "success";
}
// 竞争锁资源: 尝试获取id, 如果获取到了, 返回true, 否则返回false
public boolean tryLock(){
try {
// 因为不是顺序节点, 对于同一个路径, 只能创建一次
String path = zooKeeper.create(this.path + this.node,
null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
// 释放锁资源
public void unlock(){
try {
Stat stat = zooKeeper.exists(this.path + this.node, false);
zooKeeper.delete(this.path + this.node, stat.getVersion());
} catch (Exception e) {
e.printStackTrace();
}
}
// 阻塞状态
public void waitLock(){
try {
zooKeeper.getChildren(this.path, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
createOrder(); // 重新创建订单
} catch (Exception e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
分布式环境_顺序节点
分布式锁流程:
@RestController
public class Order02Controller {
private RestTemplate restTemplate = new RestTemplate();
@Autowired
private ZooKeeper zooKeeper;
private String path = "/locks02";
private String node = "/orderIdLock";
@RequestMapping("createOrder02")
public String createOrder() throws Exception {
String currentPath = zooKeeper.create(this.path + this.node,
null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
currentPath = currentPath.substring(currentPath.lastIndexOf("/") + 1);
//获取id
if (tryLock(currentPath)) {
// 调用业务方法
String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);
System.out.println("获取到的id:" + id);
// 释放锁
unlock(currentPath);
} else {
waitLock(currentPath);
}
return "success";
}
//尝试获取id, 如果获取到了, 返回true, 否则返回false
//竞争锁资源
public boolean tryLock(String currentPath) {
try {
//获取到所有的节点
List children = zooKeeper.getChildren(this.path, false);
Collections.sort(children);
if (StringUtils.pathEquals(currentPath, children.get(0))) {
return true;
} else {
return false;
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
//释放锁资源
public void unlock(String currentPath) {
try {
Stat stat = zooKeeper.exists(this.path + "/" + currentPath, false);
zooKeeper.delete(this.path + "/" + currentPath, stat.getVersion());
} catch (Exception e) {
e.printStackTrace();
}
}
//阻塞状态
public void waitLock(String currentPath) {
try {
CountDownLatch count = new CountDownLatch(1);
List children = zooKeeper.getChildren(this.path, false);
//获取到前一个节点
Collections.sort(children);
int index = children.indexOf(currentPath);
if (index > 0) {
String preNode = children.get(index - 1);
//前一个节点删除操作
zooKeeper.getData(this.path + "/" + preNode, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
try {
String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);
System.out.println("获取到的id:" + id);
//释放锁
unlock(currentPath);
count.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, new Stat());
}
count.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}



