栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

全站最硬核 百万字强肝RocketMq源码 火热更新中~(一)

全站最硬核 百万字强肝RocketMq源码 火热更新中~(一)

文章目录

总起namesrv

org.apache.rocketmq.namesrv.kvconfig.KVConfigManagerorg.apache.rocketmq.logging.InternalLoggerFactoryorg.apache.rocketmq.logging.inner.Loggerorg.apache.rocketmq.logging.inner.Logger.DefaultLoggerRepositoryorg.apache.rocketmq.logging.inner.Levelorg.apache.rocketmq.logging.inner.Appender.AppenderPipelineImplorg.apache.rocketmq.logging.inner.Appenderorg.apache.rocketmq.logging.inner.LoggingEventorg.apache.rocketmq.namesrv.NamesrvControllerorg.apache.rocketmq.remoting.RemotingServerorg.apache.rocketmq.remoting.netty.NettyRemotingServerorg.apache.rocketmq.remoting.netty.NettyServerConfigorg.apache.rocketmq.remoting.netty.TlsHelperorg.apache.rocketmq.remoting.netty.ResponseFutureorg.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnceorg.apache.rocketmq.remoting.RPCHookorg.apache.rocketmq.remoting.exception.RemotingTimeoutExceptionorg.apache.rocketmq.remoting.netty.NettyRemotingServer.HandshakeHandlerorg.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandlerorg.apache.rocketmq.remoting.common.Pairorg.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyConnectManageHandlerorg.apache.rocketmq.common.Configurationorg.apache.rocketmq.common.DataVersion

总起

RocketMq消息中间件,真正的源码级学习。从第一个类到最后一个类,一段一段读。
解释下这个结构,我是从按顺序,从第一个类开始读,每读到一个不熟悉的类,就点进去全部读完,再继续。
比如读类A,其中引用了类B,而类B我没有读过,那么我去把类B全部读完,再回到类A继续读。
下面可以看到这种读法了,办法比较笨,就是一行一行硬度,但是我相信
这样做是值得的,也能真正有收获。

namesrv org.apache.rocketmq.namesrv.kvconfig.KVConfigManager

从头开始的第一个类

org.apache.rocketmq.logging.InternalLoggerFactory

代码入口首先是日志工具org.apache.rocketmq.logging.InternalLoggerFactory

InternalLoggerFactory是一个抽象类,有两个实现类:InnerLoggerFactory和Slf4jLoggerFactory

内部有两个重要内容,一个是InternalLoggerFactory,另一个是InternalLogger

还有一个ConcurrentHashMap类型的缓存:loggerFactoryCache key是唯一标识,value是InternalLoggerFactory,也就是这个类本身。

LoggerFactory的获取,优先返回用户设置的loggerType,其次是默认的(slf4j),最后是(inner),如果全部初始化失败则抛出异常。

注意一个细节:InternalLoggerFactory的构造方法非常简洁,抽取成了一个私有方法:

protected void doRegister() {
    String loggerType = getLoggerType();
    if (loggerFactoryCache.get(loggerType) != null) {
        return;
    }
    loggerFactoryCache.put(loggerType, this);
}
// 方法的作用就是把当前日志工厂塞入缓存

抽象方法:

protected abstract String getLoggerType();

作用就是返回当前日志工厂的类型(slf4j或者inner)

一个比较重要的方法是:

protected abstract InternalLogger getLoggerInstance(String name);

获取打印日志的实例InternalLogger

因为实际打印日志时,使用的就是InternalLogger实例。下面去看一下getLoggerInstance(String name)的一个实现类:slf4j中的实现

Slf4jLoggerFactory中的getLoggerInstance所获取的InternalLogger,其实就是对org.slf4j.LoggerFactory的一个封装

public Slf4jLogger(String name) {
    logger = LoggerFactory.getLogger(name);
}

可以看到,没有做任何处理。

再看下getLoggerInstance(String name)的一个实现类:InnerLoggerFactory中的实现

同样是看从哪里获取的InnerLogger

public InnerLogger(String name) {
    logger = Logger.getLogger(name);
}
org.apache.rocketmq.logging.inner.Logger

这里的Logger的全名是:org.apache.rocketmq.logging.inner.Logger 也就是说这是rocketmq自己开发的Logger

org.apache.rocketmq.logging.inner.Logger.DefaultLoggerRepository

进来之后首先声明一个DefaultLoggerRepository

private static final DefaultLoggerRepository REPOSITORY = new DefaultLoggerRepository(new RootLogger(Level.DEBUG));

DefaultLoggerRepository是Logger的内部类,底层主要是由HashTable实现

final Hashtable ht = new Hashtable();

key为CategoryKey(String类型的name + name的hashcode),value为Object

org.apache.rocketmq.logging.inner.Level

内部有一个对日志等级的定义:org.apache.rocketmq.logging.inner.Level

对外暴露三个属性:level,levelStr,syslogEquivalent

transient int level;
transient String levelStr;
transient int syslogEquivalent;

为防止被序列化,加了transient关键字,内部定义了六种日志级别:

final static public Level OFF = new Level(OFF_INT, OFF_NAME, 0);

final static public Level ERROR = new Level(ERROR_INT, ERROR_NAME, 3);

final static public Level WARN = new Level(WARN_INT, WARN_NAME, 4);

final static public Level INFO = new Level(INFO_INT, INFO_NAME, 6);

final static public Level DEBUG = new Level(DEBUG_INT, DEBUG_NAME, 7);

final static public Level ALL = new Level(ALL_INT, ALL_NAME, 7);

值得注意的是这个toLevel方法:

public static Level toLevel(String sArg, Level defaultLevel) {
    if (sArg == null) {
        return defaultLevel;
    }
    String s = sArg.toUpperCase();

    if (s.equals(ALL_NAME)) {
        return Level.ALL;
    }
    if (s.equals(DEBUG_NAME)) {
        return Level.DEBUG;
    }
    if (s.equals(INFO_NAME)) {
        return Level.INFO;
    }
    if (s.equals(WARN_NAME)) {
        return Level.WARN;
    }
    if (s.equals(ERROR_NAME)) {
        return Level.ERROR;
    }
    if (s.equals(OFF_NAME)) {
        return Level.OFF;
    }
    if (s.equals(INFO_NAME)) {
        return Level.INFO;
    }
    return defaultLevel;
}

String s = sArg.toUpperCase(); 这个就很用户友好了,也就是说获取日志级别,用户输入大小写都是兼容的。

此时我们再次回到DefaultLoggerRepository,此时我们就明白了,DefaultLoggerRepository其实就是Logger内部用来做缓存的内部类,底层由Hashtable实现,并对Hashtable使用synchronized关键字做了并发处理。

回到Logger,继续往下走,又看到

Appender.AppenderPipelineImpl appenderPipeline;

org.apache.rocketmq.logging.inner.Appender.AppenderPipelineImpl

从翻译看,这个叫追加器管道,是AppenderPipeline的实现类,下面是这个接口的定义:

public interface AppenderPipeline {

    void addAppender(Appender newAppender);

    Enumeration getAllAppenders();

    Appender getAppender(String name);

    boolean isAttached(Appender appender);

    void removeAllAppenders();

    void removeAppender(Appender appender);

    void removeAppender(String name);
}

进入实现类AppenderPipelineImpl可以看到,其实这个管道,就是一个元素类型为Appender的Vector集合

public static class AppenderPipelineImpl implements AppenderPipeline {


    protected Vector appenderList;

附:Vector,与ArrayList、linkedList相比,特点在于线程安全。多线程环境推荐使用。

org.apache.rocketmq.logging.inner.Appender
public abstract class Appender {

    public static final int CODE_WRITE_FAILURE = 1;
    public static final int CODE_FLUSH_FAILURE = 2;
    public static final int CODE_CLOSE_FAILURE = 3;
    public static final int CODE_FILE_OPEN_FAILURE = 4;

    public final static String LINE_SEP = System.getProperty("line.separator");

    boolean firstTime = true;

    protected Layout layout;

    protected String name;

    protected boolean closed = false;

可以看到Appender内部最重要的就是这个Layout,翻译为分层。

最重要的方法是

doAppend( )

public synchronized void doAppend(LoggingEvent event) {
    if (closed) {
        SysLogger.error("Attempted to append to closed appender named [" + name + "].");
        return;
    }
    this.append(event);
}

会往Appender内继续追加日志事件LoggingEvent,再看下日志事件LoggingEvent是什么

org.apache.rocketmq.logging.inner.LoggingEvent

可以看到这就是一个比较具体的“日志”了,内部包含了一条日志所应该包含的信息:

public class LoggingEvent implements java.io.Serializable {

    transient public final String fqnOfCategoryClass;

    transient private Object message;

    transient private Level level;

    transient private Logger logger;

    private String renderedMessage;

    private String threadName;

    public final long timeStamp;

    private Throwable throwable;

比较特别的是日志对象里还包含了打印日志的工具Logger和线程名threadName

从日志体出来,回到Logger,往下走看到两个获取Logger的方法:

static public Logger getLogger(Class clazz) {
    return getRepository().getLogger(clazz.getName());
}

public static Logger getRootLogger() {
    return getRepository().getRootLogger();
}

也就是说在之前提到的DefaultLoggerRepository中,有两类Logger,除了普通的Logger以外,还有一个根日志器rootLogger,当然进入底层可以看到其实rootLogger也是一个普通的Logger,只不过命名为root

public Logger getRootLogger() {
    return root;
}
public static class RootLogger extends Logger {

    public RootLogger(Level level) {
        super("root");
        setLevel(level);
    }
}

这里有一个重要的方法:callAppenders

public void callAppenders(LoggingEvent event) {
    int writes = 0;

    for (Logger logger = this; logger != null; logger = logger.parent) {
        synchronized (logger) {
            if (logger.appenderPipeline != null) {
                writes += logger.appenderPipeline.appendLoopOnAppenders(event);
            }
            if (!logger.additive) {
                break;
            }
        }
    }

    if (writes == 0) {
        getRepository().emitNoAppenderWarning(this);
    }
}

可以看到这个日志,核心是把Logger都塞到管道appenderPipeline里面。而这个起始的变量writes,就是记录appenderPipeline中logger总数的值,当总数为0时会抛出告警。

public void debug(Object message) {
    if (getRepository().isDisabled(Level.DEBUG_INT)) {
        return;
    }
    if (Level.DEBUG.isGreaterOrEqual(this.getEffectiveLevel())) {
        forcedLog(FQCN, Level.DEBUG, message, null);
    }
}


public void debug(Object message, Throwable t) {
    if (getRepository().isDisabled(Level.DEBUG_INT)) {
        return;
    }
    if (Level.DEBUG.isGreaterOrEqual(this.getEffectiveLevel())) {
        forcedLog(FQCN, Level.DEBUG, message, t);
    }
}


public void error(Object message) {
    if (getRepository().isDisabled(Level.ERROR_INT)) {
        return;
    }
    if (Level.ERROR.isGreaterOrEqual(this.getEffectiveLevel())) {
        forcedLog(FQCN, Level.ERROR, message, null);
    }
}

public void error(Object message, Throwable t) {
    if (getRepository().isDisabled(Level.ERROR_INT)) {
        return;
    }
    if (Level.ERROR.isGreaterOrEqual(this.getEffectiveLevel())) {
        forcedLog(FQCN, Level.ERROR, message, t);
    }

}


protected void forcedLog(String fqcn, Level level, Object message, Throwable t) {
    callAppenders(new LoggingEvent(fqcn, this, level, message, t));
}

可以看到debug,error级别的打印日志,底层都是调用了callAppenders这个方法,而callAppenders追到最下面,就是调用了Appender的doAppend方法,也就是向appenderPipeline管道里面追加一个LoggerEvent。

public void warn(Object message) {
    if (getRepository().isDisabled(Level.WARN_INT)) {
        return;
    }

    if (Level.WARN.isGreaterOrEqual(this.getEffectiveLevel())) {
        forcedLog(FQCN, Level.WARN, message, null);
    }
}

public void warn(Object message, Throwable t) {
    if (getRepository().isDisabled(Level.WARN_INT)) {
        return;
    }
    if (Level.WARN.isGreaterOrEqual(this.getEffectiveLevel())) {
        forcedLog(FQCN, Level.WARN, message, t);
    }
}

可以看到info级别的日志,warn级别的日志,也都是一样的。

public void info(Object message) {
    if (getRepository().isDisabled(Level.INFO_INT)) {
        return;
    }
    if (Level.INFO.isGreaterOrEqual(this.getEffectiveLevel())) {
        forcedLog(FQCN, Level.INFO, message, null);
    }
}

public void info(Object message, Throwable t) {
    if (getRepository().isDisabled(Level.INFO_INT)) {
        return;
    }
    if (Level.INFO.isGreaterOrEqual(this.getEffectiveLevel())) {
        forcedLog(FQCN, Level.INFO, message, t);
    }
}

至此我们基本看完了InternalLoggerFactory类,回到起点KVConfigManager

private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

可以看到我们其实才读了一行代码,在这里KVConfigManager的头部,声明了log,LoggerName为“RocketmqNamesrv”,这也对应我们当前在读的总包,是在namesrv包下。

到了第二行:

private final NamesrvController namesrvController;

这里声明了一个NamesrvController,然后接下来我们去看下这里面做了哪些事。

org.apache.rocketmq.namesrv.NamesrvController
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    private final NamesrvConfig namesrvConfig;

    private final NettyServerConfig nettyServerConfig;

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));
    private final KVConfigManager kvConfigManager;
    private final RouteInfoManager routeInfoManager;

进来之后首先是之前我们已经看过的InternalLogger,与KVConfigManager里声明的一模一样。接着下面是两个Config,内部就是一些常规的配置参数,不过并没有使用注解开发,全部是原生的get set方法。

还声明了一个单线程实例的ExecutorService

进入第一个比较重要的接口:

org.apache.rocketmq.remoting.RemotingServer

RemotingServer是一个接口,我们直接进入它的唯一实现类看看:

org.apache.rocketmq.remoting.netty.NettyRemotingServer
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

此处的日志工具的LoggerName是RocketmqRemoting了,因为此时其实已经调到了remoting包下,不在namesrv包了。

先不追究RemotingHelper,继续读NettyRemotingServer

private final ServerBootstrap serverBootstrap;
private final EventLoopGroup eventLoopGroupSelector;
private final EventLoopGroup eventLoopGroupBoss;
private final NettyServerConfig nettyServerConfig;

private final ExecutorService publicExecutor;
private final ChannelEventListener channelEventListener;

private final Timer timer = new Timer("ServerHouseKeepingService", true);
private DefaultEventExecutorGroup defaultEventExecutorGroup;

接下来声明的,是RocketMq中为数不多依赖的东西,netty中的ServerBootstrap,EventLoopGroup,EventLoopGroup,ChannelEventListener,DefaultEventExecutorGroup,使用过netty的同学应该对这几个类并不陌生,我们先不去看netty的源码了,一会看他怎么用。

除这几个netty的类以外,还有一个声明为守护进程的Timer,一个来自JUC包下提交异步任务的ExecutorService

private HandshakeHandler handshakeHandler;
private NettyEncoder encoder;
private NettyConnectManageHandler connectionManageHandler;
private NettyServerHandler serverHandler;

接下来还是netty的handler等相关类的声明

public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
    this(nettyServerConfig, null);
}

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {

继续往下是NettyRemotingServer的两个构造方法,一个是没有channelEventListener的构造方法,另一个有

构造方法首先声明两个信号量,一个是单向信号量容量,一个异步信号量容量

super(nettyServerConfig.getServerOnewaySemaphorevalue(), nettyServerConfig.getServerAsyncSemaphorevalue());
// super如下:(非源码,这两段是拼在一起的)
    public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
        this.semaphoreoneway = new Semaphore(permitsOneway, true);
        this.semaphoreAsync = new Semaphore(permitsAsync, true);
    }

在NettyServerConfig中的配置分别是:单向信号量256,异步信号量64

org.apache.rocketmq.remoting.netty.NettyServerConfig
private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serveronewaySemaphorevalue = 256;
private int serverAsyncSemaphorevalue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;

继续往下创建一个线程池:publicExecutor

this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
    }
});

内置了一个原子变量,对线程名做了一个重写。

根据是否轮询

useEpoll( )

提供了不同的eventLoopGroupBoss和eventLoopGroupSelector。

对于轮询的模式,使用了EpollEventLoopGroup,非轮询的模式使用了NioEventLoopGroup

public void loadSslContext() {
    TlsMode tlsMode = TlsSystemConfig.tlsMode;
    log.info("Server is running in TLS {} mode", tlsMode.getName());

    if (tlsMode != TlsMode.DISABLED) {
        try {
            sslContext = TlsHelper.buildSslContext(false);
            log.info("SSLContext created for server");
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext for server", e);
        } catch (IOException e) {
            log.error("Failed to create SSLContext for server", e);
        }
    }
}

之后就是构建SslContext了,而sslContext这个类属于netty内部的,就不去深究了。这里普及下ssl和tls分别是什么:

SSL(Secure Sockets Layer 安全套接字协议),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS与SSL在传输层与应用层之间对网络连接进行加密。

也就是两种网络协议。

而构建sslContext的方法在TlsHelper中,下面看下这个方法做了什么:

org.apache.rocketmq.remoting.netty.TlsHelper

这个类中最主要的就是buildSslContext方法,所以看这个方法顺便把这个类过一遍。

入口如下:

    public static SslContext buildSslContext(boolean forClient) throws IOException, CertificateException {
        File configFile = new File(TlsSystemConfig.tlsConfigFile);
        extractTlsConfigFromFile(configFile);
        logTheFinalUsedTlsConfig();

可以看到首先是从TlsSystemConfig中读取配置文件路径,解析配置文件,然后打印配置文件内容中的重要信息日志,进入

extractTlsConfigFromFile(configFile)

看下

除了配置文件判空外,比较核心的代码就是下面这段:

Properties properties;
properties = new Properties();
InputStream inputStream = null;
try {
    inputStream = new FileInputStream(configFile);
    properties.load(inputStream);
} catch (IOException ignore) {
} finally {
    if (null != inputStream) {
        try {
            inputStream.close();
        } catch (IOException ignore) {
        }
    }
}

使用io流从configFile中读取参数信息,再下面就是赋值了

tlsTestModeEnable = Boolean.parseBoolean(properties.getProperty(TLS_TEST_MODE_ENABLE, String.valueOf(tlsTestModeEnable)));
tlsServerNeedClientAuth = properties.getProperty(TLS_SERVER_NEED_CLIENT_AUTH, tlsServerNeedClientAuth);
tlsServerKeyPath = properties.getProperty(TLS_SERVER_KEYPATH, tlsServerKeyPath);
tlsServerKeyPassword = properties.getProperty(TLS_SERVER_KEYPASSWORD, tlsServerKeyPassword);
tlsServerCertPath = properties.getProperty(TLS_SERVER_CERTPATH, tlsServerCertPath);
tlsServerAuthClient = Boolean.parseBoolean(properties.getProperty(TLS_SERVER_AUTHCLIENT, String.valueOf(tlsServerAuthClient)));
tlsServerTrustCertPath = properties.getProperty(TLS_SERVER_TRUSTCERTPATH, tlsServerTrustCertPath);

tlsClientKeyPath = properties.getProperty(TLS_CLIENT_KEYPATH, tlsClientKeyPath);
tlsClientKeyPassword = properties.getProperty(TLS_CLIENT_KEYPASSWORD, tlsClientKeyPassword);
tlsClientCertPath = properties.getProperty(TLS_CLIENT_CERTPATH, tlsClientCertPath);
tlsClientAuthServer = Boolean.parseBoolean(properties.getProperty(TLS_CLIENT_AUTHSERVER, String.valueOf(tlsClientAuthServer)));
tlsClientTrustCertPath = properties.getProperty(TLS_CLIENT_TRUSTCERTPATH, tlsClientTrustCertPath);

从配置文件获取信息之后,就是根据配置文件的内容来构建sslContext了,这里由于对网络协议不是很熟,就不分析了。

调用tlsHelper构建好sslContext后,就开始进入NettyRemotingServer的

start()

方法了,这是很重要的方法,用过netty框架的同学应该知道,这其实就相当于我们自己写的netty服务端的启动部分。

@Override
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

进来后首先是创建一个DefaultEventExecutorGroup对象,内部设置线程数和线程工厂,线程工厂里重写了所生成线程的名称。

ServerBootstrap childHandler =
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .option(ChannelOption.SO_REUSEADDR, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        .childHandler(new ChannelInitializer() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                    .addLast(defaultEventExecutorGroup,
                        encoder,
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        connectionManageHandler,
                        serverHandler
                    );
            }
        });

可以看到是很常见的netty服务端ServerBootstrap的初始化方式

try {
    ChannelFuture sync = this.serverBootstrap.bind().sync();
    InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
    this.port = addr.getPort();
} catch (InterruptedException e1) {
    throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}

if (this.channelEventListener != null) {
    this.nettyEventExecutor.start();
}

绑定端口,启动服务端。

启动后接下来是一个定时任务:

this.timer.scheduleAtFixedRate(new TimerTask() {

    @Override
    public void run() {
        try {
            NettyRemotingServer.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);

可以看到是3秒后执行,之后每过1秒再执行一次。执行的内容是scanResponseTable,下面看下这个任务:

首先是这个方法的注释:This method is periodically invoked to scan and expire deprecated request. 指的是此方法用于扫描和过期(动词,使过期)请求。下面是此方法的源码:

public void scanResponseTable() {
    final List rfList = new linkedList();
    Iterator> it = this.responseTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry next = it.next();
        ResponseFuture rep = next.getValue();

        if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
            rep.release();
            it.remove();
            rfList.add(rep);
            log.warn("remove timeout request, " + rep);
        }
    }

    for (ResponseFuture rf : rfList) {
        try {
            executeInvokeCallback(rf);
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}

第一部分代码到while循环结束,可以看到是比较好理解的,遍历responseTable,并对其中已经超过过期时间1秒钟的响应进行删除,且会把过期的响应加入到刚创建的rfList中

也就是说每秒钟对responseTable中的请求进行一次过滤,并把所有当次清除的响应进行一次回调:

executeInvokeCallback(rf)

接下来看一下回调里面干了什么。

 

先看下方法注释:

先在回调执行器里执行回调方法,如果回调方法为空,则直接在当前线程中运行方法。

private void executeInvokeCallback(final ResponseFuture responseFuture) {
    boolean runInThisThread = false;
    ExecutorService executor = this.getCallbackExecutor();
    if (executor != null) {
        try {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        responseFuture.executeInvokeCallback();
                    } catch (Throwable e) {
                        log.warn("execute callback in executor exception, and callback throw", e);
                    } finally {
                        responseFuture.release();
                    }
                }
            });
        } catch (Exception e) {
            runInThisThread = true;
            log.warn("execute callback in executor exception, maybe executor busy", e);
        }
    } else {
        runInThisThread = true;
    }

    if (runInThisThread) {
        try {
            responseFuture.executeInvokeCallback();
        } catch (Throwable e) {
            log.warn("executeInvokeCallback Exception", e);
        } finally {
            responseFuture.release();
        }
    }
}

可以看到这个回调执行器其实指的就是一个异步线程池,所以通过看源码,上面的注释我们可以这样理解:

如果异步线程池存在,则提交异步任务执行内置回调方法,如果异步线程池不存在,则当前线程直接执行。接下来我们看下所谓执行的回调方法内部是干了什么:

这个回调方法是写在responseFuture中的,所以我们直接把这个类通读下:

org.apache.rocketmq.remoting.netty.ResponseFuture
private final int opaque;
private final Channel processChannel;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final CountDownLatch countDownLatch = new CountDownLatch(1);

private final SemaphoreReleaseOnlyOnce once;

private final AtomicBoolean executeCallbackonlyOnce = new AtomicBoolean(false);
private volatile RemotingCommand responseCommand;
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;

首先是一些声明的变量。值得注意的是 一个只能释放一次的信号once和一个原子布尔变量executeCallbackOnlyOnce,含义是该回调方法只能被调用一次。关于SemaphoreReleaseOnlyOnce,过一下这个类:

org.apache.rocketmq.remoting.common.SemaphoreReleaseonlyOnce

这个类的内容本身不是很复杂,主要是对Semaphore做了一层包装:

private final AtomicBoolean released = new AtomicBoolean(false);
private final Semaphore semaphore;

public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
    this.semaphore = semaphore;
}

public void release() {
    if (this.semaphore != null) {
        if (this.released.compareAndSet(false, true)) {
            this.semaphore.release();
        }
    }
}

public Semaphore getSemaphore() {
    return semaphore;
}

代码量不大,主要是一个标记为只能释放一次的原子布尔变量,和一个release方法,逻辑就是如果没有被release的情况下,才能release

Semaphore是jdk1.5之后新增加的一个类,翻译为信号,但其实可以理解为一个锁,这个不属于rocketMQ源码,这里就不深究了,简单说就是必须从Semaphore中获取到一个锁,才能执行对应方法,释放后,其他线程才能获取该锁(当然允许同时执行的线程数是自定义的)。

而这里可以看到SemaphoreReleaseOnlyOnce,回调方法是只允许执行一次的,

再回到ResponseFuture,我们就明白了,也就是说为了保证回调方法只执行一次,作者做了两道限制:

一个是executeCallbackOnlyOnce原子布尔变量,一个是SemaphoreReleaseOnlyOnce互斥锁

除了

executeInvokeCallback()

方法外,再往下看:

public boolean isTimeout() {
    long diff = System.currentTimeMillis() - this.beginTimestamp;
    return diff > this.timeoutMillis;
}

判断超时,这个方法很简单,判断当前时间减去开始时间是否比超时时间长。

public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
    this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    return this.responseCommand;
}

public void putResponse(final RemotingCommand responseCommand) {
    this.responseCommand = responseCommand;
    this.countDownLatch.countDown();
}

接下来是一组方法,一个等待响应,一个塞响应。加入putResponse被调用,responseCommand有了值,则计时器会被立即停止。waitResponse也会立即返回。CountDownLatch是jdk1.5开始引入的一个计时器,作用其实就是阻塞线程一段时间。

private final CountDownLatch countDownLatch = new CountDownLatch(1);

计时器同样也只允许被调用一次,也就是说等待响应----获取响应 这个过程对单个对象来说只允许一次。

通读下来ResponseFuture有一个特点就是:一次性。不论是获取响应还是执行回调方法,单个responseFuture都是严格只允许执行一次的。

搞清这个回调方法,我们退回到起点,也就是NettyRemotingServer的服务端启动的地方,也就是在启动服务端后,对所有的ResponseTable做了一个扫描,过滤掉所有的过期响应,以及对过期响应做一次回调。这个responseTable缓存了所有正在进行的请求,类型是ConcurrentHashMap。

回到NettyRemotingServer,启动start方法下面,紧接着是shutdown()方法

public void shutdown() {
    try {
        if (this.timer != null) {
            this.timer.cancel();
        }

        this.eventLoopGroupBoss.shutdownGracefully();

        this.eventLoopGroupSelector.shutdownGracefully();

        if (this.nettyEventExecutor != null) {
            this.nettyEventExecutor.shutdown();
        }

        if (this.defaultEventExecutorGroup != null) {
            this.defaultEventExecutorGroup.shutdownGracefully();
        }
    } catch (Exception e) {
        log.error("NettyRemotingServer shutdown exception, ", e);
    }

    if (this.publicExecutor != null) {
        try {
            this.publicExecutor.shutdown();
        } catch (Exception e) {
            log.error("NettyRemotingServer shutdown exception, ", e);
        }
    }
}

可以看到就是关闭了所有资源包括:定时器timer,bossGroup,workerGroup,执行器publicExecutor(线程池)

接下来:注册RPC钩子:

@Override
public void registerRPCHook(RPCHook rpcHook) {
    if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
        rpcHooks.add(rpcHook);
    }
}

rpcHooks是一个泛型为rpcHook的集合:

protected List rpcHooks = new ArrayList();

进入RPCHook:

org.apache.rocketmq.remoting.RPCHook
public interface RPCHook {
    void doBeforeRequest(final String remoteAddr, final RemotingCommand request);

    void doAfterResponse(final String remoteAddr, final RemotingCommand request,
        final RemotingCommand response);
}

可以看到接口内部有两个方法,分别是请求前执行和响应后执行。接口的实现类的实例化对象作为一个对象存入rpcHooks集合中,后期会调用并统一执行。

@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
    if (null == executor) {
        executorThis = this.publicExecutor;
    }

    Pair pair = new Pair(processor, executorThis);
    this.processorTable.put(requestCode, pair);
}

注册处理器,其实就是把请求码,处理器和执行器注册到NettyRemotingServer的processorTable中。执行器如果为空的话,会选择默认的publicExecutor

processorTable:

protected final HashMap> processorTable =
    new HashMap>(64);

继续往下看:

@Override
public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    return this.invokeSyncImpl(channel, request, timeoutMillis);
}

同步引用,继续看调用的方法:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();

进来之后可以看到,首先把同步调用的responseFuture包好放到responseTable里,然后准备发送请求,请求之后再用这个responseFuture去接结果,可以认为是留了个钩子在调用方了。

channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) throws Exception {
        if (f.isSuccess()) {
            responseFuture.setSendRequestOK(true);
            return;
        } else {
            responseFuture.setSendRequestOK(false);
        }

        responseTable.remove(opaque);
        responseFuture.setCause(f.cause());
        responseFuture.putResponse(null);
        log.warn("send a request command to channel <" + addr + "> failed.");
    }
});

可以看到接下来就是发送请求了,所谓同步调用invokeSync,其实就是使用netty的channel远程发送请求,并对结果做监听。

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
    if (null == responseCommand) {
        if (responseFuture.isSendRequestOK()) {
            throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                responseFuture.getCause());
        } else {
            throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
        }
    }

    return responseCommand;
} finally {
    this.responseTable.remove(opaque);
}

之后就是等待结果的返回,然后从响应表responseTable中移除对应的Future了。

然后这里对于自定义异常的写法,我们也看下:

org.apache.rocketmq.remoting.exception.RemotingTimeoutException
public class RemotingTimeoutException extends RemotingException {

    private static final long serialVersionUID = 4106899185095245979L;

    public RemotingTimeoutException(String message) {
        super(message);
    }

  
    public RemotingTimeoutException(String addr, long timeoutMillis) {
        this(addr, timeoutMillis, null);
    }

    public RemotingTimeoutException(String addr, long timeoutMillis, Throwable cause) {
        super("wait response on the channel <" + addr + "> timeout, " + timeoutMillis + "(ms)", cause);
    }
}

这个类比较短小,直接都贴过来看。没什么逻辑,但是写法值得学习,而且对异常信息的打印也很全。

回到NettyRemotingServer,在同步调用invokeSync下面自然就是异步调用invokeAsync

@Override
public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}

底层实现与invokeSync的区别就是首先做了锁,然后是没有实时对Future做回收,也就不存在同步调用那里的阻塞等待结果了。

不过这里就不对invokeAsync深入了,因为这个方法没有调用方。。

@Override
public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException,
    RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    this.invokeOnewayImpl(channel, request, timeoutMillis);
}

接下来看下invokeOneway,单向调用

boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
    final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
    try {
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                once.release();
                if (!f.isSuccess()) {
                    log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                }
            }
        });
    } catch (Exception e) {
        once.release();
        log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
        throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
    }
}

首先同样是先取锁,获取成功,则进行rpc远程调用,其实还是netty发送request到远端,加一个监听器来判断是否发送成功。

如果发生异常,可以看到代码中首先释放锁,这也是为了防止因为发生异常而死锁的情况,之后打印日志抛出异常。

else {
    if (timeoutMillis <= 0) {
        throw new RemotingTooMuchRequestException("invokeonewayImpl invoke too fast");
    } else {
        String info = String.format(
            "invokeonewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
            timeoutMillis,
            this.semaphoreOneway.getQueueLength(),
            this.semaphoreOneway.availablePermits()
        );
        log.warn(info);
        throw new RemotingTimeoutException(info);
    }
}

如果取锁失败,则判断超时时间是否设置正确,如果<=0,则抛出异常;如果设置合理,那么会打印日志,内容为:当前rpc调用设置的超时时长,等待的线程数,可用锁的数量,打印完日志后抛出异常。

可以看到单向调用其实指的就是没有返回值,监听器的future也只是返回一下请求是否成功,并没有返回值。

再下面是声明了三个handler,和我们自己写的netty demo里的serverHandler,clientHandler一样的

org.apache.rocketmq.remoting.netty.NettyRemotingServer.HandshakeHandler

第一个是HandshakeHandler 也就是握手建立连接的handler

@ChannelHandler.Sharable
class HandshakeHandler extends SimpleChannelInboundHandler {

内部实现了一个方法:channelRead0

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {

        // mark the current position so that we can peek the first byte to determine if the content is starting with
        // TLS handshake
        msg.markReaderIndex();

        byte b = msg.getByte(0);

        if (b == HANDSHAKE_MAGIC_CODE) {
            switch (tlsMode) {
                case DISABLED:
                    ctx.close();
                    log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode");
                    break;
                case PERMISSIVE:
                case ENFORCING:
                    if (null != sslContext) {
                        ctx.pipeline()
                            .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
                            .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
                        log.info("Handlers prepended to channel pipeline to establish SSL connection");
                    } else {
                        ctx.close();
                        log.error("Trying to establish an SSL connection but sslContext is null");
                    }
                    break;

                default:
                    log.warn("Unknown TLS mode");
                    break;
            }
        } else if (tlsMode == TlsMode.ENFORCING) {
            ctx.close();
            log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
        }

        // reset the reader index so that handshake negotiation may proceed as normal.
        msg.resetReaderIndex();

        try {
            // Remove this handler
            ctx.pipeline().remove(this);
        } catch (NoSuchElementException e) {
            log.error("Error while removing HandshakeHandler", e);
        }

        // Hand over this message to the next .
        ctx.fireChannelRead(msg.retain());
    }
}

首先校验传送信息msg的首字符是否是0x16,如果不是,则认为是不安全的连接,关闭掉ctx并打印日志。

如果是安全连接,接下来校验tlsMode是否是符合要求的模式,如果是关闭模式,则同样不能建立连接。

对于符合要求的连接,做以下三行操作:

ctx.pipeline()
    .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
    .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());

在ctx的pipeline管道里面添加两个handler,第一行加了一个自定义handler,第二行是添加了一个编码器用于加密。

握手连接handler结束后,再下面是NettyServerHandler:

org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

内容比较简单,可以看到是对接收到的消息做处理

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

处理分两种类型,请求处理和响应处理

内部先看下对传来的请求是如何处理的:

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair matched = this.processorTable.get(cmd.getCode());
    final Pair pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

可以看到进来首先从processTable中先把netty请求处理器和执行器(线程池)准备好,这里我们发现一个规律,就是无论执行什么任务,做什么动作。这两个对象总是会传:一个processor(处理器)一个执行器(executorService)。

同时我们看下这个rocketmq内部写的pair:

org.apache.rocketmq.remoting.common.Pair
public class Pair {
    private T1 object1;
    private T2 object2;

    public Pair(T1 object1, T2 object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    public T1 getObject1() {
        return object1;
    }

    public void setObject1(T1 object1) {
        this.object1 = object1;
    }

    public T2 getObject2() {
        return object2;
    }

    public void setObject2(T2 object2) {
        this.object2 = object2;
    }
}

内容很简单,就是两个object一对儿。

接下来初始化一个Runnable接口:

if (pair != null) {
    Runnable run = new Runnable() {
        @Override
        public void run() {
            try {
                String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                doBeforeRpcHooks(remoteAddr, cmd);
                final RemotingResponseCallback callback = new RemotingResponseCallback() {
                    @Override
                    public void callback(RemotingCommand response) {
                        doAfterRpcHooks(remoteAddr, cmd, response);
                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {
                            }
                        }
                    }
                };
                if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                    AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                    processor.asyncProcessRequest(ctx, cmd, callback);
                } else {
                    NettyRequestProcessor processor = pair.getObject1();
                    RemotingCommand response = processor.processRequest(ctx, cmd);
                    callback.callback(response);
                }
            } catch (Throwable e) {
                log.error("process request exception", e);
                log.error(cmd.toString());

                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                        RemotingHelper.exceptionSimpleDesc(e));
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        }
    };

先从channel中获取到发送请求的客户端的地址:remoteAddr,然后根据这个地址去执行rpcHooks,还记得我们之前记过的,RPCHooks缓存的是执行请求前或获取响应后要执行的内容。

而在定义回调方法callback内,先执行响应后的动作,之后把响应回传给客户端,对于单向的rpc调用(无返回值),则不做操作。

继续往下,根据处理器是否异步同步做了不同的处理,但都调用了上面定义的callback方法

再往下就是从processorTable中获取执行器提交上面定义的runnable任务了。

回到NettyServerHandler,其实

processMessageReceived(ctx, msg)

这个方法就是执行器提交了收到的请求,并执行了前置和后置的钩子方法,并把响应通过channel返回回去。再看下

processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd)

处理响应的方法

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);

        responseTable.remove(opaque);

        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

可以看到方法内部从responseTable中取出responseFuture,把ResponseCommand塞入responseFuture,执行responseFuture的回调方法,之后把responseTable中的对应Future删除。

接下来第三个handler:NettyConnectManageHandler、再回顾下前两个handler,第一个是握手handler,也就是建立连接;第二个是NettyServerHandler,也就是服务端接受响应或请求的handler。

第三个handler名称为netty连接管理handler:

org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyConnectManageHandler
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
    log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
    super.channelRegistered(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
    log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
    super.channelUnregistered(ctx);
}

首先是注册和解注册方法:channelRegistered、channelUnregistered,基本是继承了父类方法,只是从ChannelHandlerContext中取出地址打了日志。

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
    log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
    super.channelActive(ctx);

    if (NettyRemotingServer.this.channelEventListener != null) {
        NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
    }
}

接下来重写channelActive方法,除了日志和继承父方法外,还把本次连接的remoteAddress地址,channel本地存下来,存到了NettyEvent事件中的一个事件队列:

private final linkedBlockingQueue eventQueue = new linkedBlockingQueue();

可以理解为,服务端每接收到一个客户端的连接,就在服务器端留一个存根,也就是把channel存下来。后期这个channel可以用于比如给客户端主动发消息等操作。

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
    log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
    super.channelInactive(ctx);

    if (NettyRemotingServer.this.channelEventListener != null) {
        NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
    }
}

与channelActive对应的一个channelInactive方法,逻辑基本一样,唯一区别是向NettyEvent队列中放的是关闭的NettyEvent事件。

接下来是重写的

userEventTriggered(ChannelHandlerContext ctx, Object evt)

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent) evt;
        if (event.state().equals(IdleState.ALL_IDLE)) {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
            RemotingUtil.closeChannel(ctx.channel());
            if (NettyRemotingServer.this.channelEventListener != null) {
                NettyRemotingServer.this
                    .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
            }
        }
    }

    ctx.fireUserEventTriggered(evt);
}

这个方法其实就是同一个地址来的,建立了重复连接。比如此时节点A已经与服务端建立了连接,结果节点A又发来一个连接,则触发trigger。可以看到这里的处理是,关闭掉这个新连接,并把该连接放置到NettyEvent队列中,状态置为“闲置IDLE”。

接下来一个异常捕获方法exceptionCaught

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
    log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
    log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);

    if (NettyRemotingServer.this.channelEventListener != null) {
        NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
    }

    RemotingUtil.closeChannel(ctx.channel());
}

可以看到同样是往NettyEvent队列里放置了channel,状态置为“异常EXCEPTION”。

到这儿NettyRemotingServer类就结束了,其实这些重写是很好的例子demo,之后使用netty做底层通信,都可以拿来借鉴。我看到这里,就想起之前自己写的netty-demo,其实也是同样的思路,对于客户端传来的连接ChannelHandlerContext,存到本地,后期用于向客户端主动推送。但是RocketMQ底层显然更规范和标准,每个事件都有对应的枚举。

其实到这儿也对netty的channel有了一个全新的认识,netty对于任何事件,比如:建立连接,断开连接,抛送异常,发送消息等,这些动作其实都是通过channel来传输,根据上面我们就能看到,这些动作的结果,其实就是给了一个ChannelHandlerContext。

现在回到NamesrvController,看了这么多,其实我们才读到NamesrvController的第56行,哈哈。继续往下,BrokerHousekeepingService,点进去看下:

public class BrokerHousekeepingService implements ChannelEventListener {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
    }

    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}

首先是继承了ChannelEventListener,channel事件监听器。结构其实很简单,对于channel的关闭,异常,空闲事件,都做了destroy,还记得我们前面看到的NameSrvHandler中的trigger方法吗,在那里把重复的连接置为了空闲,可以看到置为空闲其实就是destroy掉了。接下来看看这个

onChannelDestroy(remoteAddr, channel)

方法内部做了什么。

public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                Iterator> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

进来首先,如果channel不为空,则获取读锁,这里注意RouteInfoManager内部是声明了读写锁的:

private final ReadWriteLock lock = new ReentrantReadWriteLock();

获取锁成功后,接下来从brokerLiveTable(在线的broker表)中遍历出value(BrokerLiveInfo)的channel为目标channel的key作为brokerAddrFound。也就是说第一个if,就是从brokerLiveTable中取出一个key:brokerAddrFound(broker的地址)

整个方法的主要部分都集中在对这个取出的key:brokerAddrFound的处理上。虽然代码很长,但整体逻辑并不复杂,都是删除key为brokerAddrFound的各种表中的元素:(合法化处理还有异常捕获这里就不分析了)

this.lock.writeLock().lockInterruptibly();
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);

获取读锁(增删改都是取读锁),先从brokerLiveTable,filterServerTable中移除key为broker地址的元素。这也符合该方法的逻辑:channelDestory。

while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
    BrokerData brokerData = itBrokerAddrTable.next().getValue();

    Iterator> it = brokerData.getBrokerAddrs().entrySet().iterator();
    while (it.hasNext()) {
        Entry entry = it.next();
        Long brokerId = entry.getKey();
        String brokerAddr = entry.getValue();
        if (brokerAddr.equals(brokerAddrFound)) {
            brokerNameFound = brokerData.getBrokerName();
            it.remove();
            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                brokerId, brokerAddr);
            break;
        }
    }

    if (brokerData.getBrokerAddrs().isEmpty()) {
        removeBrokerName = true;
        itBrokerAddrTable.remove();
        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
            brokerData.getBrokerName());
    }
}

从brokerData中移除

if (brokerNameFound != null && removeBrokerName) {
    Iterator>> it = this.clusterAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry> entry = it.next();
        String clusterName = entry.getKey();
        Set brokerNames = entry.getValue();
        boolean removed = brokerNames.remove(brokerNameFound);
        if (removed) {
            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                brokerNameFound, clusterName);

            if (brokerNames.isEmpty()) {
                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                    clusterName);
                it.remove();
            }

            break;
        }
    }
}

clusterAddrTable 集群地址表 移除

if (removeBrokerName) {
        Iterator>> itTopicQueueTable =
            this.topicQueueTable.entrySet().iterator();
        while (itTopicQueueTable.hasNext()) {
            Entry> entry = itTopicQueueTable.next();
            String topic = entry.getKey();
            List queueDataList = entry.getValue();

            Iterator itQueueData = queueDataList.iterator();
            while (itQueueData.hasNext()) {
                QueueData queueData = itQueueData.next();
                if (queueData.getBrokerName().equals(brokerNameFound)) {
                    itQueueData.remove();
                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                        topic, queueData);
                }
            }

            if (queueDataList.isEmpty()) {
                itTopicQueueTable.remove();
                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                    topic);
            }
        }
    }
} finally {
    this.lock.writeLock().unlock();
}

itTopicQueueTable topic队列表 移除,之后在finally代码块中释放写锁

回到NamesrvController,关于brokerHousekeepingService就结束了。

private ExecutorService remotingExecutor;

private Configuration configuration;

接下来是声明一个执行器,和一个配置类Configuration,进入Configuration看下:

org.apache.rocketmq.common.Configuration
private final InternalLogger log;

private List configObjectList = new ArrayList(4);
private String storePath;
private boolean storePathFromConfig = false;
private Object storePathObject;
private Field storePathField;
private DataVersion dataVersion = new DataVersion();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 

首先是声明日志工具,配置列表等 值得注意的是storePathField存储路径域,数据类型为Field,提供了访问类或接口的途径。

然后是一个读写锁,一个数据版本dataVersion,进入看下:

org.apache.rocketmq.common.DataVersion
public class DataVersion extends RemotingSerializable {
    private long timestamp = System.currentTimeMillis();
    private AtomicLong counter = new AtomicLong(0);

结构比较简单,一个long类型的时间戳,一个原子类型的long遍历作为计数器。内容也比较简单,每次操作该类的实例化对象,都会使版本号counter加一,并记录当前的操作时间timestamp。

回到Configuration配置类,除掉这些参数声明外,比较重要的就是初始化方式了,也就是在初始化的时候把配置内容都塞到配置列表:

public Configuration(InternalLogger log, Object... configObjects) {
    this.log = log;
    if (configObjects == null || configObjects.length == 0) {
        return;
    }
    for (Object configObject : configObjects) {
        registerConfig(configObject);
    }
}

直接进入注册配置:registerConfig中看下

public Configuration registerConfig(Object configObject) {
    try {
        readWriteLock.writeLock().lockInterruptibly();

        try {

            Properties registerProps = MixAll.object2Properties(configObject);

            merge(registerProps, this.allConfigs);

            configObjectList.add(configObject);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("registerConfig lock error");
    }
    return this;
}

内容可以看到首先获取读锁,object转成properties,然后把待注册的配置合到(merge)allConfigs(全部配置)中,然后再把配置添加到全局的configObjectList中,之后释放锁。也就是把配置类注册到待注册的集合或者配置类中。这里我们看到使用了MixAll这个类,类的内容比较多,而且是一个工具类,所以就只看下这里用到的

object2Properties(final Object object)

方法,功能我们已经看到了,就是从object转到Properties,其实这个方法我们后续也可以借鉴的。

public static Properties object2Properties(final Object object) {
    Properties properties = new Properties();

    Field[] fields = object.getClass().getDeclaredFields();
    for (Field field : fields) {
        if (!Modifier.isStatic(field.getModifiers())) {
            String name = field.getName();
            if (!name.startsWith("this")) {
                Object value = null;
                try {
                    field.setAccessible(true);
                    value = field.get(object);
                } catch (IllegalAccessException e) {
                    log.error("Failed to handle properties", e);
                }

                if (value != null) {
                    properties.setProperty(name, value.toString());
                }
            }
        }
    }

    return properties;
}

这个方法本身不是特别难,获取到对象object的类文件,再获取到类文件内部的 属性名,属性值,塞入properties。一个细节是这些属性要排除掉静态修饰符的,this开头的。也就是说该方法不会把静态对象和this修饰的对象合入properties。

回到Configuration类继续往下:

public void setStorePathFromConfig(Object object, String fieldName) {
    assert object != null;

    try {
        readWriteLock.writeLock().lockInterruptibly();

        try {
            this.storePathFromConfig = true;
            this.storePathObject = object;
            // check
            this.storePathField = object.getClass().getDeclaredField(fieldName);
            assert this.storePathField != null
                && !Modifier.isStatic(this.storePathField.getModifiers());
            this.storePathField.setAccessible(true);
        } catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("setStorePathFromConfig lock error");
    }
}

这个方法是从配置文件中读取存储路径。这里其实我们能发现,Configuration是一个配置类,也就是pojo类,而内部有很多操作此类的方法,其实也就是充血模型了,而rocketMQ底层我们能注意到,很多充血模型。包括之前看到的ResponseFuture,LoggingEvent等,都是充血模型。

在这个类里,存储路径直接被存到Field中。

紧接着是和

setStorePathFromConfig()

方法对应的get方法:

if (this.storePathFromConfig) {
    try {
        realStorePath = (String) storePathField.get(this.storePathObject);
    } catch (IllegalAccessException e) {
        log.error("getStorePath error, ", e);
    }
}

忽略掉对锁的获取和释放以及合理校验,最核心的代码其实就是这行,可以看到从storePathField取出来对象转为String就是存储路径了。

接下来一个update(Properties properties) 同样是把properties添加到Configuration中:

mergeIfExist(properties, this.allConfigs);

for (Object configObject : configObjectList) {
    // not allConfigs to update...
    MixAll.properties2Object(properties, configObject);
}

核心代码就是这两行,把待更新的properties添加到allConfigs和configObjectList,这个和之前的合并都是一样的。更新末尾还有一个

persist()

方法:

readWriteLock.readLock().lockInterruptibly();

try {
    String allConfigs = getAllConfigsInternal();

    MixAll.string2File(allConfigs, getStorePath());

主要代码有如上三行,可以看到是配置持久化,所以取的是读锁,从内置的配置中获取所有配置内容,然后持久化到文件。

接下来看下

MixAll.string2File( allConfigs, getStorePath( ) )

做了什么

public static void string2File(final String str, final String fileName) throws IOException {

    String tmpFile = fileName + ".tmp";
    string2FileNotSafe(str, tmpFile);

    String bakFile = fileName + ".bak";
    String prevContent = file2String(fileName);
    if (prevContent != null) {
        string2FileNotSafe(prevContent, bakFile);
    }

    File file = new File(fileName);
    file.delete();

    file = new File(tmpFile);
    file.renameTo(new File(fileName));
}

可以看到是创建了两个不同后缀的文件,一个是.tmp,一个是.bak,先进入string2FileNotSafe(str, tmpFile)看下干了什么:

File file = new File(fileName);
File fileParent = file.getParentFile();
if (fileParent != null) {
    fileParent.mkdirs();
}
FileWriter fileWriter = null;

try {
    fileWriter = new FileWriter(file);
    fileWriter.write(str);

可以看到内容很简单,根据文件路径创建文件,把内容写入文件中。

回到

string2File(final String str, final String fileName);

看下

String prevContent = file2String(fileName);

这一行干了什么,从外面看,是从文件名获取到一个前置内容prevContent,判断应该是把文件的前置内容落入.bak文件中

try {
    inputStream = new FileInputStream(file);
    int len = inputStream.read(data);
    result = len == data.length;
} finally {
    if (inputStream != null) {
        inputStream.close();
    }
}

if (result) {
    return new String(data);
}

可以看到如果从文件中读出的字节数等于文件的字节数(也就是完全读出成功了),则返回读出的内容。

if (prevContent != null) {
    string2FileNotSafe(prevContent, bakFile);
}

此时就明白这个prevContent的含义了,其实就是先从源文件中预读出一次,预读出成功后,再把预读出的内容全部写入到bak后缀的文件中。其实就是做了一层缓存,先预读出,再把缓存写入源文件。此时我们在.bak和.tmp两个后缀的文件中有读出内容,下面两个操作,

File file = new File(fileName);
file.delete();

file = new File(tmpFile);
file.renameTo(new File(fileName));

就把.tmp文件冲掉了,应该是这样的,这两行代码其实没有特别搞清楚。但可以肯定.bak文件是一定拥有全部输出内容的。

此时我们就把

persist( )

这个持久化方法读明白了,之前我们自己做持久化,就是落库落库落库,落mysql,但在RocketMq的源码中,我们看到了“持久化”的底层操作,其实就是io流写入本地硬盘。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号