栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java NIO 实现一个高性能 IM 服务器(基于 Reactor 模型)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java NIO 实现一个高性能 IM 服务器(基于 Reactor 模型)

  • server 部分
  • 项目地址
  • github
package com.HyChat.server;

import com.HyChat.server.Handle.MegHandel;
import com.HyChat.server.Handle.MegHandelimpl;
import com.HyChat.server.untity.LoggerUntity;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class HyChatServer {

    private Selector selector;

    
    private final int MaxFollwer=Runtime.getRuntime().availableProcessors()*2;;

    private ServerSocketChannel socketChannel;

    private MegHandel megHandel;

    private int keys=0;

    private FollowerServer[] followerServer=new FollowerServer[MaxFollwer];

    private ExecutorService factory= Executors.newFixedThreadPool(3);
    
    public void Start() throws IOException {
        init();
        HandelConn();
    }

    
    private void init() throws IOException {
       socketChannel= ServerSocketChannel.open();
       selector=Selector.open();
       //设置为非阻塞
       socketChannel.configureBlocking(false);
       socketChannel.socket().bind(new InetSocketAddress("127.0.0.1",8888));

       socketChannel.register(selector, SelectionKey.OP_ACCEPT);//注册链接事件

        megHandel=new MegHandelimpl();
        //初始化子rector
        for (int i=0;i
            followerServer[i]=new FollowerServer();
        }
    }

    
    private void HandelConn() throws IOException {
        System.out.println("链接启动");
        int index=0;
        while (true) {
            keys = selector.select();//会阻塞直到有链接请求进来
            if (keys>0){
                Iterator iterator = selector.selectedKeys().iterator();
                while(iterator.hasNext()){
                    SelectionKey currKey=iterator.next();

                    iterator.remove();

                    if (currKey.isAcceptable()){

                        socketChannel= (ServerSocketChannel) currKey.channel();

                        SocketChannel channel = socketChannel.accept();

                        channel.configureBlocking(false);

                        //注册读事件

                        followerServer[0].Regist(channel);


                        LoggerUntity.LogInfo("收到链接");
                    }
                }
            }
        }

    }
    public static void main(String[] args) {
        try {
            new HyChatServer().Start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

package com.HyChat.server;

import com.HyChat.server.Handle.MegHandel;
import com.HyChat.server.Handle.MegHandelimpl;
import com.HyChat.server.Message.ReqMessage;
import com.HyChat.server.ThreadPool.TaskPool;
import com.HyChat.server.untity.LoggerUntity;
import lombok.SneakyThrows;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.*;


/**
 * Sub-reactor: owns one selector and one polling thread that handles read
 * events for every connection registered through {@link #Regist(SocketChannel)}.
 *
 * <p>Each readable event is read into a fresh 1024-byte buffer and the bytes
 * are parsed as a single {@code ReqMessage.MegBody} on the task pool.
 * NOTE(review): a single read is treated as exactly one whole message; there
 * is no framing for partial or coalesced messages — confirm the client
 * protocol guarantees this.
 */
public class FollowerServer {

    /** Selector polled by this sub-reactor's thread for read events. */
    private Selector selector;

    /** Handler invoked with every successfully parsed message. */
    private MegHandel megHandel;

    private ExecutorService factory;

    /**
     * Opens the selector and starts the polling thread.
     *
     * @throws IOException if the selector cannot be opened
     */
    public  FollowerServer() throws IOException {
        this.selector = Selector.open();
        factory= Executors.newCachedThreadPool();
        megHandel=new MegHandelimpl();
        new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                ReadPoll();
            }
        }).start();
    }

    /**
     * Registers an accepted, non-blocking channel for read events on this
     * sub-reactor. Called from the accepting thread, not the polling thread.
     *
     * @param channel the accepted connection, already in non-blocking mode
     * @throws ClosedChannelException if the channel is closed
     */
    public  void Regist(SocketChannel channel) throws ClosedChannelException {
        try {
            LoggerUntity.LogInfo(String.format("收到新的链接%s", channel.getRemoteAddress().toString()));
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Wake the polling thread so a select() in progress does not hold up the
        // registration (older JDKs synchronize register() against select()) ...
        selector.wakeup();
        channel.register(selector, SelectionKey.OP_READ);
        // ... and again so the new key is seen by the next selection right away
        // instead of after the select timeout.
        selector.wakeup();
    }

    /**
     * Polling loop: waits for readable channels and dispatches each one.
     * Never returns normally.
     *
     * @throws IOException if the selector fails or offline handling fails
     */
    private void ReadPoll() throws IOException {
        while (true){
            // The timeout bounds how long a pending registration can be delayed.
            int selectLen=selector.select(1000);
            if (selectLen<=0) {
                continue;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey currKey = iterator.next();
                iterator.remove();
                if (currKey.isValid() && currKey.isReadable()){
                    handleRead(currKey);
                }
            }
        }
    }

    /**
     * Reads once from the key's channel and submits the bytes for parsing and
     * handling; treats end-of-stream and read errors as the user going offline.
     */
    private void handleRead(SelectionKey currKey) throws IOException {
        SocketChannel channel = (SocketChannel) currKey.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read;
        try {
            read = channel.read(buffer);
        } catch (IOException e) {
            // A failed read (e.g. connection reset) affects only this connection;
            // treat it like end-of-stream instead of killing the polling thread.
            read = -1;
        }
        final int length = read;
        if (length==-1){
            // The peer went offline.
            LoggerUntity.LogInfo("用户下线了");
            disconnect(currKey, channel);
            return;
        }
        if (length==0){
            // Nothing was read; there is no message to dispatch.
            return;
        }
        try {
            TaskPool.Sumit(new Runnable() {
                @SneakyThrows
                @Override
                public void run() {
                    ReqMessage.MegBody megBody;
                    try {
                        megBody = ReqMessage.MegBody.parseFrom(Arrays.copyOfRange(buffer.array(),0,length));
                    } catch (Exception e) {
                        // Parsing happens inside the task, so malformed input has
                        // to be reported here rather than around the submission.
                        LoggerUntity.LogWaring("消息类型错误");
                        return;
                    }
                    LoggerUntity.LogInfo(String.format("发送给 %s 的消息",megBody.getTarget()));
                    try {
                        megHandel.DoHandel(megBody,currKey);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });

        }
        catch (Exception e){
            LoggerUntity.LogWaring("消息类型错误");

        }
    }

    /**
     * Runs the offline hook, then cancels the key and closes the channel so the
     * socket is released even if the hook fails.
     */
    private void disconnect(SelectionKey currKey, SocketChannel channel) throws IOException {
        try {
            MegHandelimpl.Offline(currKey);
        } finally {
            currKey.cancel();
            try {
                channel.close();
            } catch (IOException ignored) {
                // The connection is already gone; nothing more can be done.
            }
        }
    }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/872011.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号