栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

深夜读码-zookeeper 之SyncRequestProcessor 小代码 大优雅

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

深夜读码-zookeeper 之SyncRequestProcessor 小代码 大优雅

引言

zookeeper 的业务处理流程就像工作流一样,其实就是一个单链表;在zookeeper启动的时候,会确立各个节点的角色特性,即leader、follower和observer,每个角色确立后,就会初始化它的工作责任链;


本篇要分享的是 zookeeper的源码分析之SyncRequestProcessor处理器,其目的是进行持久化,也就是将消息存储到磁盘文件中;代码不多,但有不少值得借鉴的地方;

主要成员变量
代码一

private final linkedBlockingQueue queuedRequests =
 new linkedBlockingQueue();
private final linkedList toFlush = new linkedList();
private final RequestProcessor nextProcessor;
private static int snapCount = ZooKeeperServer.getSnapCount();
private static int randRoll;
queuedRequests:

在zookeeper中各个工作责任链之间进行消息通信的是通过linkedBlockingQueue 来进行线程间信息交互的;queuedRequests就是SyncRequestProcessor和上一责任链之间进行消息交互的队列;

toFlush :

待flush到磁盘的事务日志消息容器,包括增、删、改消息,查询类消息不进入该容器;

snapCount:

生成snapshot的事务记录参数值,可在zoo.cfg中进行配置,即事务日志记录数大于等于snapCount(其具体算法在下面进行探讨,这里先这样记录)的时候,进行snapshot文件的生成;

randRoll:

生成snapshot的随机值,和snapcount配合使用;

业务处理

由于SyncRequestProcessor是继承自ZooKeeperThread,所以它的主要逻辑是在run函数中,直接进入run函数中;

代码二

setRandRoll(r.nextInt(snapCount/2));
while (true) {
    Request si = null;
    if (toFlush.isEmpty()) {
 si = queuedRequests.take();
    } else {
 si = queuedRequests.poll();
 if (si == null) {
     flush(toFlush);
     continue;
 }
    }

zookeeper没有直接采用queuedRequests.take()进行消息接收,而是采用了两种方式take()和poll();take函数会等待直至消息的到来;而poll()则是如果没有消息,就会立即返回null;

zookeeper为什么这样设计,先抛个问题,我们先看下面的逻辑,然后再回答这个问题;
代码三

if (si != null)
	if (LOG.isDebugEnabled()) {
 LOG.debug("{}",si);
 LOG.debug("toFlush .size = "+ toFlush.size());
    }
	
    // track the number of records written to the log
    if (zks.getZKDatabase().append(si)) { 
 logCount++;
 if (logCount > (snapCount / 2 + randRoll)) {
     setRandRoll(r.nextInt(snapCount/2));
     // roll the log
     zks.getZKDatabase().rollLog();
     // take a snapshot
     if (snapInProcess != null && snapInProcess.isAlive()) {
  LOG.warn("Too busy to snap, skipping");
     } else {
  snapInProcess = new ZooKeeperThread("Snapshot Thread") {
   public void run() {
try {
    zks.takeSnapshot();
} catch(Exception e) {
    LOG.warn("Unexpected exception", e);
}
   }
      };
  snapInProcess.start();
     }
     logCount = 0;
 }
    } else if (toFlush.isEmpty()) {
 // optimization for read heavy workloads
 // iff this is a read, and there are no pending
 // flushes (writes), then just pass this to the next
 // processor
 if (nextProcessor != null) {
     nextProcessor.processRequest(si);
     if (nextProcessor instanceof Flushable) {
  ((Flushable)nextProcessor).flush();
     }
 }
 continue;
    }
    toFlush.add(si);
    if (toFlush.size() > 1000) {
 flush(toFlush);
    }
}   
zookeeper的两种持久化方式,一种是进行增量事务日志,一种是snapshot文件;增量事务日志就是将所有的事务操作记录下来;而snapshot文件就是把内存中的数据进行全量备份下来;

SyncRequestProcessor 先调用了zks.getZKDatabase().append(si),该函数是将事务日志
记录下来,如果是事务类消息,即增删改,则返回true;如果是查询类消息,就返回false;当返回true的时候,即记录事物日志,这时候做了一个判断 if (logCount > (snapCount / 2 + randRoll)) ;SyncRequestProcessor 并没有直接进行logCount 和snapCount 的判断 ,即logCount >snapCount ;而是生成了一个随机数,其目的主要是考虑到在zookeeper集群中,各个节点的内存数据在某一时刻是基本一致的,如果都是进行logCount >snapCount ,就生成snapshot,势必导致zookeeper集群中各个节点在某一时刻,都会去进行snapshot,因为磁盘io操作总是相对较慢的,所以会导致节点都忙于刷磁盘文件了,系统负载会增加上去,那么对外的服务就会受到影响;所以这里采用logCount > (snapCount / 2 + randRoll)一个随机数和logCount的比较,是一种全局观,有一定的规划思想在里面;

那么当zks.getZKDatabase().append(si)返回为false的时候,则判断了toFlush.isEmpty(),其实这也就是非事务消息的逻辑,当该消息是非事务消息,即查询类消息时候,则直接进行nextProcessor的处理,处理完就进行continue;

只有事务消息才会进入toFlush,也就是toFlush.add(si)的逻辑;后续有一个flush函数,我们来看flush的函数都做了什么;

private void flush(linkedList toFlush)
 throws IOException, RequestProcessorException
    {
 if (toFlush.isEmpty())
     return;

 zks.getZKDatabase().commit();
 while (!toFlush.isEmpty()) {
     Request i = toFlush.remove();
     if (nextProcessor != null) {
  nextProcessor.processRequest(i);
     }
 }
 if (nextProcessor != null && nextProcessor instanceof Flushable) {
     ((Flushable)nextProcessor).flush();
 }
    }

这段代码的主要功能
1、zks.getZKDatabase().commit(); 消息进行刷盘至磁盘文件中;
2、将消息递交下一处理链;

现在返回到上一个问题,从上一个处理链中为什么要采用两种方式take()和poll()获取消息呢;

其实也是考虑到服务的负载问题,

1 queuedRequests队列没有消息,而toFlush里也没有消息,直接采用take函数获取,这时候说明应用的负载轻,线程处于阻塞等待,也就是直接将该线程挂起,减轻负载; 2 queuedRequests队列没有消息,而toFlush里面有消息,这说明当前系统已经空闲了,那么要把之前还没有返回给客户端的消息处理完,那么就是调用flush函数; 3 queuedRequests队列一直有消息,toFlush里面也有消息,这说明当前系统负载过重,但是还是需要在一定量的时候(toFlush.size() > 1000)将消息响应返回给客户端,同时把收到的事务日志消息进行flush,避免写日志没刷新到磁盘中;

好了,看到这里,或许大家已经明白了,希望大家有所收获,欢迎大家一块探讨zookeeper的问题,或者 zookeeper的源码分析!

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号