Netty框架之概述及基本组件介绍
Reactor网络编程模型解析
前言在上篇博客介绍完netty框架的基本组件介绍和概述,也跟着代码看了下NioEventLoopGroup的启动过程,以及基于Reactor线程模型的解析,它是开发Netty的核心思想,也是整个Netty框架的核心思想;这篇文章分析Netty中责任链模式,该模式给netty框架提供了大量一些扩展,使得netty框架更适合在业务场景上使用。
设计模式 - 责任链模式
Netty中的责任链模式也就是来自设计模式中责任链模式。
责任链模式( Chain of Responsibility Pattern)为请求创建了一个处理对象的链,每一个链条处理一件事情; 发起请求和具体处理请求的过程进行解耦: 职责链上的处理者负责处理请求,客户只需要将 请求发送到职责链上即可,无须关心请求的处理细节和请求的传递。 每一个环节都会做一个处理,到达最后得到最后结果。 中间点中每个点都解耦,并不依赖类似于上面图的情况,每个部分做的事情不同,并不感染,感觉就像是一个个组件进行组装
责任链的好处
- 降低了对象之间的耦合度。该模式使得一个对象无须知道到底是哪一个对象处理其请求以及链的结构,发送者和接收者也无须拥有对方的明确信息。
- 增强了系统的可扩展性。可以根据需要增加新的请求处理类,满足开闭原则。
- 增强了给对象指派职责的灵活性。当工作流程发生变化,可以动态地改变链内的成员或者调动它们的次序,也可动态地新增或者删除责任。
- 责任链简化了对象之间的连接。每个对象只需保持一个指向其后继者的引用,不需保持其他所有处理者的引用,这避免了使用众多的 if 或者 if···else 语句。
- 责任分担。每个类只需要处理自己该处理的工作,不该处理的传递给下一个对象完成,明确各类的责任范围,符合类的单一职责原则。
缺点
- 不能保证每个请求都被处理,有可能这个处理并不会到末端就抛出掉了某个请求
- 对比较长的职责链,对象是非常多的,有可能会出现占用资源过大的问题
- 职责链建立的合理性要靠客户端来保证,因此也可能出现写出错的情况
在netty中主要就是利用责任链模式,进行业务上处理,达到netty的耦合降低,扩展性好的特点。
实现责任链模式一定有四个要素:
- 处理抽象类 抽象出方法用于链条的传递 next 方法
- 具体的处理器实现类 用于处理链条之间的数据
- 保存处理器信息 无论是数组实现 还是链条实现 都需要一个容器来存储这些实现类然后让链条连接起来
- 处理执行 开始执行的方法 执行器
代码实现
- 抽象处理类以及 处理抽象类 Pipeline 来进行管理并持有 抽象处理类
public class Pipeline {
class HandlerChainContext {
HandlerChainContext next;// 持有下一个节点
AbstractHandler handler;
public HandlerChainContext(AbstractHandler handler) {
this.handler = handler;
}
// 将节点持有下去
void handler(Object arg0) {
this.handler.doHandler(this, arg0);
}
void runNext(Object arg0) {
if (this.next != null) {
this.next.handler(arg0);
}
}
}
static abstract class AbstractHandler {
abstract void doHandler(HandlerChainContext context, Object arg0);
}
}
- 对Pipeline 进行完善
// 持有上下文
public HandlerChainContext context = new HandlerChainContext(new AbstractHandler() {
@Override
void doHandler(HandlerChainContext context, Object arg0) {
System.out.println("这是头部");
context.runNext(arg0);
}
});
- 增加责任链的方法及调用方法
// 添加责任链
public void addLast(AbstractHandler handler) {
HandlerChainContext next = context;
while (next.next != null) {
next = next.next;
}
next.next = new HandlerChainContext(handler);
}
// 开始调用
public void requestProcess(Object arg0) {
context.handler(arg0);
}
- 最后创建一个具体实现处理类 和测试代码
static class Handler1 extends AbstractHandler {
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
arg0 = arg0.toString() + "..handler1的的处理.....";
System.out.println("我是Handler1的实例,我在处理:" + arg0);
// 继续执行下一个
handlerChainContext.runNext(arg0);
}
}
public static void main(String[] args) {
Pipeline p = new Pipeline();
p.addLast(new Handler1());
p.requestProcess("开始了....");
}
得到下面的打印结果
这是头部 我是Handler1的实例,我在处理:开始了........这是头部......handler1的的处理.....
完整代码
public class Pipeline {
// 持有上下文
public HandlerChainContext context = new HandlerChainContext(new AbstractHandler() {
@Override
void doHandler(HandlerChainContext context, Object arg0) {
System.out.println("这是头部");
context.runNext(arg0 + "....这是头部....");
}
});
// 添加责任链
public void addLast(AbstractHandler handler) {
HandlerChainContext next = context;
while (next.next != null) {
next = next.next;
}
next.next = new HandlerChainContext(handler);
}
// 开始调用
public void requestProcess(Object arg0) {
context.handler(arg0);
}
class HandlerChainContext {
HandlerChainContext next;// 持有下一个节点
AbstractHandler handler;
public HandlerChainContext(AbstractHandler handler) {
this.handler = handler;
}
// 将节点持有下去
void handler(Object arg0) {
this.handler.doHandler(this, arg0);
}
void runNext(Object arg0) {
if (this.next != null) {
this.next.handler(arg0);
}
}
}
static abstract class AbstractHandler {
abstract void doHandler(HandlerChainContext context, Object arg0);
}
static class Handler1 extends AbstractHandler {
@Override
void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
arg0 = arg0.toString() + "..handler1的的处理.....";
System.out.println("我是Handler1的实例,我在处理:" + arg0);
// 继续执行下一个
handlerChainContext.runNext(arg0);
}
}
public static void main(String[] args) {
Pipeline p = new Pipeline();
p.addLast(new Handler1());
p.requestProcess("开始了....");
}
}
Netty中的ChannelPipeline责任链
在netty中实现责任链的就是Pipeline管道
Pipeline 管道 保存了通道所有处理器信息。 创建新channel 时自动创建一个专有的 pipeline 。 入站 事件 和出站 操作 会调用 pipeline 上的处理器 在源码中怎么为每个channel 创建一个pipeline的- NioEventLoop的run是在初始化时就会启动run方法,然后监听是否有事件过来
- 走到 processSelectedKey 方法中的 事件读取 niomessageunsafe中
- doReadMessages 方法中 创建 一个NioSocketChannel
- 在抽象类AbstractChannel中持有者pipeline
- 会在初始化时就创建 这个pipeline
- Pipeline管道中入站事件 ,以及出站事件
- handler处理器是什么,有什么作用?
- Pipeline 如何维护 handler 的。
- handler 的执行。
入站事件:
通常指I/O线程生成了入站数据。(通俗理解: 从socket底层自己往上冒上来的事件都是入站)比如EventLoop收到selector的OP_READ事件,入站处理器调用socketChannel.read(ByteBuffer) 接收到数据后,这将导致通道的ChannelPipeline中包含的下一个中的channelRead方法被调用。
native层往应用写数据 包括各个事件就是入站事件。
出站事件: 经常是指I/O线程执行实际的输出操作。 (通俗理解:想主动往socket底层操作的事件的都是出站) 比如bind方法用意是请求server socket绑定到给定的SocketAddress,这将导致通道的 ChannelPipeline中包含的下一个出站处理器中的 bind 方法被调用。应用层往native层写数据 连接关闭 写入数据等等都是出站事件
- 在DefaultChannelPipeline中就有头和尾 的Context 类似责任链中 持有的 具体的处理器实现类
- 初始化时 将连接起来 头和尾连接起来
- 当消息过来时就会走AbstractNioByteChannel 中read 方法做包括申请多少内存 以及 pipeline.fireChannelRead(byteBuf); 去读取数据内容 这里的ByteBuf 是封装过了ByteBuffer的一个类。
- 就是fire 开头的都是触发入站的事件
- 而对于出站事件则是 包括wite read bind等方法来表示的
- 源码中对于入站和出战事件,完全按照 入站中头开始,出战按尾部开始
- 继续走下去找到读取的方法
- 调用到下一个channelpipline方法
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
- 在pipeline 中 add 和remove则是维护链表上的数据
- 对于context中也存在许多的读取方法等等
作为有 pipeline 以及context 事件的持有者,并在实现的子类中持有对应的handler 利用handler方法,而context就是为了维护链的结构而产生的
所以要对事件进行做处理,因此需要实现 ChannelInboundHandlerAdapter 或者 对应的事件即可
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("收到客户端数据,还给客户端:" + msg);
ctx.write(msg);
// ctx.channel().write("");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
在使用时,添加进去就行
ServerBootstrap b = new ServerBootstrap();
// 3、配置启动器
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelDuplexHandler
这个Handler 是继承ChannelInboundHandlerAdapter 并且实现了ChannelOutboundHandler的既能入站 也可以出站
在源码中有很多这种handler 实现 提供给我们可以我们对入站事件和出站事件进行处理。
Netty中事件的定义 Pipeline中的handler ChannelHandler : 用于处理 I/O 事件或拦截 I/O 操作,并转发到 ChannelPipeline 中的下一个处理器。 这个顶级接口定义功能很弱,实际使用时会去实现下面两大子接口: 处理入站 I/O 事件的 ChannelInboundHandler 、处理出站 I/O 操作的 ChannelOutboundHandler适配器类: 为了开发方便,避免所有 handler 去实现一遍接口方法, Netty 提供了简单的实现类: ChannelInboundHandlerAdapter 处理入站 I/O 事件 ChannelOutboundHandlerAdapter 来处理出站 I/O 操作 ChannelDuplexHandler 来支持同时处理入站和出站事件 ChannelHandlerContext : 实际存储在 Pipeline 中的对象并非 ChannelHandler ,而是上下文对象。 将handler ,包裹在上下文对象中,通过上下文对象与它所属的 ChannelPipeline 交互,向上或向下传递事件 或者修改 pipeline 都是通过上下文对象。 Pipeline中的handler
ChannelPipeline是线程安全的,ChannelHandler可以在任何时候添加或删除。
例如,你可以在即将交换敏感信息时插入加密处理程序,并在交换后删除它。 一般操作,初始化的时候增加进去,较少删除。下面是 Pipeline 中管理 handler 的 API Handler的执行分析 当入站事件时,执行顺序是 1 、 2 、 3 、 4 、 5 当出站事件时,执行顺序是 5 、 4 、 3 、 2 、 1 在这一原则之上, ChannelPipeline 在执行时会进行选择 3 和 4 为出站处理器,因此入站事件的实际执行是 :1 、 2 、 5 1 和 2 为入站处理器,因此出站事件的实际执行是 :5 、 4 、 3 不同的入站事件会触发 handler 不同的方法执行: 上下文对象中 fire** 开头的方法,代表入站事件传播和处理 其余的方法代表出站事件的传播和处理。 结构架构图构成一个channel
构成整体的图
总结 整篇文章从责任链模式的一种实现方式开始,从源码去深入理解netty怎么使用责任链模式来扩展,以及用户在管道中有一个或多个channelhandler来接收I/O事件(例如读取)和请求I/O操作(例如写入和关闭)。 一个典型的服务器在每个通道的管道中都有以下处理程序,但是根据协议和业务逻辑的复杂性和特 征,可能会有所不同: 协议解码器——将二进制数据(例如ByteBuf)转换为Java对象。 协议编码器——将Java对象转换为二进制数据。 业务逻辑处理程序——执行实际的业务逻辑(例如数据库访问)。



