文章目录IDE:IntelliJ IDEA
- 分别基于IO、NIO、Netty的Java网络程序
- 一、Java NIO
- 1.1 NIO与传统IO对比
- 1.2 主要核心原理
- 1.2.1 缓冲区Buffer(负责数据的存取)
- **缓冲区的四个核心属性**
- **缓冲区的三个核心操作方法**
- **直接缓冲区和非直接缓冲区**
- 非直接缓冲区
- 直接缓冲区
- 直接缓冲区和非直接缓冲区的区别
- 1.2.2 通道Channel(负责数据的运输)
- **通道的主要实现类**
- **通道的获取方式**
- **通道之间的数据传输**
- **通道的分散读取和聚集写入**
- 1.2.3 选择器Selector(负责监控通道的IO状况)
- 选择键SelectionKey
- Selector常用方法
- 1.2.4 字符集Charset(编码解码)
- 编码
- 解码
- 1.2.5 NIO的网络通信(Selector的核心应用)
- 三大核心
- 阻塞与非阻塞
- 二、分别基于IO、NIO、Netty的Java网络程序Demo
- 2.1 基于IO的java网络程序
- 2.2 基于Netty的java网络程序
- 2.3 基于NIO的java网络程序
- 三、基于Web的聊天室(Springboot+netty)
- 3.1 新建工程
- 3.2 完整代码
- 3.3 测试
- 四、总结
- 五、参考文章
- 六、源代码
java 1.4版本推出了一种新型的IO API,与原来的IO具有相同的作用和目的;可代替标准java IO,只是实现的方式不一样,NIO是面向缓冲区、基于通道的IO操作;通过NIO可以提高对文件的读写操作。基于这种优势,现在使用NIO的场景越来愈多,很多主流行的框架都使用到了NIO技术,如Tomcat、Netty、Jetty等;所以学习和掌握NIO技术已经是一个java开发的必备技能了。
1.1 NIO与传统IO对比| NIO | IO |
|---|---|
| 面向缓冲区Buffer | 面向流Stream |
| 双向(基于通道Channel) | 单向(分别建立输入流、输出流) |
| 同步非阻塞(non-blocking) | 同步阻塞 |
| 选择器(Selector,多路复用) | 无 |
| 支持字符集编码解码解决方案,支持锁,支持内存映射文件的文件访问接口 | 无 |
主要包括:缓冲区(Buffer)、通道(Channel)和选择器(Selector)、字符集(Charset);首先获取用于连接IO设备的通道channel以及用于容纳数据的缓冲区,利用选择器Selector监控多个Channel的IO状况(多路复用),然后操作缓冲区,对数据进行处理。
NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道,即一个单独的线程现在可以管理多个输入和输出通道。
在javaNIO中负责数据的存取,底层缓冲区就是数组,用于存储不同数据类型的数据,根据不同的数据类型(Boolean除外),提供了相应类型的缓冲区:ByteBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer、CharBuffer。这7种数据类型的Buffer都是通过allocate获取非直接缓冲区或allocateDirect(ByteBuffer通过此方式创建)或wrap(除ByteBuffer意外其他的创建方式)获取直接缓冲区域,分配一个指定大小的缓冲区。
- 缓冲区的四个核心属性
- capacity:容量,表示缓冲区的最大容量,一旦声明就不能改变
- limit:界限,缓冲区中可以操作数据的大小(limit后面的数据不能读写)
- position:位置,表示缓冲区中正在操作数据的位置。
- mark:标志,表示记录当前position的位置,可以通过reset()恢复到mark的位置。
- 四者的关系:0
- 缓冲区的三个核心操作方法
- put():存数据到缓存区,写数据模式。
- flip():切换到读数据模式(position和limit改变,capacity不变)
- get():从缓冲区中拿数据。
- 直接缓冲区和非直接缓冲区
- 非直接缓冲区
通过:static ByteBuffer allocateDirect(int capacity)字节Buffer创建指定大小的缓冲区,其他类型的Buffer通过wrap()方法创建缓冲区;在JVM内存外开辟空间,在每次调用基础操作系统的一个本机IO之前或者之后,虚拟机都会避免将缓冲区的内容复制到中间缓冲区(或者从缓冲区中复制内容),缓冲区的内容驻留在屋里内存中,少一次复制过程,如果需要循环使用缓冲区,用直接缓冲区可以很大地提高性能;虽然直接缓冲区可以使JVM进行高效的I/O操作,但它使用的内存使操作系统分配的,绕过了JVM堆栈,建立和销毁比堆栈上的缓存区要更大的开销。
- 直接缓冲区
通过:static ByteBuffer allocateDirect(int capacity)字节Buffer创建指定大小的缓冲区,其他类型的Buffer通过wrap()方法创建缓冲区;在JVM内存外开辟空间,在每次调用基础操作系统的一个本机IO之前或者之后,虚拟机都会避免将缓冲区的内容复制到中间缓冲区(或者从缓冲区中复制内容),缓冲区的内容驻留在屋里内存中,少一次复制过程,如果需要循环使用缓冲区,用直接缓冲区可以很大地提高性能;虽然直接缓冲区可以使JVM进行高效的I/O操作,但它使用的内存使操作系统分配的,绕过了JVM堆栈,建立和销毁比堆栈上的缓存区要更大的开销。
- 直接缓冲区和非直接缓冲区的区别
- 字节缓冲区要么是直接的,要么是非直接的。如果为直接字节缓冲区,则 Java 虚拟机会尽最大努力直接在此缓冲区上执行本机 I/O 操作。也就是说,在每次调用基础操作系统的一个本机 I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。
- 直接字节缓冲区可以通过调用此类的 allocateDirect() 工厂方法来创建。此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区。直接缓冲区的内容可以驻留在常规的垃圾回收堆之外,因此,它们对应用程序的内存需求量造成的影响可能并不明显。所以,建议将直接缓冲区主要分配给那些易受基础系统的本机 I/O 操作影响的大型、持久的缓冲区。一般情况下,最好仅在直接缓冲区能在程序性能方面带来明显好处时分配它们。
- 直接字节缓冲区还可以通过 FileChannel 的 map() 方法 将文件区域直接映射到内存中来创建。该方法返回MappedByteBuffer 。 Java 平台的实现有助于通过 JNI 从本机代码创建直接字节缓冲区。如果以上这些缓冲区中的某个缓冲区实例指的是不可访问的内存区域,则试图访问该区域不会更改该缓冲区的内容,并且将会在访问期间或稍后的某个时间导致抛出不确定的异常。
字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 Buffer.isDirect() 方法来确定。提供此方法是为了能够在性能关键型代码中执行显式缓冲区管理
Channel表示到IO设备(如:文件、套接字)的连接,即用于源节点与目标节点的连接,在java NIO中Channel本身不负责存储数据,主要是配合缓冲区,负责数据的传输。
- 通道的主要实现类
- FileChannel类:本地文件IO通道,用于读取、写入、映射和操作文件的通道。
- SocketChannel类:网络套接字IO通道,TCP协议,针对面向流的连接套接字的可选择通道(一般用在客户端)。
- ServerSocketChannel类:网络通信IO操作,TCP协议,针对面向流的监听套接字的可选择通道(一般用于服务端)。
- DatagramChannel类:针对面向数据报套接字的可选择通道,能够发送和接受UDP数据包的Channel。UDP协议,由于UDP是一种无连接的网络协议,只能发送和接受数据包。
以上几个类都实现了java.nio.channels.Channel接口。
- 通道的获取方式
java针对支持通道的类提供了getChannel()方法:
- 本地文件IO的Channel类有:FileInputStream/FileOutStream,RandomAccessFile。
- 网络套接字IO的Channel类:SocketChannel、ServerSocketChannel、DatagraSocket。
- 在JDK7.0中的NIO2针对各个通道提供静态方法open()
- 在JDK7.0中的NIO2的Files工具类的newByteChannel()
- 通道之间的数据传输
使用Channel的实现类的对应方法(在直接缓冲区):transferForm()和transferTo()
- 通道的分散读取和聚集写入
- 分散读取:将通道的数据读取到多个缓冲区buffer中。方法:channel.read()
- 聚集写入:将多个缓冲区的数据聚集写道通道channel中。方法:channel.write()
是selectableChannel的多路复用器,用于监控SelectableChannel的IO状况。利用selector可以实现在一个线程中管理多个通道Channel,selector是非阻塞IO的核心。
SelectableChannel的结构图:
- 选择键SelectionKey
方法/属性 描述 interset集合 Selector感兴趣的集合,用于指示选择器对管道关心的操作,可通过SelectionKey对象的interestOps()获取;最初,该兴趣集合是通过通道被注册到Selector时传进来的值。该集合不会被选择器改变,但是可以通过interestOps()改变,我们可以通过以下方法判断Selector是否对Channel的某种事件感兴趣:int interestSet=selectionKey.interestOps(); boolean isInterestedInAccept =(interestSet&SelectionKey.OP_ACCEPT)==SelectionKey.OP_ACCEPT read集合 通道已经就绪的操作的集合,表示一个通道准备好要执行的操作了,可通过SelectionKey对象的readOps()来获取相关通道已经就绪的操作。它是interest集合的子集,并且表示interest集合中从上次调用select()以后已经就绪的那些操作。 //int readSet=selectionKey.readOps();selectionKey.isAcceptable();//等价于selectionKey.readyOps()SelectionKey.OP_ACCEPT;selectionKey.isConnectable();selectionKey.isReadable();selectionKey.isWritable(); int interestOps() 获取感兴趣事件集合 int readyOps() 获取通道已经准备就绪的操作的集合 SelectableChannel channel() 获取注册通道 Selector selector() 返回选择器 boolean isReadable() 检查Channel中读事件是否就绪 boolean isWriteable() 检测Channel 中写事件是否就绪 boolean isConnectable() 检测Channel中连接是否就绪 boolean isAcceptable() 检测Channel中接收是否就绪 - Selector常用方法
方法 描述 Set< SelectionKey > keys() 所有的SelectionKey集合,代表注册在该Selector上的Channel selectedKeys() 被选择的SelectionKey集合。返回此Selector的已选择键集 int select() 监控所有注册的Channel,当它们中间有需要处理的IO操作时,该方法返回,并将对应的SelectionKey加入被选择的SelectionKey集合中,该方法返回这些Channel的数量。 int select(long timeout) 可以设置超时时长的select()操作 int selectNow() 执行一个立即返回的select()操作,该方法不会阻塞线程 Selector wakeUp() 使一个还未返回的select()方法立即返回 void close() 关闭该选择器
- 编码
字符串转成字节数组
- 解码
字节数组转成字符串
- 三大核心
- 通道(channel):负责管道节点的连接及数据的运输
- 缓冲区(buffer):负责数据的存取
- 选择器(selector):是selectableChannel的多路复用器,用于监控SelectableChannel的IO状况。
- 阻塞与非阻塞
- 传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。
- Java NIO 是非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。
使用IDEA创建服务端和客户端程序(新建java工程)
服务器端代码:
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class server {
public static void main(String[] args) throws IOException {
//创建客户端的Socket对象(SevereSocket)
//ServerSocket (int port)创建绑定到指定端口的服务器套接字
ServerSocket ss=new ServerSocket(50000);
//Socket accept()侦听要连接到此套接字并接受他
Socket s=ss.accept();
//获取输入流,读数据,并把数据显示在控制台
InputStream is=s.getInputStream();
byte[] bys=new byte[1024];
int len=is.read(bys);
String data=new String(bys,0,len);
System.out.println("数据是:"+data);
//释放资源
s.close();
ss.close();
}
}
客户端代码:
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
public class click {
public static void main(String[] args) throws IOException{
//创建客户端的Socket对象
Socket s=new Socket("127.0.0.1", 50000);
//获取输出流,写数据
OutputStream os=s.getOutputStream();
os.write("hello,物联网19级".getBytes());
//释放资源
s.close();
}
}
端口可能被占用,请自行修改
运行效果:
2.2 基于Netty的java网络程序使用IDEA创建服务端和客户端程序(新建java工程)
File->Project Structure…–>Modules–>Dependencies
服务器端代码:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class Server {
private int port;
public static void main(String[] args){
new Server(12345).start();
}
public Server(int port) {
this.port = port;
}
public void start() {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap()
.group(boss, work).channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("decoder", new StringDecoder())
.addLast("encoder", new StringEncoder())
.addLast(new HelloWorldServerHandler());
}
});
//绑定端口
ChannelFuture future = server.bind().sync();
System.out.println("server started and listen " + port);
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
public static class HelloWorldServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("HelloWorldServerHandler active");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server channelRead..");
System.out.println(ctx.channel().remoteAddress()+"->Server :"+ msg.toString());
ctx.write("server write"+msg);
ctx.flush();
}
}
}
客户端代码:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class Click {
private static final String HOST = "localhost";
private static final int PORT= 12345;
public static void main(String[] args){
new Click().start(HOST, PORT);
}
public void start(String host, int port) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap client = new Bootstrap().group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("decoder", new StringDecoder())
.addLast("encoder", new StringEncoder())
.addLast(new HelloWorldClientHandler());
}
});
ChannelFuture future = client.connect(host, port).sync();
future.channel().writeAndFlush("Hello Netty Server ,I am a netty client");
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
public static class HelloWorldClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("HelloWorldClientHandler Active");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("HelloWorldClientHandler read Message:"+msg);
}
}
}
运行效果:
2.3 基于NIO的java网络程序使用IDEA创建服务端和客户端程序(新建java工程)
服务器端代码:
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Server {
//网络通信IO操作,TCP协议,针对面向流的监听套接字的可选择通道(一般用于服务端)
private ServerSocketChannel serverSocketChannel;
private Selector selector;
public void start(Integer port) throws Exception {
serverSocketChannel = ServerSocketChannel.open();
selector = Selector.open();
//绑定监听端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//注册到Selector上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
startListener();
}
private void startListener() throws Exception {
while (true) {
// 如果客户端有请求select的方法返回值将不为零
if (selector.select(1000) == 0) {
System.out.println("当前没有任务!!!");
continue;
}
// 如果有事件集合中就存在对应通道的key
Set selectionKeys = selector.selectedKeys();
Iterator iterator = selectionKeys.iterator();
// 遍历所有的key找到其中事件类型为Accept的key
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable())
handleConnection();
if (key.isReadable())
handleMsg(key);
iterator.remove();
}
}
}
private void handleConnection() throws Exception {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
private void handleMsg(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer attachment = (ByteBuffer) key.attachment();
channel.read(attachment);
System.out.println("当前信息: " + new String(attachment.array()));
}
public static void main(String[] args) throws Exception {
Server myServer = new Server();
myServer.start(8887);
}
}
客户端代码:
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class Click {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
// 连接服务器
if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8887))) {
while (!socketChannel.finishConnect()) {
System.out.println("connecting...");
}
}
//发送数据
String str = "hello,物联网19级";
ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
socketChannel.write(byteBuffer);
System.in.read();
}
}
端口可能会被占用,自行修改
运行效果:
三、基于Web的聊天室(Springboot+netty) 3.1 新建工程按照图示步骤依次完成。
3.2 完整代码工程文件:
NettychathatApplication
package com.example.nettychat;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;
import java.net.InetAddress;
import java.net.UnknownHostException;
@SpringBootApplication
public class NettychathatApplication {
public static void main(String[] args) throws UnknownHostException {
ConfigurableApplicationContext application = SpringApplication.run(NettychatApplication.class, args);
Environment env = application.getEnvironment();
String host = InetAddress.getLocalHost().getHostAddress();
String port = env.getProperty("server.port");
System.out.println("[----------------------------------------------------------]");
System.out.println("聊天室启动成功!点击进入:t http://" + host + ":" + port);
System.out.println("[----------------------------------------------------------");
WebSocketServer.inst().run(53134);
}
}
SessionGroup:
package com.example.nettychat;
import com.google.gson.Gson;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketframe;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.springframework.util.StringUtils;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public final class SessionGroup {
private static SessionGroup singleInstance = new SessionGroup();
// 组的映射
private ConcurrentHashMap groupMap = new ConcurrentHashMap<>();
public static SessionGroup inst() {
return singleInstance;
}
public void shutdownGracefully() {
Iterator groupIterator = groupMap.values().iterator();
while (groupIterator.hasNext()) {
ChannelGroup group = groupIterator.next();
group.close();
}
}
public void sendToOthers(Map result, SocketSession s) {
// 获取组
ChannelGroup group = groupMap.get(s.getGroup());
if (null == group) {
return;
}
Gson gson=new Gson();
String json = gson.toJson(result);
// 自己发送的消息不返回给自己
// Channel channel = s.getChannel();
// 从组中移除通道
// group.remove(channel);
ChannelGroupFuture future = group.writeAndFlush(new TextWebSocketframe(json));
future.addListener(f -> {
System.out.println("完成发送:"+json);
// group.add(channel);//发送消息完毕重新添加。
});
}
public void addSession(SocketSession session) {
String groupName = session.getGroup();
if (StringUtils.isEmpty(groupName)) {
// 组为空,直接返回
return;
}
ChannelGroup group = groupMap.get(groupName);
if (null == group) {
group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
groupMap.put(groupName, group);
}
group.add(session.getChannel());
}
public void closeSession(SocketSession session, String echo) {
ChannelFuture sendFuture = session.getChannel().writeAndFlush(new TextWebSocketframe(echo));
sendFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
System.out.println("关闭连接:"+echo);
future.channel().close();
}
});
}
public void closeSession(SocketSession session) {
ChannelFuture sendFuture = session.getChannel().close();
sendFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
System.out.println("发送所有完成:"+session.getUser().getNickname());
}
});
}
public void sendMsg(ChannelHandlerContext ctx, String msg) {
ChannelFuture sendFuture = ctx.writeAndFlush(new TextWebSocketframe(msg));
sendFuture.addListener(f -> {//发送监听
System.out.println("对所有发送完成:"+msg);
});
}
}
SocketSession:
package com.example.nettychat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class SocketSession {
public static final AttributeKey SESSION_KEY = AttributeKey.valueOf("SESSION_KEY");
// 通道
private Channel channel;
// 用户
private User user;
// session唯一标示
private final String sessionId;
private String group;
private Map map = new HashMap();
public SocketSession(Channel channel) {//注意传入参数channel。不同客户端会有不同channel
this.channel = channel;
this.sessionId = buildNewSessionId();
channel.attr(SocketSession.SESSION_KEY).set(this);
}
// 反向导航
public static SocketSession getSession(ChannelHandlerContext ctx) {//注意ctx,不同的客户端会有不同ctx
Channel channel = ctx.channel();
return channel.attr(SocketSession.SESSION_KEY).get();
}
// 反向导航
public static SocketSession getSession(Channel channel) {
return channel.attr(SocketSession.SESSION_KEY).get();
}
public String getId() {
return sessionId;
}
private static String buildNewSessionId() {
String uuid = UUID.randomUUID().toString();
return uuid.replaceAll("-", "");
}
public synchronized void set(String key, Object value) {
map.put(key, value);
}
public synchronized T get(String key) {
return (T) map.get(key);
}
public boolean isValid() {
return getUser() != null ? true : false;
}
public User getUser() {
return user;
}
public void setUser(User user) {
this.user = user;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public Channel getChannel() {
return channel;
}
}
User:
package com.example.nettychat;
import java.util.Objects;
public class User {
public String id;
public String nickname;
public User(String id, String nickname) {
super();
this.id = id;
this.nickname = nickname;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getNickname() {
return nickname;
}
public void setNickname(String nickname) {
this.nickname = nickname;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
User user = (User) o;
return id.equals(user.getId());
}
@Override
public int hashCode() {
return Objects.hash(id);
}
public String getUid() {
return id;
}
}
WebSocketServer:
package com.example.nettychat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class WebSocketServer {
private static WebSocketServer wbss;
private static final int READ_IDLE_TIME_OUT = 60; // 读超时
private static final int WRITE_IDLE_TIME_OUT = 0;// 写超时
private static final int ALL_IDLE_TIME_OUT = 0; // 所有超时
public static WebSocketServer inst() {
return wbss = new WebSocketServer();
}
public void run(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer () {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Netty自己的http解码器和编码器,报文级别 HTTP请求的解码和编码
pipeline.addLast(new HttpServerCodec());
// ChunkedWriteHandler 是用于大数据的分区传输
// 主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的;
// 增加之后就不用考虑这个问题了
pipeline.addLast(new ChunkedWriteHandler());
// HttpObjectAggregator 是完全的解析Http消息体请求用的
// 把多个消息转换为一个单一的完全FullHttpRequest或是FullHttpResponse,
// 原因是HTTP解码器会在每个HTTP消息中生成多个消息对象HttpRequest/HttpResponse,HttpContent,LastHttpContent
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
// WebSocket数据压缩
pipeline.addLast(new WebSocketServerCompressionHandler());
// WebSocketServerProtocolHandler是配置websocket的监听地址/协议包长度限制
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 10 * 1024));
// 当连接在60秒内没有接收到消息时,就会触发一个 IdleStateEvent 事件,
// 此事件被 HeartbeatHandler 的 userEventTriggered 方法处理到
pipeline.addLast(
new IdleStateHandler(READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS));
// WebSocketServerHandler、TextWebSocketframeHandler 是自定义逻辑处理器,
pipeline.addLast(new WebSocketTextHandler());
}
});
Channel ch = b.bind(port).syncUninterruptibly().channel();
ch.closeFuture().syncUninterruptibly();
// 返回与当前Java应用程序关联的运行时对象
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
SessionGroup.inst().shutdownGracefully();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
});
}
}
WebSocketTextHandler:
package com.example.nettychat; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketframe; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import java.util.HashMap; import java.util.Map; import static com.fasterxml.jackson.databind.type.LogicalType.Map; public class WebSocketTextHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketframe msg) throws Exception { SocketSession session = SocketSession.getSession(ctx); TypeToken > typeToken = new TypeToken >() { }; Gson gson=new Gson(); java.util.Map map = gson.fromJson(msg.text(), typeToken.getType()); User user = null; switch (map.get("type")) { case "msg": Map result = new HashMap<>(); user = session.getUser(); result.put("type", "msg"); result.put("msg", map.get("msg")); result.put("sendUser", user.getNickname()); SessionGroup.inst().sendToOthers(result, session); break; case "init": String room = map.get("room"); session.setGroup(room); String nick = map.get("nick"); user = new User(session.getId(), nick); session.setUser(user); SessionGroup.inst().addSession(session); break; } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 是否握手成功,升级为 Websocket 协议 if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { // 握手成功,移除 HttpRequestHandler,因此将不会接收到任何消息 // 并把握手成功的 Channel 加入到 ChannelGroup 中 new SocketSession(ctx.channel()); } else if (evt instanceof IdleStateEvent) { IdleStateEvent stateEvent = (IdleStateEvent) evt; if (stateEvent.state() == IdleState.READER_IDLE) { System.out.println("bb22"); } } else { super.userEventTriggered(ctx, evt); } } }
test.html
群聊天室
群名:
昵称:
3.3 测试
四、总结
NIO通过Selector多路复用器单线程模型,节约了线程的资源开销;同时解决了IO线程资源受限,线程切换效率不高和数据读写效率低的问题。
五、参考文章机智的橙子: 分别基于IO、NIO、Netty的Java网络程序
huahua.Dr : Java NIO详解
六、源代码https://github.com/Wattson1128/HomeWork13-1



