开启ServerSocketChannel,设置为非阻塞,并声明为全局变量开启一个selector,并绑定客户端链接事件启动监听方法,循环监听事件,阻塞监听事件读取到客户端消息,并转发给其他客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class GroupChatServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private static final int PORT = 6666;
public GroupChatServer() {
try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端已启动");
} catch (Exception e) {
e.printStackTrace();
}
}
public void listen() {
try {
while (true) {
int count = selector.select();
if (count == 0) {
continue;
}
Iterator keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println(socketChannel.getRemoteAddress() + "上线了");
}
if (key.isReadable()) {
//处理数据读取
readData(key);
}
keyIterator.remove();
}
}
} catch (Exception e) {
} finally {
}
}
private void readData(SelectionKey key) {
SocketChannel channel = null;
try {
channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = channel.read(byteBuffer);
if (read > 0) {
//输出读取到的数据
String msg = new String(byteBuffer.array(), 0, read);
System.out.println("转发客户端消息:" + msg);
//向其他客户端转发消息
sendMsgToOther(msg, channel);
}
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + " 已下线");
//取消注册事件,并关闭通道
key.cancel();
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
private void sendMsgToOther(String msg, SocketChannel self) {
selector.keys().forEach(key -> {
SelectableChannel channel = key.channel();
if (channel instanceof SocketChannel && channel != self) {
//发给所有除自己意外的客户端
SocketChannel socketChannel = (SocketChannel) channel;
ByteBuffer src = ByteBuffer.wrap(msg.getBytes());
try {
socketChannel.write(src);
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public static void main(String[] args) {
GroupChatServer server = new GroupChatServer();
server.listen();
}
}
客户端代码
链接到服务器端,得到SocketChannel,并设置为非阻塞开启一个selector,并绑定read事件启动一个线程,每隔3秒,读取从服务器端发送过来的数据开启一个Scanner,通过控制台发送信息给服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class GroupChatClient {
private static final String SERVER = "127.0.0.1";
private static final int PORT = 6666;
private SocketChannel channel;
private Selector selector;
private String userName;
public GroupChatClient() throws IOException {
//链接到服务器
channel = SocketChannel.open(new InetSocketAddress(SERVER, PORT));
//设置为非阻塞
channel.configureBlocking(false);
selector = Selector.open();
//绑定read事件到selector
channel.register(selector, SelectionKey.OP_READ);
userName = channel.getLocalAddress().toString();
System.out.println(userName + " 我已上线");
}
public void sendInfo(String msg) {
msg = userName + "说:" + msg;
try {
channel.write(ByteBuffer.wrap(msg.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
public void readInfo() {
try {
//线程阻塞,等待消息
int count = selector.select();
if (count > 0) {
Iterator keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
//得到一个ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读取数据到Buffer
int read = channel.read(byteBuffer);
System.out.println(new String(byteBuffer.array(), 0, read));
}
//移除已处理的key
keyIterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
GroupChatClient groupChatClient = new GroupChatClient();
//启动一个线程,每隔3秒,读取从服务器端发送过来的数据
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
while (true) {
groupChatClient.readInfo();
Thread.sleep(3000l);
}
});
//发送数据给服务端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String str = scanner.nextLine();
groupChatClient.sendInfo(str);
}
}
}
测试
1)启动服务端
2)启动3个客户端
服务端控制台
服务端已启动 /127.0.0.1:57806上线了 /127.0.0.1:57814上线了 /127.0.0.1:57825上线了 转发客户端消息:/127.0.0.1:57825说:我是01 转发客户端消息:/127.0.0.1:57814说:我是02 转发客户端消息:/127.0.0.1:57806说:我是03 /127.0.0.1:57825 已下线 /127.0.0.1:57814 已下线 /127.0.0.1:57806 已下线
客户端控制台
/127.0.0.1:57825 我已上线 我是01 /127.0.0.1:57814说:我是02 /127.0.0.1:57806说:我是03
/127.0.0.1:57814 我已上线 /127.0.0.1:57825说:我是01 我是02 /127.0.0.1:57806说:我是03
/127.0.0.1:57806 我已上线 /127.0.0.1:57825说:我是01 /127.0.0.1:57814说:我是02 我是03



