一、服务端与客户端设计
1.服务启动类2.服务端管道处理器3.客户端启动类4.客户端管道处理器5.启动验证
·服务端·客户端 二、示例分析
1.ServerBootstrap的Group方法2.ServerBootstrap的Channel方法
1.反射实现类ReflectiveChannelFactory2.非空校验并赋值channelFactory 3.ServerBootstrap的Option、ChildOption方法
1.ChannelConfig实现类DefaultServerSocketChannelConfig 4.ServerBootstrap的ChildHandler方法,实现类
一、服务端与客户端设计 1.服务启动类package com.example.netty.server;
import com.example.netty.server.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建BossGroup 和 WorkerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务端的启动对象 配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程进行设置
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer() {
//给 PipLine 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("Server is Ready");
//绑定一个端口并且同步 生成了一个 ChannelFuture 对象 启动服务
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭通道进行侦听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2.服务端管道处理器
package com.example.netty.server.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
System.out.println("Server Read Thread :" + Thread.currentThread().getName());
System.out.println("Server ctx=" + ctx);
//将message转成一个ByteBuf
ByteBuf buf = (ByteBuf) message;
System.out.println("Client Send Message is:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("Client Ip Address is:" + ctx.channel().remoteAddress());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将数据写入到缓冲区并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Client",CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
3.客户端启动类
package com.example.netty.client;
import com.example.netty.client.handler.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("Client OK ...");
//启动客户端 连接服务端
ChannelFuture cf = bootstrap.connect("127.0.0.1",6668).sync();
//关闭通道增加监听
cf.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
4.客户端管道处理器
package com.example.netty.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client :" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Server ", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("Server SendBack Message:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("Server Ip Address :" + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
5.启动验证
·服务端
Connected to the target VM, address: '127.0.0.1:60934', transport: 'socket' Java HotSpot(TM) 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended Server is Ready Server Read Thread :nioEventLoopGroup-3-1 Server ctx=ChannelHandlerContext(NettyServerHandler#0, [id: 0x7748b291, L:/127.0.0.1:6668 - R:/127.0.0.1:60940]) Client Send Message is:Hello Server Client Ip Address is:/127.0.0.1:60940·客户端
Connected to the target VM, address: '127.0.0.1:60937', transport: 'socket' Java HotSpot(TM) 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended Client OK ... Client :ChannelHandlerContext(NettyClientHandler#0, [id: 0xb9a0d3d1, L:/127.0.0.1:60940 - R:/127.0.0.1:6668]) Server SendBack Message:Hello Client Server Ip Address :/127.0.0.1:6668二、示例分析 1.ServerBootstrap的Group方法
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
2.ServerBootstrap的Channel方法
public B channel(Class extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
1.反射实现类ReflectiveChannelFactory
package io.netty.channel; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.StringUtil; import java.lang.reflect.Constructor; public class ReflectiveChannelFactory2.非空校验并赋值channelFactoryimplements ChannelFactory { private final Constructor extends T> constructor; public ReflectiveChannelFactory(Class extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz"); try { this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", e); } } @Override public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } } @Override public String toString() { return StringUtil.simpleClassName(ReflectiveChannelFactory.class) + '(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)"; } }
public B channelFactory(io.netty.channel.ChannelFactory extends C> channelFactory) {
return channelFactory((ChannelFactory) channelFactory);
}
@Deprecated
public B channelFactory(ChannelFactory extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
3.ServerBootstrap的Option、ChildOption方法
多个客户端如果请求同时到达,则需要一个队列来进行放置,以便服务端依次处理 Option ---> bossGroup ChildOption ---> workGroup1.ChannelConfig实现类DefaultServerSocketChannelConfig
@Override public4.ServerBootstrap的ChildHandler方法,实现类boolean setOption(ChannelOption option, T value) { validate(option, value); if (option == SO_RCVBUF) { setReceiveBufferSize((Integer) value); } else if (option == SO_REUSEADDR) { setReuseAddress((Boolean) value); } else if (option == SO_BACKLOG) { setBacklog((Integer) value); } else { return super.setOption(option, value); } return true; }
DefaultChannelPipeline implements ChannelPipeline



