- zookeeper Java客户端
- 项目构建
- 创建客户端实例
- 测试结果
- 创建zookeeper实例参数说明
- 演示java客户端增删查改zookeeper节点
- Apache Curator 开源客户端
- 什么是 Curator
- Curator 实战
- 会话创建
- 创建节点
- 异步接口
- zookeeper集群&不停机动态扩容/缩容
- zookeeper集群搭建
zookeeper 官方的客户端没有和服务端代码分离,他们为同一个jar 文件,所以我们直接引入 zookeeper的maven即可, 这里版本请保持与服务端版本一致,不然会有很多兼容性的问题
创建客户端实例org.apache.zookeeper zookeeper 3.5.8
- 定义要发送的包装类
package zookeeper.client;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@ToString
@NoArgsConstructor
public class MyConfig {
private String key;
private String name;
}
- 定义zookeeper客户端
package zookeeper.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ConfigCenter {
private final static String CONNECT_STR="192.168.1.104:2181";
private final static Integer SESSION_TIMEOUT=30*1000;
private static ZooKeeper zooKeeper=null;
private static CountDownLatch countDownLatch=new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zooKeeper=new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType()== Event.EventType.None
&& event.getState() == Event.KeeperState.SyncConnected){
log.info("连接已建立");
countDownLatch.countDown();
}
}
});
countDownLatch.await();
MyConfig myConfig = new MyConfig();
myConfig.setKey("anykey");
myConfig.setName("anyName");
ObjectMapper objectMapper=new ObjectMapper();
byte[] bytes = objectMapper.writevalueAsBytes(myConfig);
String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Watcher watcher = new Watcher() {
@SneakyThrows
@Override
public void process(WatchedEvent event) {
if (event.getType()== Event.EventType.NodeDataChanged
&& event.getPath()!=null && event.getPath().equals("/myconfig")){
log.info(" PATH:{} 发生了数据变化" ,event.getPath());
byte[] data = zooKeeper.getData("/myconfig", this, null);
MyConfig newConfig = objectMapper.readValue(new String(data), MyConfig.class);
log.info("数据发生变化: {}",newConfig);
}
}
};
byte[] data = zooKeeper.getData("/myconfig", watcher, null);
MyConfig originalMyConfig = objectMapper.readValue(new String(data), MyConfig.class);
log.info("原始数据: {}", originalMyConfig);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}
- log4j.properties配置类
# Copyright 2012 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR ConDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Define some default values that can be overridden by system properties
zookeeper.root.logger=ERROR, ConSOLE
zookeeper.console.threshold=INFO
zookeeper.log.dir=.
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=INFO
zookeeper.log.maxfilesize=256MB
zookeeper.log.maxbackupindex=20
zookeeper.tracelog.dir=${zookeeper.log.dir}
zookeeper.tracelog.file=zookeeper_trace.log
log4j.rootLogger=${zookeeper.root.logger}
#
# console
# Add "console" to rootlogger above if you want to use this
#
log4j.appender.ConSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
#
# Add ROLLINGFILE to rootLogger to get log file output
#
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}
log4j.appender.ROLLINGFILE.MaxFileSize=${zookeeper.log.maxfilesize}
log4j.appender.ROLLINGFILE.MaxBackupIndex=${zookeeper.log.maxbackupindex}
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
#
# Add TRACEFILE to rootLogger to get log file output
# Log TRACE level and above messages to a log file
#
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
log4j.appender.TRACEFILE.Threshold=TRACE
log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}
log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
### Notice we are including log4j's NDC here (%x)
log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n
log4j.logger.zookeeper.client=INFO
测试结果
| 参数名称 | 含义 |
|---|---|
| connectString | ZooKeeper服务器列表,由英文逗号分开的host:port字符串组成, 每一个都代表一台ZooKeeper机器,如, host1:port1,host2:port2,host3:port3。另外,也可以在connectString中设置客户端连接上ZooKeeper 后的根目录,方法是在host:port字符串之后添加上这个根目录,例 如,host1:port1,host2:port2,host3:port3/zk-base,这样就指定了该客户端连 接上ZooKeeper服务器之后,所有对ZooKeeper 的操作,都会基于这个根目录。例如,客户端对/sub-node 的操作,最终创建 /zk-node/sub-node, 这个目录也叫Chroot,即客户端隔离命名空间。 |
| sessionTimeout | 会话的超时时间,是一个以“毫秒”为单位的整型值。在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务器之间会通过心跳 检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效 的心跳检测,会话就会失效。 |
| watcher | ZooKeeper允许 客户端在构造方法中传入一个接口 watcher (org.apache. zookeeper. Watcher)的实现类对象来作为默认的Watcher事件通知处理器。当然,该参 数可以设置为null 以表明不需要设置默认的 Watcher处理器。 |
| canBeReadOnly | 这是一个boolean类型的参数,用于标识当前会话是否支持“read-only(只 读)”模式。默认情况下,在ZooKeeper集群中,一个机器如果和集群中过半及以上机器失去了网络连接,那么这个机器将不再处理客户端请求(包括读写请求)。但是在某些使用场景下,当ZooKeeper服务器发生此类故障的时候,我 们还是希望ZooKeeper服务器能够提供读服务(当然写服务肯定无法提供)—— 这就是 ZooKeeper的“read-only”模式。 |
| sessionId和 sessionPasswd | 分别代表会话ID和会话秘钥。这两个参数能够唯一确定一个会话,同时客户端使用这两个参数可以实现客户端会话复用,从而达到恢复会话的效果。具体使用方法是,第一次连接上ZooKeeper服务器时,通过调用ZooKeeper对象实例的以下两个接口,即可获得当前会话的ID和秘钥: long getSessionId(); byte[]getSessionPasswd( ); 荻取到这两个参数值之后,就可以在下次创建ZooKeeper对象实例的时候传入构造方法了 |
package zookeeper.client;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class Standalonebase {
private static final String CONNECT_STR = "192.168.1.104:2181";
private static final int SESSION_TIMEOUT = 30 * 1000;
private static ZooKeeper zooKeeper = null;
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected
&& event.getType() == Event.EventType.None) {
countDownLatch.countDown();
log.info("连接建立");
}
}
};
@Before
public void init() {
try {
log.info(" start to connect to zookeeper server: {}", getConnectStr());
zooKeeper = new ZooKeeper(getConnectStr(), getSessionTimeout(), watcher);
log.info(" 连接中...");
countDownLatch.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static ZooKeeper getZooKeeper() {
return zooKeeper;
}
@After
public void test() {
try {
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
protected String getConnectStr() {
return CONNECT_STR;
}
protected int getSessionTimeout() {
return SESSION_TIMEOUT;
}
}
package zookeeper.client;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.Test;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class AclOperations extends Standalonebase {
@Test
public void createWithAclTest1() throws KeeperException, InterruptedException {
List acLList = new ArrayList();
ACL e = new ACL();
Id m_ = new Id();
m_.setId("anyone");
m_.setScheme("world");
int perms = ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ;
e.setId(m_);
e.setPerms(perms);
acLList.add(e);
String s = getZooKeeper().create("/zk-node-1", "shikaiqiang".getBytes(), acLList, CreateMode.PERSISTENT);
log.info("create path: {}",s);
}
@Test
public void createWithAclTest2() throws KeeperException, InterruptedException {
// 对连接添加授权信息
getZooKeeper().addAuthInfo("digest","u400:p400".getBytes());
List acLList = new ArrayList();
ACL e = new ACL();
Id m_ = new Id();
m_.setId("u400:p400");
m_.setScheme("auth");
int perms = ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ;
e.setId(m_);
e.setPerms(perms);
acLList.add(e);
String s = getZooKeeper().create("/zk-node-2", "shikaiqiang".getBytes(), acLList, CreateMode.PERSISTENT);
log.info("create path: {}",s);
}
@Test
public void createWithAclTest3() throws KeeperException, InterruptedException {
// 对连接添加授权信息
getZooKeeper().addAuthInfo("digest","u400:p400".getBytes());
byte[] data = getZooKeeper().getData("/test", false, null);
log.info("GET_data: {}",new String(data));
}
public static void main(String[] args) throws NoSuchAlgorithmException {
String sId = DigestAuthenticationProvider.generateDigest("skq:123456");
System.out.println(sId);
// -Dzookeeper.DigestAuthenticationProvider.superDigest=gj:X/NSthOB0fD/OT6iilJ55WJVado=
}
}
package zookeeper.client;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
@Slf4j
public class baseOperations extends Standalonebase {
private String first_node = "/first-node";
@Test
public void testCreate() throws KeeperException, InterruptedException {
ZooKeeper zooKeeper = getZooKeeper();
String s = zooKeeper.create(first_node, "first".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
log.info("Create:{}", s);
}
@Test
public void testGetData() {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getPath() != null && event.getPath().equals(first_node)
&& event.getType() == Event.EventType.NodeDataChanged) {
log.info(" PATH: {} 发现变化", first_node);
try {
byte[] data = getZooKeeper().getData(first_node, this, null);
log.info(" data: {}", new String(data));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
try {
byte[] data = getZooKeeper().getData(first_node, watcher, null); //
log.info(" data: {}", new String(data));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void testSetData() throws KeeperException, InterruptedException {
ZooKeeper zooKeeper = getZooKeeper();
Stat stat = new Stat();
byte[] data = zooKeeper.getData(first_node, false, stat);
// int version = stat.getVersion();
zooKeeper.setData(first_node, "third".getBytes(), 0);
}
@Test
public void testDelete() throws KeeperException, InterruptedException {
// -1 代表匹配所有版本,直接删除
// 任意大于 -1 的代表可以指定数据版本删除
getZooKeeper().delete(first_node, -1);
}
@Test
public void asyncTest() {
String userId = "xxx";
getZooKeeper().getData("/test", false, (rc, path, ctx, data, stat) -> {
Thread thread = Thread.currentThread();
log.info(" Thread Name: {}, rc:{}, path:{}, ctx:{}, data:{}, stat:{}", thread.getName(), rc, path, ctx, data, stat);
}, "test");
log.info(" over .");
}
}
Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用,Curator 把我们平时常用的很多 ZooKeeper服务开发功能做了封装,例如 Leader 选举、 分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工 作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper 处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简单,不但减少了开发时间,而且增强了程序的可靠性。
Curator 实战首先要引入Curator 框架相关的开发包,第一个是 curator-framework 包,该包是对 ZooKeeper 底层 API 的一 些封装。另一个是 curator-recipes 包,该包封装了一些 ZooKeeper 服务的高级特性,如: Cache 事件监听、选举、分布式锁、分布式 Barrier。为了方便测试引入 了junit ,lombok,由于Zookeeper本身以来了 log4j 日志框架,所以这里可以创建对应的 log4j配置文件后直接使用。
会话创建org.apache.curator curator-recipes 5.0.0 org.apache.zookeeper zookeeper org.apache.curator curator-x-discovery 5.0.0 org.apache.zookeeper zookeeper org.apache.zookeeper zookeeper 3.5.8 junit junit 4.13 org.projectlombok lombok 1.18.12 com.fasterxml.jackson.core jackson-databind 2.8.3
要进行客户端服务器交互,第一步就要创建会话 Curator 提供了多种方式创建会话,比如用静态工厂方式创建:
// 重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3) Curatorframework client = CuratorframeworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
Curatorframework client = CuratorframeworkFactory.builder()
.connectString("192.168.1.104:2181")
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(5000) // 连接超时时间
.retryPolicy(retryPolicy)
.namespace("base") // 包含隔离名称
.build();
client.start();
这段代码的编码风格采用了流式方式,最核心的类是 Curatorframework 类,该类的作用是定 义一个 ZooKeeper 客户端对象,并在之后的上下文中使用。在定义 Curatorframework 对象 实例的时候,我们使用了 CuratorframeworkFactory 工厂方法,并指定了 connectionString 服务器地址列表、retryPolicy 重试策略 、sessionTimeoutMs 会话超时时间、 connectionTimeoutMs 会话创建超时时间。
connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以 是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3 。
retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
| 策略名称 | 描述 |
|---|---|
| ExponentialBackoffRetry | 重试一组次数,重试之间的睡眠时间增加 |
| RetryNTimes | 重试最大次数 |
| RetryOneTime | 只重试一次 |
| RetryUntilElapsed | 在给定的时间结束之前重试 |
超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。
创建节点在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型 (持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息
guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。
package zookeeper.curator;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class CuratorStandalonebase {
private static final String CONNECT_STR = "192.168.1.104:2181";
private static final int sessionTimeoutMs = 60 * 1000;
private static final int connectionTimeoutMs = 5000;
private static Curatorframework curatorframework;
@Before
public void init() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);
curatorframework = CuratorframeworkFactory.builder().connectString(getConnectStr())
.retryPolicy(retryPolicy)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.canBeReadOnly(true)
.build();
curatorframework.getConnectionStateListenable().addListener((client, newState) -> {
if (newState == ConnectionState.CONNECTED) {
log.info("连接成功!");
}
});
log.info("连接中......");
curatorframework.start();
}
public void createIfNeed(String path) throws Exception {
Stat stat = curatorframework.checkExists().forPath(path);
if (stat == null) {
String s = curatorframework.create().forPath(path);
log.info("path {} created! ", s);
}
}
public static Curatorframework getCuratorframework() {
return curatorframework;
}
@After
public void test() {
try {
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
protected String getConnectStr() {
return CONNECT_STR;
}
}
创建节点的方式如下面的代码所示,回顾我们之前课程中讲到的内容,描述一个节点要包括节点 的类型,即临时节点还是持久节点、节点的数据信息、节点是否是有序节点等属性和性质。
package zookeeper.curator;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.Curatorframework;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CuratorbaseOperations extends CuratorStandalonebase {
// 递归创建子节点
@Test
public void testCreateWithParent() throws Exception {
Curatorframework curatorframework = getCuratorframework();
String pathWithParent = "/node-parent/sub-node-1";
String path = curatorframework.create().creatingParentsIfNeeded().forPath(pathWithParent);
log.info("curator create node :{} successfully.", path);
}
// protection 模式,防止由于异常原因,导致僵尸节点
@Test
public void testCreate() throws Exception {
Curatorframework curatorframework = getCuratorframework();
String forPath = curatorframework
.create()
.withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).
forPath("/curator-node", "some-data".getBytes());
log.info("curator create node :{} successfully.", forPath);
}
@Test
public void testGetData() throws Exception {
Curatorframework curatorframework = getCuratorframework();
byte[] bytes = curatorframework.getData().forPath("/test");
log.info("get data from node :{} successfully.", new String(bytes));
}
@Test
public void testSetData() throws Exception {
Curatorframework curatorframework = getCuratorframework();
curatorframework.setData().forPath("/test", "changed!".getBytes());
byte[] bytes = curatorframework.getData().forPath("/test");
log.info("get data from node /curator-node :{} successfully.", new String(bytes));
}
@Test
public void testDelete() throws Exception {
Curatorframework curatorframework = getCuratorframework();
String pathWithParent = "/node-parent";
curatorframework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}
@Test
public void testListChildren() throws Exception {
Curatorframework curatorframework = getCuratorframework();
String pathWithParent = "/node-parent";
List strings = curatorframework.getChildren().forPath(pathWithParent);
strings.forEach(System.out::println);
}
}
Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是 在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。
public interface BackgroundCallback
{
public void processResult(Curatorframework client, CuratorEvent event) throws Exception;
}
如上接口,主要参数为client 客户端 和服务端事件event
inBackground 异步处理默认在EventThread中执行,指定线程池
@Test
public void testThreadPool() throws Exception {
Curatorframework curatorframework = getCuratorframework();
ExecutorService executorService = Executors.newSingleThreadExecutor();
String ZK_NODE = "/zk-node";
curatorframework.getData().inBackground((client, event) -> {
log.info(" background: {}", event);
}, executorService).forPath(ZK_NODE);
}
zookeeper集群&不停机动态扩容/缩容
zookeeper集群模式一共有三种类型的角色
- Leader: 处理所有的事务请求(写请求),可以处理读请求,集群中只能有一个Leader
- Follower:只能处理读请求,同时作为 Leader的候选节点,即如果Leader宕机,Follower要参与到新的Leader选举中,有可能成为新的Leader节点。
- Observer:只能处理读请求。不能参与选举
配置zoo1.cfg文件
分别配置zoo1.cfg,zoo2.cfg,zoo3.cfg,zoo4.cfg
启动4个zookeeper实例
连接实例
bin/zkCli.sh -server 192.168.1.104:2181,192.168.1.104:2182,192.168.1.104:2183,192.168.1.104:2184
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy=new ExponentialBackoffRetry( 5*1000, 10 );
String connectStr = "192.168.1.104:2181,192.168.1.104:2182,192.168.1.104:2183,192.168.1.104:2184";
Curatorframework curatorframework = CuratorframeworkFactory.newClient(connectStr, retryPolicy);
curatorframework.start();
String pathWithParent = "/test";
byte[] bytes = curatorframework.getData().forPath(pathWithParent);
System.out.println(new String(bytes));
while (true) {
try {
byte[] bytes2 = curatorframework.getData().forPath(pathWithParent);
System.out.println(new String(bytes2));
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
}
}



