服务器端:可以检测用户上线、离线,并实现消息的转发功能。
客户端:通过channel可以五阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
使用NIO聊天系统,实现服务器端和客户端之间的数据非阻塞的简单通讯。
1.服务端
package com.ljf.netty.nio.chat;
import org.omg.Messaging.SyncScopeHelper;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class CharNioServer {
//定义属性
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT=6666;
//构造函数,初始化工作
public CharNioServer() throws IOException {
//得到选择器
selector=Selector.open();
listenChannel=ServerSocketChannel.open();
//绑定端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
//设置非阻塞模式
listenChannel.configureBlocking(false);
//将该listenchannel注册到selector
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
}
//监听
public void listen(){
try {
//循环处理
while(true) {
int count = selector.select();
if(count>0){//有事件需要处理
Iterator iterator=selector.selectedKeys().iterator();
while (iterator.hasNext()){
//取出selectionKey
SelectionKey key=iterator.next();
//监听到是接收状态
if(key.isAcceptable()){
SocketChannel sc=listenChannel.accept();
sc.configureBlocking(false);//这句话千万不能少不然,报 java.nio.channels.IllegalBlockingModeException
//将该sc注册到selector
sc.register(selector,SelectionKey.OP_READ);
System.out.println(sc.getRemoteAddress()+"已经上线");
}
if(key.isReadable()){//通道发送read事件,即通道是可读状态
//处理读写的方法,处理业务的方法
readDataHandler(key);
}
//当前的key删除,防止重复处理
iterator.remove();
}
}
else{
System.out.println("等待......");
}
}
} catch (IOException e) {
e.printStackTrace();
}
finally {
System.out.println("==========");
}
}
//读取客户端消息
private void readDataHandler(SelectionKey key){
//取到关联的channle
SocketChannel channel=null;
try {
//得到channel
channel=(SocketChannel)key.channel();
//创建buffer
ByteBuffer buffer=ByteBuffer.allocate(1024);
int count =channel.read(buffer);
//根据count的值做处理
if(count>0){
//把缓存区的数据转成字符串
String msg=new String(buffer.array());
//输出信息
System.out.println("from 客户端:"+msg);
//向其他客户端转发消息(去掉自己),独立一个方法来处理此业务
sendInfoToOtherClients(msg,channel);
}
} catch (IOException e) {
//e.printStackTrace();
try {
System.out.println(channel.getRemoteAddress()+"离线了....");
//取消注册
key.cancel();;
//关闭通道
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
//转发消息给其他客户端
private void sendInfoToOtherClients(String msg,SocketChannel self) throws IOException {
System.out.println("服务器转发消息中......");
//遍历所有注册到selector上的socketchannel,并排查自己self
for(SelectionKey key:selector.keys()){
//通过key,取出对应的socketchannel
Channel targetChannel=key.channel();
//排查自己
if(targetChannel instanceof SocketChannel &&targetChannel!=self){
//转型
SocketChannel dest=(SocketChannel)targetChannel;
//将msg存储到buffer中
ByteBuffer buffer=ByteBuffer.wrap(msg.getBytes());
//将buffer的数据写入到通道
dest.write(buffer);
}
}
}
public static void main(String[] args) throws IOException {
//创建调用
CharNioServer charNioServer=new CharNioServer();
charNioServer.listen();
}
}
2.客户端
package com.ljf.netty.nio.chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
public class CharNioClient {
//定义相关的属性
private final String HOST="127.0.0.1";
private final int PORT=6666;//服务器端口
private Selector selector;
private SocketChannel socketChannel;
private String username;
//构造器,完成初始化工作
public CharNioClient() throws IOException {
selector=Selector.open();
//连接服务器
socketChannel=SocketChannel.open(new InetSocketAddress("127.0.0.1",PORT));
//设置非阻塞
socketChannel.configureBlocking(false);
//将channel注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
//得到username
username=socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username+" is ok ......");
}
//向服务器发送消息
public void sendInfo(String info) throws IOException {
info=username+" 说: "+info;
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
}
//读取从服务器端回复的消息
public void readInfo(){
try {
int readChannels=selector.select();
if(readChannels>0){//有可以用的通道
Iterator iterator=selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key=iterator.next();
if(key.isReadable()){
//得到相关的通道
SocketChannel sc=(SocketChannel)key.channel();
//得到一个Buffer
ByteBuffer buffer=ByteBuffer.allocate(1024);
//读取
sc.read(buffer);
//把读到的缓冲区的数据转成字符串
String msg=new String(buffer.array());
System.out.println("msg:"+msg.trim());
}
}
iterator.remove();;//删除当前的selectionKey,防止重复操作
}
else{
System.out.println("没有可用通道....");
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
CharNioClient charNioClient=new CharNioClient();
//启动一个线程,每个3秒,读取从服务器发送数据
new Thread(new Runnable() {
@Override
public void run() {
while(true){
charNioClient.readInfo();
try {
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
//发送数据
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
String s=scanner.nextLine();
charNioClient.sendInfo(s);
}
}
}
3.测试结果,启动多个客户端
1.客户端 2209收到客户端2225发送信息
2.客户端2225收到2209发送的消息
3.服务端信息



