- 2021SC@SDUSC
- 类图
- ServerCnxnFactory源码分析
- 总结
服务端为每一个客户端连接维护了一个ServerCnxn来进行网络IO操作。而ZooKeeper中使用ServerCnxnFactory来管理与维护客户端的连接。
类图 ServerCnxnFactory源码分析(1)ServerCnxnFactory的属性,主要说明ServerCnxn管理器的信息。
//表明此管理器为ServerCnxnFactory
public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
//管理的最大连接数量
private static final String ZOOKEEPER_MAX_CONNECTION = "zookeeper.maxCnxns";
//当不存在连接时连接数量为0
public static final int ZOOKEEPER_MAX_CONNECTION_DEFAULT = 0;
//获取日志
private static final Logger LOG = LoggerFactory.getLogger(ServerCnxnFactory.class);
//说明此管理器是否启用了SSL
protected boolean secure;
static final ByteBuffer closeConn = ByteBuffer.allocate(0);
// ZooKeeper服务器接受的连接总数
protected int maxCnxns;
//SaslServer调用返回处理
protected SaslServerCallbackHandler saslServerCallbackHandler;
//登录状态
public Login login;
//zk服务器
protected ZooKeeperServer zkServer;
//登录的用户
private static String loginUser = Login.SYSTEM_USER;
(2)核心函数——抽象方法,具体让NIO和Netty实现。
//获取本地接口
public abstract int getLocalPort();
//获取连接
public abstract Iterable getConnections();
//管理器配置
public abstract void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException;
//重配置
public abstract void reconfigure(InetSocketAddress addr);
public abstract int getMaxClientCnxnsPerHost();
public abstract void setMaxClientCnxnsPerHost(int max);
//这种方法是为了保持启动(zks)的兼容性并实现zks的共享
//当我们添加secureCnxnFactory时。
public abstract void startup(ZooKeeperServer zkServer, boolean startServer) throws IOException, InterruptedException;
public abstract int getSocketListenBacklog();
//休眠
public abstract void join() throws InterruptedException;
//关机
public abstract void shutdown();
//开机
public abstract void start();
//释放所有资源
public abstract void closeAll(ServerCnxn.DisconnectReason reason);
//通过不同参数,有四种创建管理器的方法
public static ServerCnxnFactory createFactory(int clientPort, int maxClientCnxns) throws IOException {
return createFactory(new InetSocketAddress(clientPort), maxClientCnxns, -1);
}
public static ServerCnxnFactory createFactory(int clientPort, int maxClientCnxns, int backlog) throws IOException {
return createFactory(new InetSocketAddress(clientPort), maxClientCnxns, backlog);
}
public static ServerCnxnFactory createFactory(InetSocketAddress addr, int maxClientCnxns) throws IOException {
return createFactory(addr, maxClientCnxns, -1);
}
public static ServerCnxnFactory createFactory(InetSocketAddress addr, int maxClientCnxns, int backlog) throws IOException {
ServerCnxnFactory factory = createFactory();
factory.configure(addr, maxClientCnxns, backlog);
return factory;
}
//获取本地地址
public abstract InetSocketAddress getLocalAddress();
//重置所有连接数据
public abstract void resetAllConnectionStats();
//获取所有连接信息
public abstract Iterable
(3)核心函数——具体方法,确保管理器正常获取服务端的数据以及自身的服务内容。
//添加会话
public void addSession(long sessionId, ServerCnxn cnxn) {
sessionMap.put(sessionId, cnxn);
}
//添加会话
public void addSession(long sessionId, ServerCnxn cnxn) {
sessionMap.put(sessionId, cnxn);
}
//移除从会话映射的连接
public void removeCnxnFromSessionMap(ServerCnxn cnxn) {
long sessionId = cnxn.getSessionId();
if (sessionId != 0) {
sessionMap.remove(sessionId);
}
}
//关闭会话
public boolean closeSession(long sessionId, ServerCnxn.DisconnectReason reason) {
ServerCnxn cnxn = sessionMap.remove(sessionId);
if (cnxn != null) {
try {
cnxn.close(reason);
} catch (Exception e) {
LOG.warn("exception during session close", e);
}
return true;
}
return false;
}
//获取活跃的连接数量
public int getNumAliveConnections() {
return cnxns.size();
}
//获取服务器
public final ZooKeeperServer getZooKeeperServer() {
return zkServer;
}
//配置
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
configure(addr, maxcc, -1);
}
public void configure(InetSocketAddress addr, int maxcc, int backlog) throws IOException {
configure(addr, maxcc, backlog, false);
}
//判断是否启用SSL
public boolean isSecure() {
return secure;
}
//为服务器启动SSL
public final void setZooKeeperServer(ZooKeeperServer zks) {
this.zkServer = zks;
if (zks != null) {
if (secure) {
zks.setSecureServerCnxnFactory(this);
} else {
zks.setServerCnxnFactory(this);
}
}
}
//创建管理器
public static ServerCnxnFactory createFactory() throws IOException {
String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.getDeclaredConstructor()
.newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, e);
throw ioe;
}
}
//用户无注册的连接
public void unregisterConnection(ServerCnxn serverCnxn) {
ConnectionBean jmxConnectionBean = connectionBeans.remove(serverCnxn);
if (jmxConnectionBean != null) {
MBeanRegistry.getInstance().unregister(jmxConnectionBean);
}
}
//用户有注册的连接
public void registerConnection(ServerCnxn serverCnxn) {
if (zkServer != null) {
ConnectionBean jmxConnectionBean = new ConnectionBean(serverCnxn, zkServer);
try {
MBeanRegistry.getInstance().register(jmxConnectionBean, zkServer.jmxServerBean);
connectionBeans.put(serverCnxn, jmxConnectionBean);
} catch (JMException e) {
LOG.warn("Could not register connection", e);
}
}
}
protected void configureSaslLogin() throws IOException {
String serverSection = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY, ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);
//此处的“配置”指的是javax.security.auth.login.Configuration。
AppConfigurationEntry[] entries = null;
SecurityException securityException = null;
try {
entries = Configuration.getConfiguration().getAppConfigurationEntry(serverSection);
} catch (SecurityException e) {
//如果用户不打算使用JAAS身份验证,可能是无害的。
securityException = e;
}
// jaas.conf中没有条目
// 如果在获取jaas文件和用户通过指定LOGIN_CONTEXT_NAME_KEY或jaas文件要求sasl
// 那么抛出一个异常,否则将在没有身份验证的情况下继续。
if (entries == null) {
String jaasFile = System.getProperty(Environment.JAAS_CONF_KEY);
String loginContextName = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY);
if (securityException != null && (loginContextName != null || jaasFile != null)) {
String errorMessage = "No JAAS configuration section named '" + serverSection + "' was found";
if (jaasFile != null) {
errorMessage += " in '" + jaasFile + "'.";
}
if (loginContextName != null) {
errorMessage += " But " + ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY + " was set.";
}
LOG.error(errorMessage);
throw new IOException(errorMessage);
}
return;
}
//jaas.conf条目可用
try {
saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration());
login = new Login(serverSection, saslServerCallbackHandler, new ZKConfig());
setLoginUser(login.getUserName());
login.startThreadIfNeeded();
} catch (LoginException e) {
throw new IOException("Could not configure server because SASL configuration did not allow the "
+ " ZooKeeper server to authenticate itself properly: "
+ e);
}
}
//创建此方法是为了避免 ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD 找到bug问题
private static void setLoginUser(String name) {
loginUser = name;
}
public static String getUserName() {
return loginUser;
}
public int getMaxCnxns() {
return maxCnxns;
}
//初始化最大连接数
protected void initMaxCnxns() {
maxCnxns = Integer.getInteger(ZOOKEEPER_MAX_CONNECTION, ZOOKEEPER_MAX_CONNECTION_DEFAULT);
if (maxCnxns < 0) {
maxCnxns = ZOOKEEPER_MAX_CONNECTION_DEFAULT;
LOG.warn("maxCnxns should be greater than or equal to 0, using default vlaue {}.",
ZOOKEEPER_MAX_CONNECTION_DEFAULT);
} else if (maxCnxns == ZOOKEEPER_MAX_CONNECTION_DEFAULT) {
LOG.warn("maxCnxns is not configured, using default value {}.",
ZOOKEEPER_MAX_CONNECTION_DEFAULT);
} else {
LOG.info("maxCnxns configured value is {}.", maxCnxns);
}
}
protected boolean limitTotalNumberOfCnxns() {
if (maxCnxns <= 0) {
// maxCnxns limit is disabled
return false;
}
int cnxns = getNumAliveConnections();
if (cnxns >= maxCnxns) {
LOG.error("Too many connections " + cnxns + " - max is " + maxCnxns);
return true;
}
return false;
}
总结
Zookeeper作为一个服务器,自然要与客户端进行网络通信,如何高效的与客户端进行通信,让网络IO不成为ZooKeeper的瓶颈是ZooKeeper急需解决的问题。
ZooKeeper中使用ServerCnxnFactory管理与客户端的连接,其有两个实现,一个是NIOServerCnxnFactory,使用Java原生NIO实现;一个是NettyServerCnxnFactory,使用netty实现。



