- 1.zk背景介绍
- 2.zk安装
- 3.zk配置文件解析
- 4.zk的工作机制
- 1)数据存储
- 2)监听通知
- 5.zk原理(只实现了一部分paxos的功能)
- 1)原子广播协议-z(zk)-A(atomic)-B(broadcast)
- 2)角色介绍(leader和follower)
- 3)数据存储
- 4)redis和zk更新同步的流程
- 5)zk初始化同步流程
- 1)启动选举leader(zk怎么选leader?),然后进行数据同步
- 2)leader崩溃的情况(两种)
- 3)选举的leader需要保证的事情:
- 6)数据同步原理
- 6.客户端原理
- 7.zookeeper应用和总结
- 8.zk接口(根据java改编)
- 介绍:
zookeeper是一个开源、分布式的,为分布式系统提供协调管理服务的开源软件
- 介绍
在分布式系统中,有大量的微服务协同对外提供服务,这时需要一个稳定的、安全的协调管理工具,负责管理和协调这些微服务
- cap原则:
①数据一致性,举例:用户连接上一个节点,修改密码,突然断开连接上另外一个节点时,也能登录上,体现了一致性
②可用性:举例,一个节点宕机后,也能顺利连接上另外一个节点(合理的时间内得到合理的结果)
③分区容错性:假如上海和深圳机房断开连接了,依然能对外提供一致性和可用性的系统,就符合分区容错性(一般都满足分区容错性)
- zookeeper的base原则(zk满足cp原则:可用性和分区容错性)
1)ba:基本可用的意思 2)s-soft state:软状态的意思 3)e:最终一致性
- 核心思想
分布式系统中即使无法做到强一致性,使用适当的方式让分布式系统达到最终一致性
(基本可用性:也就是相应时间和功能做妥协)
- 弱状态:
允许同步时候的延时
- 对比redis
1)zk的快照相当于rdb持久化 2)zk的事务日志相当于aof 3)zk的数据跟redis一样存在内存中,只是不支持那么多的数据 (value-string,zk的string是二进制安全字符串,长度配合以 结尾的字符串)2.zk安装
1)安装依赖
2)进入源码文件夹编译zk
mvn clean install -DskipTests
3)编译c驱动
3.zk配置文件解析mvn clean -pfull -build
mvn install -pfull -build -DskipTests
/conf/zoo_sample.cfg
1)心跳包间隔,这里表示每隔2秒发送一次 tickTime = 2000 2)初始化同步,最大允许10次延时,也就是20秒内同步成功就认为成功 initLimit = 5 3)更新同步时,最大允许5次延时,也就是10秒内同步成功就成功 syncLimit = 5 4)快照或持久化数据存储路径 dataDir = /tmp/zookeeper 5)zk监听端口 clientPort = 21814.zk的工作机制
1)数据存储数据存储 + 监听通知机制
采用类似unix文件系统形式以及KV方式进行存储(节点配合树的方式)
- 补充
2)监听通知节点就是key指,value就是存储的节点的数据部分
- 定义
zk通过存储和管理关键数据,接收分布式系统中的节点来注册监听关键数据的变化,当关键数据发生变化,zk将会通知哪些注册了数据变化的节点
- 简单举例
node1注册集群,记录节点信息gate1
node2注册集群,记录节点信息gate2
node3监听集群信息,当node2宕机时,获得信息做出相应处理
- 节点类型
1)持久节点(定时写到快照中持久化) 该数据节点被创建后,就会一直存在zk上,知道删除操作主动清除这个节点 2)持久顺序节点(自动帮我们命名) 每个父结点都为它的第一级子结点维护一份顺序,用于记录每隔子节点创建的先后顺序(为给定节点名加上一个数字的后缀,这个数字后缀最大值为整型的最大值) 3)临时节点 临时节点的生命周期和客户端的会话绑定在一起,如果客户端失效,这个节点会被自动清除;临时节点不能创建子结点,只能作为叶子节点; 4)临时顺序节点 在临时节点的基础上添加了顺序特性
- 启动单点模式的zk
/bin/zkServer.sh start-foreground zoo_single.cfg //启动服务器
- 查看zk是否启动
ps axu|grep java
- 创建客户端
cd /bin
./zkCli.sh 默认端口为2181 //启动客户端
create /zhilin 创建zhilin节点(默认创建的持久化节点)
create /linke hello 创建linke节点(hello就是存储的内容)
ls / -R查看节点信息
quit退出
- 创建节点的命令
create -se /zhilin/ 创建顺序和短暂的节点,s表示顺序,e表示短暂的节点
-s /linke/ 创建持久的节点
- 节点的stat属性
- redis和zk同步的比较
5.zk原理(只实现了一部分paxos的功能) 1)原子广播协议-z(zk)-A(atomic)-B(broadcast)①redis是用环形队列同步,zk是用的先进先出的队列(zk用的事务请求)
②redis:replic_id+offset = run_id在环形缓冲区中,那么只做增量同步;zxid在队列里面那么只做增量同步,zxid(64位) = ephoch(32位,leader的序号,从1开始,宕机后重新选举就+1) +(拼接起来) 事务id(32位,从1开始,每次操作+1)
③redis的master和slaver,zk里面是leader和fllower
- 概念图
- 作用:
用来解决事务性,如果连接follwer节点去写事务的时候,事务先转发给leader,leader生成一个提案proposal并为提案生成全局的zxid,然后再原子广播给follwer和observe,follower收到了广播内容,记录事务日志后再向leader发送ack确认包收到了,leader收到过半follower的ack包之后,就commit提交再广播一次,第二次广播follower才会将数据写到内存或在内存中修改
- 备注:
2)角色介绍(leader和follower)①一般都是奇数个follower节点,zk选举有点类似paxos,都是过半生效,且允许n/2个节点宕机(5对应2个节点宕机)
- leader
事务请求的唯一调度和处理者,保证集群事务处理的顺序性;集群内部各节点的调度者
- follower
处理客户端非事务请求,转发事务请求(写事务,需要leader广播)给leader服务器;参与事务请求proposal(提案)投票;参与leader选举投票
- observer
3)数据存储只提供非事务服务,通常用于在不影响集群处理能力的前提下提升集群的非事务处理能力(降低访问压力的节点)
4)redis和zk更新同步的流程数据存储包括内存数据存储和磁盘数据存储;磁盘数据包括事务日志和快照数据
- 初始化同步流程
- zk更新同步流程(节点是健康的状态,主要解决节点数据一致性的问题)(FIFO队列去实现异步解耦)
补充解释:
1.分布式节点说的follower下面的node1到node8的任意一个节点发起事务请求
2.zxid(事务id)从1开始累加到64位整数的最大值,由ephochID和事务id组成,epoochID是选举的次数;
(learner:follower和observer合起来叫learner)
3.在更新同步的过程中,zk集群不能对外提供事务服务(可用性不强,但是强一致性),所以zk不能存储特别多的数据,因为可用性比较差
- redis同步的流程
5)zk初始化同步流程原理:redis使用环形队列,
1.异步解耦
2.salve记录环形队列的offset,从而实现增量同步
1)启动选举leader(zk怎么选leader?),然后进行数据同步初始化同步包括两种情况:1.启动选举leader,2.leader崩溃
①投票选举出leader(投票也需要follower过半策略),根据两个要素来决定谁是leader
1)事务在leader上提交了,过半的leader都响应ack了,但是在commit消息出发前挂了
2)事务在leader生成后,leader就挂了
1)选举出来的leader不能包含未提交提案
2)新选举的leader中含有最大zxid
为什么需要回滚?
有些数据在follower写入事务日志,但是没写入内存,三个阶段(提案、ack、commit提交)都有可能leader宕机了
1)差异化同步:增量同步
2)回滚再差异化同步
3)回滚同步
4)全量同步
- 创建两个线程,io线程负责⽹络事件的读写(得知命令是做什么的),completion线程处理异步回调函数和watcher调(这个线程去处理对应的命令)
⽤; - io线程将数据序列化通过⽹络发送到zk节点;
- zk节点返回数据,通过io线程反序列化,通过pipe异步发送给completion线程处理;
- zk应用
- 统⼀命名服务
- 统⼀配置管理
- 分布式锁
- 分布式队列
- 负载均衡
- zk总结
8.zk接口(根据java改编)①可用性低,但是强一致性,不利于存储大量的数据
②更新分为初始化同步和更新同步
#include#include #include #include #include #include "include/zookeeper.h" #include "include/zookeeper_log.h" typedef struct zk_mutex_s { int seq_me; // 序号 int seq_watch; // 监听的序号 char watch[64]; // 监听的节点名 } zk_mutex_t; static zk_mutex_t zmt = {0}; static int quit = 0; void zk_delete_lock_res1_children(int rc, const void *data) { printf("zk_delete_lock_res1_children rc = %dn", rc); quit = 1; } void zk_watch_children(int rc, const struct Stat *stat, const void *data) { printf("zk_watch_children_disappear rc = %dn", rc); } void zk_get_children_lock_res1(int rc, const struct String_vector *strings, const void *data) { printf("zk_get_children_lock_res1 rc = %dn", rc); if (rc == 0) { int i,v,n; n = 0; char * value_self = NULL; char * value_other = NULL; zhandle_t* zk_hdl = (zhandle_t*)data; for (i = 0; i < strings->count; i++) { sscanf(strings->data[i], "%d", &v); if (v < zmt.seq_me) { if (n == 0) { zmt.seq_watch = v; value_other = strings->data[i]; n++; } else if (n > 0 && v > zmt.seq_watch) { zmt.seq_watch = v; value_other = strings->data[i]; n++; } } else if (v == zmt.seq_me) { value_self = strings->data[i]; } } if (n == 0) { // 说明自己已经最小的了 char temp[64] = {0}; sprintf(temp, "/lock/res1/%s", value_self); printf("%s 获取锁, 获取执行权, 释放锁n", temp); zoo_adelete(zk_hdl, temp, -1, zk_delete_lock_res1_children, zk_hdl); } else if (n > 0) { // 找到可以监听的对象 memset(zmt.watch, 0, 64*sizeof(char)); sprintf(zmt.watch, "/lock/res1/%s", value_other); zoo_aexists(zk_hdl, zmt.watch, 1, zk_watch_children, zk_hdl); } } } void zk_create_lock_res1_es(int rc, const char *value, const void *data) { printf("zk_create_lock_res1_es rc = %dn", rc); if (rc == 0) { zhandle_t* zk_hdl = (zhandle_t*)data; int seq; sscanf(value, "/lock/res1/%d", &seq); printf("tname = %s seq = %dn", value, seq); zmt.seq_me = seq; if (seq > 0) { int ret; ret = zoo_aget_children(zk_hdl, "/lock/res1", 0, zk_get_children_lock_res1, data); if (ret) { printf("error: zk_create_lock_res1_es:zoo_aget_childrenn"); exit(EXIT_FAILURE); } return; } printf("%s 获取锁, 获取执行权, 释放锁n", value); zoo_adelete(zk_hdl, value, -1, zk_delete_lock_res1_children, ""); } } void zk_watcher_disconnect(zhandle_t *zh, int type, int state, const char *path, void *ctx) { if (0 == strcmp(zmt.watch, path)) { int ret = zoo_aget_children(zh, "/lock/res1", 0, zk_get_children_lock_res1, zh); if (ret) { printf("error: zk_watcher_disconnect:zoo_aget_childrenn"); exit(EXIT_FAILURE); } } } int main(int argc, const char *argv[]) { zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); zoo_set_log_stream(stdout); zhandle_t* zk_hdl = zookeeper_init("127.0.0.1:2181", zk_watcher_disconnect, 30000, 0, "zookeeper for distribute mutex.", 0); if (!zk_hdl) { printf("error: connecting to zookeeper server...n"); exit(EXIT_FAILURE); } int ret; ret = zoo_acreate(zk_hdl, "/lock/res1/", "mark", 5, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL_SEQUENTIAL, zk_create_lock_res1_es, zk_hdl); if (ret) { printf("error: create /lock/res1 EPHEMERAL SEQUENTIALn"); exit(EXIT_FAILURE); } ret = 0; for (;;) { if (quit) break; usleep(2500); } zookeeper_close(zk_hdl); return 0; }



