总起namesrv
org.apache.rocketmq.namesrv.kvconfig.KVConfigManagerorg.apache.rocketmq.logging.InternalLoggerFactoryorg.apache.rocketmq.logging.inner.Loggerorg.apache.rocketmq.logging.inner.Logger.DefaultLoggerRepositoryorg.apache.rocketmq.logging.inner.Levelorg.apache.rocketmq.logging.inner.Appender.AppenderPipelineImplorg.apache.rocketmq.logging.inner.Appenderorg.apache.rocketmq.logging.inner.LoggingEventorg.apache.rocketmq.namesrv.NamesrvControllerorg.apache.rocketmq.remoting.RemotingServerorg.apache.rocketmq.remoting.netty.NettyRemotingServerorg.apache.rocketmq.remoting.netty.NettyServerConfigorg.apache.rocketmq.remoting.netty.TlsHelperorg.apache.rocketmq.remoting.netty.ResponseFutureorg.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnceorg.apache.rocketmq.remoting.RPCHookorg.apache.rocketmq.remoting.exception.RemotingTimeoutExceptionorg.apache.rocketmq.remoting.netty.NettyRemotingServer.HandshakeHandlerorg.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandlerorg.apache.rocketmq.remoting.common.Pairorg.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyConnectManageHandlerorg.apache.rocketmq.common.Configurationorg.apache.rocketmq.common.DataVersion
总起RocketMq消息中间件,真正的源码级学习。从第一个类到最后一个类,一段一段读。
解释下这个结构,我是从按顺序,从第一个类开始读,每读到一个不熟悉的类,就点进去全部读完,再继续。
比如读类A,其中引用了类B,而类B我没有读过,那么我去把类B全部读完,再回到类A继续读。
下面可以看到这种读法了,办法比较笨,就是一行一行硬度,但是我相信
这样做是值得的,也能真正有收获。
从头开始的第一个类
org.apache.rocketmq.logging.InternalLoggerFactory代码入口首先是日志工具org.apache.rocketmq.logging.InternalLoggerFactory
InternalLoggerFactory是一个抽象类,有两个实现类:InnerLoggerFactory和Slf4jLoggerFactory
内部有两个重要内容,一个是InternalLoggerFactory,另一个是InternalLogger
还有一个ConcurrentHashMap类型的缓存:loggerFactoryCache key是唯一标识,value是InternalLoggerFactory,也就是这个类本身。
LoggerFactory的获取,优先返回用户设置的loggerType,其次是默认的(slf4j),最后是(inner),如果全部初始化失败则抛出异常。
注意一个细节:InternalLoggerFactory的构造方法非常简洁,抽取成了一个私有方法:
protected void doRegister() {
String loggerType = getLoggerType();
if (loggerFactoryCache.get(loggerType) != null) {
return;
}
loggerFactoryCache.put(loggerType, this);
}
// 方法的作用就是把当前日志工厂塞入缓存
抽象方法:
protected abstract String getLoggerType();
作用就是返回当前日志工厂的类型(slf4j或者inner)
一个比较重要的方法是:
protected abstract InternalLogger getLoggerInstance(String name);
获取打印日志的实例InternalLogger
因为实际打印日志时,使用的就是InternalLogger实例。下面去看一下getLoggerInstance(String name)的一个实现类:slf4j中的实现
Slf4jLoggerFactory中的getLoggerInstance所获取的InternalLogger,其实就是对org.slf4j.LoggerFactory的一个封装
public Slf4jLogger(String name) {
logger = LoggerFactory.getLogger(name);
}
可以看到,没有做任何处理。
再看下getLoggerInstance(String name)的一个实现类:InnerLoggerFactory中的实现
同样是看从哪里获取的InnerLogger
public InnerLogger(String name) {
logger = Logger.getLogger(name);
}
org.apache.rocketmq.logging.inner.Logger
这里的Logger的全名是:org.apache.rocketmq.logging.inner.Logger 也就是说这是rocketmq自己开发的Logger
org.apache.rocketmq.logging.inner.Logger.DefaultLoggerRepository进来之后首先声明一个DefaultLoggerRepository
private static final DefaultLoggerRepository REPOSITORY = new DefaultLoggerRepository(new RootLogger(Level.DEBUG));
DefaultLoggerRepository是Logger的内部类,底层主要是由HashTable实现
final Hashtableht = new Hashtable ();
key为CategoryKey(String类型的name + name的hashcode),value为Object
org.apache.rocketmq.logging.inner.Level内部有一个对日志等级的定义:org.apache.rocketmq.logging.inner.Level
对外暴露三个属性:level,levelStr,syslogEquivalent
transient int level; transient String levelStr; transient int syslogEquivalent;
为防止被序列化,加了transient关键字,内部定义了六种日志级别:
final static public Level OFF = new Level(OFF_INT, OFF_NAME, 0); final static public Level ERROR = new Level(ERROR_INT, ERROR_NAME, 3); final static public Level WARN = new Level(WARN_INT, WARN_NAME, 4); final static public Level INFO = new Level(INFO_INT, INFO_NAME, 6); final static public Level DEBUG = new Level(DEBUG_INT, DEBUG_NAME, 7); final static public Level ALL = new Level(ALL_INT, ALL_NAME, 7);
值得注意的是这个toLevel方法:
public static Level toLevel(String sArg, Level defaultLevel) {
if (sArg == null) {
return defaultLevel;
}
String s = sArg.toUpperCase();
if (s.equals(ALL_NAME)) {
return Level.ALL;
}
if (s.equals(DEBUG_NAME)) {
return Level.DEBUG;
}
if (s.equals(INFO_NAME)) {
return Level.INFO;
}
if (s.equals(WARN_NAME)) {
return Level.WARN;
}
if (s.equals(ERROR_NAME)) {
return Level.ERROR;
}
if (s.equals(OFF_NAME)) {
return Level.OFF;
}
if (s.equals(INFO_NAME)) {
return Level.INFO;
}
return defaultLevel;
}
String s = sArg.toUpperCase(); 这个就很用户友好了,也就是说获取日志级别,用户输入大小写都是兼容的。
此时我们再次回到DefaultLoggerRepository,此时我们就明白了,DefaultLoggerRepository其实就是Logger内部用来做缓存的内部类,底层由Hashtable实现,并对Hashtable使用synchronized关键字做了并发处理。
回到Logger,继续往下走,又看到
Appender.AppenderPipelineImpl appenderPipeline;
org.apache.rocketmq.logging.inner.Appender.AppenderPipelineImpl从翻译看,这个叫追加器管道,是AppenderPipeline的实现类,下面是这个接口的定义:
public interface AppenderPipeline {
void addAppender(Appender newAppender);
Enumeration getAllAppenders();
Appender getAppender(String name);
boolean isAttached(Appender appender);
void removeAllAppenders();
void removeAppender(Appender appender);
void removeAppender(String name);
}
进入实现类AppenderPipelineImpl可以看到,其实这个管道,就是一个元素类型为Appender的Vector集合
public static class AppenderPipelineImpl implements AppenderPipeline {
protected Vector appenderList;
附:Vector,与ArrayList、linkedList相比,特点在于线程安全。多线程环境推荐使用。
org.apache.rocketmq.logging.inner.Appenderpublic abstract class Appender {
public static final int CODE_WRITE_FAILURE = 1;
public static final int CODE_FLUSH_FAILURE = 2;
public static final int CODE_CLOSE_FAILURE = 3;
public static final int CODE_FILE_OPEN_FAILURE = 4;
public final static String LINE_SEP = System.getProperty("line.separator");
boolean firstTime = true;
protected Layout layout;
protected String name;
protected boolean closed = false;
可以看到Appender内部最重要的就是这个Layout,翻译为分层。
最重要的方法是
doAppend( )
public synchronized void doAppend(LoggingEvent event) {
if (closed) {
SysLogger.error("Attempted to append to closed appender named [" + name + "].");
return;
}
this.append(event);
}
会往Appender内继续追加日志事件LoggingEvent,再看下日志事件LoggingEvent是什么
org.apache.rocketmq.logging.inner.LoggingEvent可以看到这就是一个比较具体的“日志”了,内部包含了一条日志所应该包含的信息:
public class LoggingEvent implements java.io.Serializable {
transient public final String fqnOfCategoryClass;
transient private Object message;
transient private Level level;
transient private Logger logger;
private String renderedMessage;
private String threadName;
public final long timeStamp;
private Throwable throwable;
比较特别的是日志对象里还包含了打印日志的工具Logger和线程名threadName
从日志体出来,回到Logger,往下走看到两个获取Logger的方法:
static public Logger getLogger(Class clazz) {
return getRepository().getLogger(clazz.getName());
}
public static Logger getRootLogger() {
return getRepository().getRootLogger();
}
也就是说在之前提到的DefaultLoggerRepository中,有两类Logger,除了普通的Logger以外,还有一个根日志器rootLogger,当然进入底层可以看到其实rootLogger也是一个普通的Logger,只不过命名为root
public Logger getRootLogger() {
return root;
}
public static class RootLogger extends Logger {
public RootLogger(Level level) {
super("root");
setLevel(level);
}
}
这里有一个重要的方法:callAppenders
public void callAppenders(LoggingEvent event) {
int writes = 0;
for (Logger logger = this; logger != null; logger = logger.parent) {
synchronized (logger) {
if (logger.appenderPipeline != null) {
writes += logger.appenderPipeline.appendLoopOnAppenders(event);
}
if (!logger.additive) {
break;
}
}
}
if (writes == 0) {
getRepository().emitNoAppenderWarning(this);
}
}
可以看到这个日志,核心是把Logger都塞到管道appenderPipeline里面。而这个起始的变量writes,就是记录appenderPipeline中logger总数的值,当总数为0时会抛出告警。
public void debug(Object message) {
if (getRepository().isDisabled(Level.DEBUG_INT)) {
return;
}
if (Level.DEBUG.isGreaterOrEqual(this.getEffectiveLevel())) {
forcedLog(FQCN, Level.DEBUG, message, null);
}
}
public void debug(Object message, Throwable t) {
if (getRepository().isDisabled(Level.DEBUG_INT)) {
return;
}
if (Level.DEBUG.isGreaterOrEqual(this.getEffectiveLevel())) {
forcedLog(FQCN, Level.DEBUG, message, t);
}
}
public void error(Object message) {
if (getRepository().isDisabled(Level.ERROR_INT)) {
return;
}
if (Level.ERROR.isGreaterOrEqual(this.getEffectiveLevel())) {
forcedLog(FQCN, Level.ERROR, message, null);
}
}
public void error(Object message, Throwable t) {
if (getRepository().isDisabled(Level.ERROR_INT)) {
return;
}
if (Level.ERROR.isGreaterOrEqual(this.getEffectiveLevel())) {
forcedLog(FQCN, Level.ERROR, message, t);
}
}
protected void forcedLog(String fqcn, Level level, Object message, Throwable t) {
callAppenders(new LoggingEvent(fqcn, this, level, message, t));
}
可以看到debug,error级别的打印日志,底层都是调用了callAppenders这个方法,而callAppenders追到最下面,就是调用了Appender的doAppend方法,也就是向appenderPipeline管道里面追加一个LoggerEvent。
public void warn(Object message) {
if (getRepository().isDisabled(Level.WARN_INT)) {
return;
}
if (Level.WARN.isGreaterOrEqual(this.getEffectiveLevel())) {
forcedLog(FQCN, Level.WARN, message, null);
}
}
public void warn(Object message, Throwable t) {
if (getRepository().isDisabled(Level.WARN_INT)) {
return;
}
if (Level.WARN.isGreaterOrEqual(this.getEffectiveLevel())) {
forcedLog(FQCN, Level.WARN, message, t);
}
}
可以看到info级别的日志,warn级别的日志,也都是一样的。
public void info(Object message) {
if (getRepository().isDisabled(Level.INFO_INT)) {
return;
}
if (Level.INFO.isGreaterOrEqual(this.getEffectiveLevel())) {
forcedLog(FQCN, Level.INFO, message, null);
}
}
public void info(Object message, Throwable t) {
if (getRepository().isDisabled(Level.INFO_INT)) {
return;
}
if (Level.INFO.isGreaterOrEqual(this.getEffectiveLevel())) {
forcedLog(FQCN, Level.INFO, message, t);
}
}
至此我们基本看完了InternalLoggerFactory类,回到起点KVConfigManager
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
可以看到我们其实才读了一行代码,在这里KVConfigManager的头部,声明了log,LoggerName为“RocketmqNamesrv”,这也对应我们当前在读的总包,是在namesrv包下。
到了第二行:
private final NamesrvController namesrvController;
这里声明了一个NamesrvController,然后接下来我们去看下这里面做了哪些事。
org.apache.rocketmq.namesrv.NamesrvController private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvConfig namesrvConfig;
private final NettyServerConfig nettyServerConfig;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
private final KVConfigManager kvConfigManager;
private final RouteInfoManager routeInfoManager;
进来之后首先是之前我们已经看过的InternalLogger,与KVConfigManager里声明的一模一样。接着下面是两个Config,内部就是一些常规的配置参数,不过并没有使用注解开发,全部是原生的get set方法。
还声明了一个单线程实例的ExecutorService
进入第一个比较重要的接口:
org.apache.rocketmq.remoting.RemotingServerRemotingServer是一个接口,我们直接进入它的唯一实现类看看:
org.apache.rocketmq.remoting.netty.NettyRemotingServerprivate static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
此处的日志工具的LoggerName是RocketmqRemoting了,因为此时其实已经调到了remoting包下,不在namesrv包了。
先不追究RemotingHelper,继续读NettyRemotingServer
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup eventLoopGroupSelector;
private final EventLoopGroup eventLoopGroupBoss;
private final NettyServerConfig nettyServerConfig;
private final ExecutorService publicExecutor;
private final ChannelEventListener channelEventListener;
private final Timer timer = new Timer("ServerHouseKeepingService", true);
private DefaultEventExecutorGroup defaultEventExecutorGroup;
接下来声明的,是RocketMq中为数不多依赖的东西,netty中的ServerBootstrap,EventLoopGroup,EventLoopGroup,ChannelEventListener,DefaultEventExecutorGroup,使用过netty的同学应该对这几个类并不陌生,我们先不去看netty的源码了,一会看他怎么用。
除这几个netty的类以外,还有一个声明为守护进程的Timer,一个来自JUC包下提交异步任务的ExecutorService
private HandshakeHandler handshakeHandler; private NettyEncoder encoder; private NettyConnectManageHandler connectionManageHandler; private NettyServerHandler serverHandler;
接下来还是netty的handler等相关类的声明
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
继续往下是NettyRemotingServer的两个构造方法,一个是没有channelEventListener的构造方法,另一个有
构造方法首先声明两个信号量,一个是单向信号量容量,一个异步信号量容量
super(nettyServerConfig.getServerOnewaySemaphorevalue(), nettyServerConfig.getServerAsyncSemaphorevalue());
// super如下:(非源码,这两段是拼在一起的)
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
this.semaphoreoneway = new Semaphore(permitsOneway, true);
this.semaphoreAsync = new Semaphore(permitsAsync, true);
}
在NettyServerConfig中的配置分别是:单向信号量256,异步信号量64
org.apache.rocketmq.remoting.netty.NettyServerConfigprivate int listenPort = 8888; private int serverWorkerThreads = 8; private int serverCallbackExecutorThreads = 0; private int serverSelectorThreads = 3; private int serveronewaySemaphorevalue = 256; private int serverAsyncSemaphorevalue = 64; private int serverChannelMaxIdleTimeSeconds = 120;
继续往下创建一个线程池:publicExecutor
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
内置了一个原子变量,对线程名做了一个重写。
根据是否轮询
useEpoll( )
提供了不同的eventLoopGroupBoss和eventLoopGroupSelector。
对于轮询的模式,使用了EpollEventLoopGroup,非轮询的模式使用了NioEventLoopGroup
public void loadSslContext() {
TlsMode tlsMode = TlsSystemConfig.tlsMode;
log.info("Server is running in TLS {} mode", tlsMode.getName());
if (tlsMode != TlsMode.DISABLED) {
try {
sslContext = TlsHelper.buildSslContext(false);
log.info("SSLContext created for server");
} catch (CertificateException e) {
log.error("Failed to create SSLContext for server", e);
} catch (IOException e) {
log.error("Failed to create SSLContext for server", e);
}
}
}
之后就是构建SslContext了,而sslContext这个类属于netty内部的,就不去深究了。这里普及下ssl和tls分别是什么:
SSL(Secure Sockets Layer 安全套接字协议),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS与SSL在传输层与应用层之间对网络连接进行加密。
也就是两种网络协议。
而构建sslContext的方法在TlsHelper中,下面看下这个方法做了什么:
org.apache.rocketmq.remoting.netty.TlsHelper这个类中最主要的就是buildSslContext方法,所以看这个方法顺便把这个类过一遍。
入口如下:
public static SslContext buildSslContext(boolean forClient) throws IOException, CertificateException {
File configFile = new File(TlsSystemConfig.tlsConfigFile);
extractTlsConfigFromFile(configFile);
logTheFinalUsedTlsConfig();
可以看到首先是从TlsSystemConfig中读取配置文件路径,解析配置文件,然后打印配置文件内容中的重要信息日志,进入
extractTlsConfigFromFile(configFile)
看下
除了配置文件判空外,比较核心的代码就是下面这段:
Properties properties;
properties = new Properties();
InputStream inputStream = null;
try {
inputStream = new FileInputStream(configFile);
properties.load(inputStream);
} catch (IOException ignore) {
} finally {
if (null != inputStream) {
try {
inputStream.close();
} catch (IOException ignore) {
}
}
}
使用io流从configFile中读取参数信息,再下面就是赋值了
tlsTestModeEnable = Boolean.parseBoolean(properties.getProperty(TLS_TEST_MODE_ENABLE, String.valueOf(tlsTestModeEnable))); tlsServerNeedClientAuth = properties.getProperty(TLS_SERVER_NEED_CLIENT_AUTH, tlsServerNeedClientAuth); tlsServerKeyPath = properties.getProperty(TLS_SERVER_KEYPATH, tlsServerKeyPath); tlsServerKeyPassword = properties.getProperty(TLS_SERVER_KEYPASSWORD, tlsServerKeyPassword); tlsServerCertPath = properties.getProperty(TLS_SERVER_CERTPATH, tlsServerCertPath); tlsServerAuthClient = Boolean.parseBoolean(properties.getProperty(TLS_SERVER_AUTHCLIENT, String.valueOf(tlsServerAuthClient))); tlsServerTrustCertPath = properties.getProperty(TLS_SERVER_TRUSTCERTPATH, tlsServerTrustCertPath); tlsClientKeyPath = properties.getProperty(TLS_CLIENT_KEYPATH, tlsClientKeyPath); tlsClientKeyPassword = properties.getProperty(TLS_CLIENT_KEYPASSWORD, tlsClientKeyPassword); tlsClientCertPath = properties.getProperty(TLS_CLIENT_CERTPATH, tlsClientCertPath); tlsClientAuthServer = Boolean.parseBoolean(properties.getProperty(TLS_CLIENT_AUTHSERVER, String.valueOf(tlsClientAuthServer))); tlsClientTrustCertPath = properties.getProperty(TLS_CLIENT_TRUSTCERTPATH, tlsClientTrustCertPath);
从配置文件获取信息之后,就是根据配置文件的内容来构建sslContext了,这里由于对网络协议不是很熟,就不分析了。
调用tlsHelper构建好sslContext后,就开始进入NettyRemotingServer的
start()
方法了,这是很重要的方法,用过netty框架的同学应该知道,这其实就相当于我们自己写的netty服务端的启动部分。
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
进来后首先是创建一个DefaultEventExecutorGroup对象,内部设置线程数和线程工厂,线程工厂里重写了所生成线程的名称。
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
可以看到是很常见的netty服务端ServerBootstrap的初始化方式
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
绑定端口,启动服务端。
启动后接下来是一个定时任务:
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
可以看到是3秒后执行,之后每过1秒再执行一次。执行的内容是scanResponseTable,下面看下这个任务:
首先是这个方法的注释:This method is periodically invoked to scan and expire deprecated request. 指的是此方法用于扫描和过期(动词,使过期)请求。下面是此方法的源码:
public void scanResponseTable() {
final List rfList = new linkedList();
Iterator> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry next = it.next();
ResponseFuture rep = next.getValue();
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
第一部分代码到while循环结束,可以看到是比较好理解的,遍历responseTable,并对其中已经超过过期时间1秒钟的响应进行删除,且会把过期的响应加入到刚创建的rfList中
也就是说每秒钟对responseTable中的请求进行一次过滤,并把所有当次清除的响应进行一次回调:
executeInvokeCallback(rf)
接下来看一下回调里面干了什么。
先看下方法注释:
先在回调执行器里执行回调方法,如果回调方法为空,则直接在当前线程中运行方法。
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
可以看到这个回调执行器其实指的就是一个异步线程池,所以通过看源码,上面的注释我们可以这样理解:
如果异步线程池存在,则提交异步任务执行内置回调方法,如果异步线程池不存在,则当前线程直接执行。接下来我们看下所谓执行的回调方法内部是干了什么:
这个回调方法是写在responseFuture中的,所以我们直接把这个类通读下:
org.apache.rocketmq.remoting.netty.ResponseFutureprivate final int opaque; private final Channel processChannel; private final long timeoutMillis; private final InvokeCallback invokeCallback; private final long beginTimestamp = System.currentTimeMillis(); private final CountDownLatch countDownLatch = new CountDownLatch(1); private final SemaphoreReleaseOnlyOnce once; private final AtomicBoolean executeCallbackonlyOnce = new AtomicBoolean(false); private volatile RemotingCommand responseCommand; private volatile boolean sendRequestOK = true; private volatile Throwable cause;
首先是一些声明的变量。值得注意的是 一个只能释放一次的信号once和一个原子布尔变量executeCallbackOnlyOnce,含义是该回调方法只能被调用一次。关于SemaphoreReleaseOnlyOnce,过一下这个类:
org.apache.rocketmq.remoting.common.SemaphoreReleaseonlyOnce这个类的内容本身不是很复杂,主要是对Semaphore做了一层包装:
private final AtomicBoolean released = new AtomicBoolean(false);
private final Semaphore semaphore;
public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void release() {
if (this.semaphore != null) {
if (this.released.compareAndSet(false, true)) {
this.semaphore.release();
}
}
}
public Semaphore getSemaphore() {
return semaphore;
}
代码量不大,主要是一个标记为只能释放一次的原子布尔变量,和一个release方法,逻辑就是如果没有被release的情况下,才能release
Semaphore是jdk1.5之后新增加的一个类,翻译为信号,但其实可以理解为一个锁,这个不属于rocketMQ源码,这里就不深究了,简单说就是必须从Semaphore中获取到一个锁,才能执行对应方法,释放后,其他线程才能获取该锁(当然允许同时执行的线程数是自定义的)。
而这里可以看到SemaphoreReleaseOnlyOnce,回调方法是只允许执行一次的,
再回到ResponseFuture,我们就明白了,也就是说为了保证回调方法只执行一次,作者做了两道限制:
一个是executeCallbackOnlyOnce原子布尔变量,一个是SemaphoreReleaseOnlyOnce互斥锁
除了
executeInvokeCallback()
方法外,再往下看:
public boolean isTimeout() {
long diff = System.currentTimeMillis() - this.beginTimestamp;
return diff > this.timeoutMillis;
}
判断超时,这个方法很简单,判断当前时间减去开始时间是否比超时时间长。
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();
}
接下来是一组方法,一个等待响应,一个塞响应。加入putResponse被调用,responseCommand有了值,则计时器会被立即停止。waitResponse也会立即返回。CountDownLatch是jdk1.5开始引入的一个计时器,作用其实就是阻塞线程一段时间。
private final CountDownLatch countDownLatch = new CountDownLatch(1);
计时器同样也只允许被调用一次,也就是说等待响应----获取响应 这个过程对单个对象来说只允许一次。
通读下来ResponseFuture有一个特点就是:一次性。不论是获取响应还是执行回调方法,单个responseFuture都是严格只允许执行一次的。
搞清这个回调方法,我们退回到起点,也就是NettyRemotingServer的服务端启动的地方,也就是在启动服务端后,对所有的ResponseTable做了一个扫描,过滤掉所有的过期响应,以及对过期响应做一次回调。这个responseTable缓存了所有正在进行的请求,类型是ConcurrentHashMap。
回到NettyRemotingServer,启动start方法下面,紧接着是shutdown()方法
public void shutdown() {
try {
if (this.timer != null) {
this.timer.cancel();
}
this.eventLoopGroupBoss.shutdownGracefully();
this.eventLoopGroupSelector.shutdownGracefully();
if (this.nettyEventExecutor != null) {
this.nettyEventExecutor.shutdown();
}
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
}
} catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
}
if (this.publicExecutor != null) {
try {
this.publicExecutor.shutdown();
} catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
}
}
}
可以看到就是关闭了所有资源包括:定时器timer,bossGroup,workerGroup,执行器publicExecutor(线程池)
接下来:注册RPC钩子:
@Override
public void registerRPCHook(RPCHook rpcHook) {
if (rpcHook != null && !rpcHooks.contains(rpcHook)) {
rpcHooks.add(rpcHook);
}
}
rpcHooks是一个泛型为rpcHook的集合:
protected ListrpcHooks = new ArrayList ();
进入RPCHook:
org.apache.rocketmq.remoting.RPCHookpublic interface RPCHook {
void doBeforeRequest(final String remoteAddr, final RemotingCommand request);
void doAfterResponse(final String remoteAddr, final RemotingCommand request,
final RemotingCommand response);
}
可以看到接口内部有两个方法,分别是请求前执行和响应后执行。接口的实现类的实例化对象作为一个对象存入rpcHooks集合中,后期会调用并统一执行。
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair pair = new Pair(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
注册处理器,其实就是把请求码,处理器和执行器注册到NettyRemotingServer的processorTable中。执行器如果为空的话,会选择默认的publicExecutor
processorTable:
protected final HashMap> processorTable = new HashMap >(64);
继续往下看:
@Override
public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
return this.invokeSyncImpl(channel, request, timeoutMillis);
}
同步引用,继续看调用的方法:
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
进来之后可以看到,首先把同步调用的responseFuture包好放到responseTable里,然后准备发送请求,请求之后再用这个responseFuture去接结果,可以认为是留了个钩子在调用方了。
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
可以看到接下来就是发送请求了,所谓同步调用invokeSync,其实就是使用netty的channel远程发送请求,并对结果做监听。
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
之后就是等待结果的返回,然后从响应表responseTable中移除对应的Future了。
然后这里对于自定义异常的写法,我们也看下:
org.apache.rocketmq.remoting.exception.RemotingTimeoutExceptionpublic class RemotingTimeoutException extends RemotingException {
private static final long serialVersionUID = 4106899185095245979L;
public RemotingTimeoutException(String message) {
super(message);
}
public RemotingTimeoutException(String addr, long timeoutMillis) {
this(addr, timeoutMillis, null);
}
public RemotingTimeoutException(String addr, long timeoutMillis, Throwable cause) {
super("wait response on the channel <" + addr + "> timeout, " + timeoutMillis + "(ms)", cause);
}
}
这个类比较短小,直接都贴过来看。没什么逻辑,但是写法值得学习,而且对异常信息的打印也很全。
回到NettyRemotingServer,在同步调用invokeSync下面自然就是异步调用invokeAsync
@Override
public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}
底层实现与invokeSync的区别就是首先做了锁,然后是没有实时对Future做回收,也就不存在同步调用那里的阻塞等待结果了。
不过这里就不对invokeAsync深入了,因为这个方法没有调用方。。
@Override
public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeOnewayImpl(channel, request, timeoutMillis);
}
接下来看下invokeOneway,单向调用
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
}
首先同样是先取锁,获取成功,则进行rpc远程调用,其实还是netty发送request到远端,加一个监听器来判断是否发送成功。
如果发生异常,可以看到代码中首先释放锁,这也是为了防止因为发生异常而死锁的情况,之后打印日志抛出异常。
else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeonewayImpl invoke too fast");
} else {
String info = String.format(
"invokeonewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
如果取锁失败,则判断超时时间是否设置正确,如果<=0,则抛出异常;如果设置合理,那么会打印日志,内容为:当前rpc调用设置的超时时长,等待的线程数,可用锁的数量,打印完日志后抛出异常。
可以看到单向调用其实指的就是没有返回值,监听器的future也只是返回一下请求是否成功,并没有返回值。
再下面是声明了三个handler,和我们自己写的netty demo里的serverHandler,clientHandler一样的
org.apache.rocketmq.remoting.netty.NettyRemotingServer.HandshakeHandler第一个是HandshakeHandler 也就是握手建立连接的handler
@ChannelHandler.Sharable class HandshakeHandler extends SimpleChannelInboundHandler{
内部实现了一个方法:channelRead0
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// mark the current position so that we can peek the first byte to determine if the content is starting with
// TLS handshake
msg.markReaderIndex();
byte b = msg.getByte(0);
if (b == HANDSHAKE_MAGIC_CODE) {
switch (tlsMode) {
case DISABLED:
ctx.close();
log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode");
break;
case PERMISSIVE:
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
.addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info("Handlers prepended to channel pipeline to establish SSL connection");
} else {
ctx.close();
log.error("Trying to establish an SSL connection but sslContext is null");
}
break;
default:
log.warn("Unknown TLS mode");
break;
}
} else if (tlsMode == TlsMode.ENFORCING) {
ctx.close();
log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");
}
// reset the reader index so that handshake negotiation may proceed as normal.
msg.resetReaderIndex();
try {
// Remove this handler
ctx.pipeline().remove(this);
} catch (NoSuchElementException e) {
log.error("Error while removing HandshakeHandler", e);
}
// Hand over this message to the next .
ctx.fireChannelRead(msg.retain());
}
}
首先校验传送信息msg的首字符是否是0x16,如果不是,则认为是不安全的连接,关闭掉ctx并打印日志。
如果是安全连接,接下来校验tlsMode是否是符合要求的模式,如果是关闭模式,则同样不能建立连接。
对于符合要求的连接,做以下三行操作:
ctx.pipeline()
.addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))
.addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
在ctx的pipeline管道里面添加两个handler,第一行加了一个自定义handler,第二行是添加了一个编码器用于加密。
握手连接handler结束后,再下面是NettyServerHandler:
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler@ChannelHandler.Sharable class NettyServerHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } }
内容比较简单,可以看到是对接收到的消息做处理
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
处理分两种类型,请求处理和响应处理
内部先看下对传来的请求是如何处理的:
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair matched = this.processorTable.get(cmd.getCode());
final Pair pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
可以看到进来首先从processTable中先把netty请求处理器和执行器(线程池)准备好,这里我们发现一个规律,就是无论执行什么任务,做什么动作。这两个对象总是会传:一个processor(处理器)一个执行器(executorService)。
同时我们看下这个rocketmq内部写的pair:
org.apache.rocketmq.remoting.common.Pairpublic class Pair{ private T1 object1; private T2 object2; public Pair(T1 object1, T2 object2) { this.object1 = object1; this.object2 = object2; } public T1 getObject1() { return object1; } public void setObject1(T1 object1) { this.object1 = object1; } public T2 getObject2() { return object2; } public void setObject2(T2 object2) { this.object2 = object2; } }
内容很简单,就是两个object一对儿。
接下来初始化一个Runnable接口:
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
doBeforeRpcHooks(remoteAddr, cmd);
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
doAfterRpcHooks(remoteAddr, cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
先从channel中获取到发送请求的客户端的地址:remoteAddr,然后根据这个地址去执行rpcHooks,还记得我们之前记过的,RPCHooks缓存的是执行请求前或获取响应后要执行的内容。
而在定义回调方法callback内,先执行响应后的动作,之后把响应回传给客户端,对于单向的rpc调用(无返回值),则不做操作。
继续往下,根据处理器是否异步同步做了不同的处理,但都调用了上面定义的callback方法
再往下就是从processorTable中获取执行器提交上面定义的runnable任务了。
回到NettyServerHandler,其实
processMessageReceived(ctx, msg)
这个方法就是执行器提交了收到的请求,并执行了前置和后置的钩子方法,并把响应通过channel返回回去。再看下
processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd)
处理响应的方法
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
可以看到方法内部从responseTable中取出responseFuture,把ResponseCommand塞入responseFuture,执行responseFuture的回调方法,之后把responseTable中的对应Future删除。
接下来第三个handler:NettyConnectManageHandler、再回顾下前两个handler,第一个是握手handler,也就是建立连接;第二个是NettyServerHandler,也就是服务端接受响应或请求的handler。
第三个handler名称为netty连接管理handler:
org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyConnectManageHandler@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
super.channelUnregistered(ctx);
}
首先是注册和解注册方法:channelRegistered、channelUnregistered,基本是继承了父类方法,只是从ChannelHandlerContext中取出地址打了日志。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
super.channelActive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}
接下来重写channelActive方法,除了日志和继承父方法外,还把本次连接的remoteAddress地址,channel本地存下来,存到了NettyEvent事件中的一个事件队列:
private final linkedBlockingQueueeventQueue = new linkedBlockingQueue ();
可以理解为,服务端每接收到一个客户端的连接,就在服务器端留一个存根,也就是把channel存下来。后期这个channel可以用于比如给客户端主动发消息等操作。
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
super.channelInactive(ctx);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
与channelActive对应的一个channelInactive方法,逻辑基本一样,唯一区别是向NettyEvent队列中放的是关闭的NettyEvent事件。
接下来是重写的
userEventTriggered(ChannelHandlerContext ctx, Object evt)
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
这个方法其实就是同一个地址来的,建立了重复连接。比如此时节点A已经与服务端建立了连接,结果节点A又发来一个连接,则触发trigger。可以看到这里的处理是,关闭掉这个新连接,并把该连接放置到NettyEvent队列中,状态置为“闲置IDLE”。
接下来一个异常捕获方法exceptionCaught
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
RemotingUtil.closeChannel(ctx.channel());
}
可以看到同样是往NettyEvent队列里放置了channel,状态置为“异常EXCEPTION”。
到这儿NettyRemotingServer类就结束了,其实这些重写是很好的例子demo,之后使用netty做底层通信,都可以拿来借鉴。我看到这里,就想起之前自己写的netty-demo,其实也是同样的思路,对于客户端传来的连接ChannelHandlerContext,存到本地,后期用于向客户端主动推送。但是RocketMQ底层显然更规范和标准,每个事件都有对应的枚举。
其实到这儿也对netty的channel有了一个全新的认识,netty对于任何事件,比如:建立连接,断开连接,抛送异常,发送消息等,这些动作其实都是通过channel来传输,根据上面我们就能看到,这些动作的结果,其实就是给了一个ChannelHandlerContext。
现在回到NamesrvController,看了这么多,其实我们才读到NamesrvController的第56行,哈哈。继续往下,BrokerHousekeepingService,点进去看下:
public class BrokerHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}
首先是继承了ChannelEventListener,channel事件监听器。结构其实很简单,对于channel的关闭,异常,空闲事件,都做了destroy,还记得我们前面看到的NameSrvHandler中的trigger方法吗,在那里把重复的连接置为了空闲,可以看到置为空闲其实就是destroy掉了。接下来看看这个
onChannelDestroy(remoteAddr, channel)
方法内部做了什么。
public void onChannelDestroy(String remoteAddr, Channel channel) {
String brokerAddrFound = null;
if (channel != null) {
try {
try {
this.lock.readLock().lockInterruptibly();
Iterator> itBrokerLiveTable =
this.brokerLiveTable.entrySet().iterator();
while (itBrokerLiveTable.hasNext()) {
Entry entry = itBrokerLiveTable.next();
if (entry.getValue().getChannel() == channel) {
brokerAddrFound = entry.getKey();
break;
}
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
进来首先,如果channel不为空,则获取读锁,这里注意RouteInfoManager内部是声明了读写锁的:
private final ReadWriteLock lock = new ReentrantReadWriteLock();
获取锁成功后,接下来从brokerLiveTable(在线的broker表)中遍历出value(BrokerLiveInfo)的channel为目标channel的key作为brokerAddrFound。也就是说第一个if,就是从brokerLiveTable中取出一个key:brokerAddrFound(broker的地址)
整个方法的主要部分都集中在对这个取出的key:brokerAddrFound的处理上。虽然代码很长,但整体逻辑并不复杂,都是删除key为brokerAddrFound的各种表中的元素:(合法化处理还有异常捕获这里就不分析了)
this.lock.writeLock().lockInterruptibly(); this.brokerLiveTable.remove(brokerAddrFound); this.filterServerTable.remove(brokerAddrFound);
获取读锁(增删改都是取读锁),先从brokerLiveTable,filterServerTable中移除key为broker地址的元素。这也符合该方法的逻辑:channelDestory。
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = itBrokerAddrTable.next().getValue();
Iterator> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) {
Entry entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
if (brokerAddr.equals(brokerAddrFound)) {
brokerNameFound = brokerData.getBrokerName();
it.remove();
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
brokerId, brokerAddr);
break;
}
}
if (brokerData.getBrokerAddrs().isEmpty()) {
removeBrokerName = true;
itBrokerAddrTable.remove();
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
brokerData.getBrokerName());
}
}
从brokerData中移除
if (brokerNameFound != null && removeBrokerName) {
Iterator>> it = this.clusterAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry> entry = it.next();
String clusterName = entry.getKey();
Set brokerNames = entry.getValue();
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
brokerNameFound, clusterName);
if (brokerNames.isEmpty()) {
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
clusterName);
it.remove();
}
break;
}
}
}
clusterAddrTable 集群地址表 移除
if (removeBrokerName) {
Iterator>> itTopicQueueTable =
this.topicQueueTable.entrySet().iterator();
while (itTopicQueueTable.hasNext()) {
Entry> entry = itTopicQueueTable.next();
String topic = entry.getKey();
List queueDataList = entry.getValue();
Iterator itQueueData = queueDataList.iterator();
while (itQueueData.hasNext()) {
QueueData queueData = itQueueData.next();
if (queueData.getBrokerName().equals(brokerNameFound)) {
itQueueData.remove();
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
topic, queueData);
}
}
if (queueDataList.isEmpty()) {
itTopicQueueTable.remove();
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
topic);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
itTopicQueueTable topic队列表 移除,之后在finally代码块中释放写锁
回到NamesrvController,关于brokerHousekeepingService就结束了。
private ExecutorService remotingExecutor; private Configuration configuration;
接下来是声明一个执行器,和一个配置类Configuration,进入Configuration看下:
org.apache.rocketmq.common.Configurationprivate final InternalLogger log; private List
首先是声明日志工具,配置列表等 值得注意的是storePathField存储路径域,数据类型为Field,提供了访问类或接口的途径。
然后是一个读写锁,一个数据版本dataVersion,进入看下:
org.apache.rocketmq.common.DataVersionpublic class DataVersion extends RemotingSerializable {
private long timestamp = System.currentTimeMillis();
private AtomicLong counter = new AtomicLong(0);
结构比较简单,一个long类型的时间戳,一个原子类型的long遍历作为计数器。内容也比较简单,每次操作该类的实例化对象,都会使版本号counter加一,并记录当前的操作时间timestamp。
回到Configuration配置类,除掉这些参数声明外,比较重要的就是初始化方式了,也就是在初始化的时候把配置内容都塞到配置列表:
public Configuration(InternalLogger log, Object... configObjects) {
this.log = log;
if (configObjects == null || configObjects.length == 0) {
return;
}
for (Object configObject : configObjects) {
registerConfig(configObject);
}
}
直接进入注册配置:registerConfig中看下
public Configuration registerConfig(Object configObject) {
try {
readWriteLock.writeLock().lockInterruptibly();
try {
Properties registerProps = MixAll.object2Properties(configObject);
merge(registerProps, this.allConfigs);
configObjectList.add(configObject);
} finally {
readWriteLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("registerConfig lock error");
}
return this;
}
内容可以看到首先获取读锁,object转成properties,然后把待注册的配置合到(merge)allConfigs(全部配置)中,然后再把配置添加到全局的configObjectList中,之后释放锁。也就是把配置类注册到待注册的集合或者配置类中。这里我们看到使用了MixAll这个类,类的内容比较多,而且是一个工具类,所以就只看下这里用到的
object2Properties(final Object object)
方法,功能我们已经看到了,就是从object转到Properties,其实这个方法我们后续也可以借鉴的。
public static Properties object2Properties(final Object object) {
Properties properties = new Properties();
Field[] fields = object.getClass().getDeclaredFields();
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(object);
} catch (IllegalAccessException e) {
log.error("Failed to handle properties", e);
}
if (value != null) {
properties.setProperty(name, value.toString());
}
}
}
}
return properties;
}
这个方法本身不是特别难,获取到对象object的类文件,再获取到类文件内部的 属性名,属性值,塞入properties。一个细节是这些属性要排除掉静态修饰符的,this开头的。也就是说该方法不会把静态对象和this修饰的对象合入properties。
回到Configuration类继续往下:
public void setStorePathFromConfig(Object object, String fieldName) {
assert object != null;
try {
readWriteLock.writeLock().lockInterruptibly();
try {
this.storePathFromConfig = true;
this.storePathObject = object;
// check
this.storePathField = object.getClass().getDeclaredField(fieldName);
assert this.storePathField != null
&& !Modifier.isStatic(this.storePathField.getModifiers());
this.storePathField.setAccessible(true);
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
} finally {
readWriteLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("setStorePathFromConfig lock error");
}
}
这个方法是从配置文件中读取存储路径。这里其实我们能发现,Configuration是一个配置类,也就是pojo类,而内部有很多操作此类的方法,其实也就是充血模型了,而rocketMQ底层我们能注意到,很多充血模型。包括之前看到的ResponseFuture,LoggingEvent等,都是充血模型。
在这个类里,存储路径直接被存到Field中。
紧接着是和
setStorePathFromConfig()
方法对应的get方法:
if (this.storePathFromConfig) {
try {
realStorePath = (String) storePathField.get(this.storePathObject);
} catch (IllegalAccessException e) {
log.error("getStorePath error, ", e);
}
}
忽略掉对锁的获取和释放以及合理校验,最核心的代码其实就是这行,可以看到从storePathField取出来对象转为String就是存储路径了。
接下来一个update(Properties properties) 同样是把properties添加到Configuration中:
mergeIfExist(properties, this.allConfigs);
for (Object configObject : configObjectList) {
// not allConfigs to update...
MixAll.properties2Object(properties, configObject);
}
核心代码就是这两行,把待更新的properties添加到allConfigs和configObjectList,这个和之前的合并都是一样的。更新末尾还有一个
persist()
方法:
readWriteLock.readLock().lockInterruptibly();
try {
String allConfigs = getAllConfigsInternal();
MixAll.string2File(allConfigs, getStorePath());
主要代码有如上三行,可以看到是配置持久化,所以取的是读锁,从内置的配置中获取所有配置内容,然后持久化到文件。
接下来看下
MixAll.string2File( allConfigs, getStorePath( ) )
做了什么
public static void string2File(final String str, final String fileName) throws IOException {
String tmpFile = fileName + ".tmp";
string2FileNotSafe(str, tmpFile);
String bakFile = fileName + ".bak";
String prevContent = file2String(fileName);
if (prevContent != null) {
string2FileNotSafe(prevContent, bakFile);
}
File file = new File(fileName);
file.delete();
file = new File(tmpFile);
file.renameTo(new File(fileName));
}
可以看到是创建了两个不同后缀的文件,一个是.tmp,一个是.bak,先进入string2FileNotSafe(str, tmpFile)看下干了什么:
File file = new File(fileName);
File fileParent = file.getParentFile();
if (fileParent != null) {
fileParent.mkdirs();
}
FileWriter fileWriter = null;
try {
fileWriter = new FileWriter(file);
fileWriter.write(str);
可以看到内容很简单,根据文件路径创建文件,把内容写入文件中。
回到
string2File(final String str, final String fileName);
看下
String prevContent = file2String(fileName);
这一行干了什么,从外面看,是从文件名获取到一个前置内容prevContent,判断应该是把文件的前置内容落入.bak文件中
try {
inputStream = new FileInputStream(file);
int len = inputStream.read(data);
result = len == data.length;
} finally {
if (inputStream != null) {
inputStream.close();
}
}
if (result) {
return new String(data);
}
可以看到如果从文件中读出的字节数等于文件的字节数(也就是完全读出成功了),则返回读出的内容。
if (prevContent != null) {
string2FileNotSafe(prevContent, bakFile);
}
此时就明白这个prevContent的含义了,其实就是先从源文件中预读出一次,预读出成功后,再把预读出的内容全部写入到bak后缀的文件中。其实就是做了一层缓存,先预读出,再把缓存写入源文件。此时我们在.bak和.tmp两个后缀的文件中有读出内容,下面两个操作,
File file = new File(fileName); file.delete(); file = new File(tmpFile); file.renameTo(new File(fileName));
就把.tmp文件冲掉了,应该是这样的,这两行代码其实没有特别搞清楚。但可以肯定.bak文件是一定拥有全部输出内容的。
此时我们就把
persist( )
这个持久化方法读明白了,之前我们自己做持久化,就是落库落库落库,落mysql,但在RocketMq的源码中,我们看到了“持久化”的底层操作,其实就是io流写入本地硬盘。



