栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【Netty聊天系统】吃透 SpringBoot + Netty , 还开发不了通讯系统?(三)session

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Netty聊天系统】吃透 SpringBoot + Netty , 还开发不了通讯系统?(三)session

一、netty设置session

Netty 本身没有 session 的概念,所以需要我们自己创建。

因此先创建一个对象来保存用户信息,这里把它叫做"用户传输"(UserTransmission)。

package com.binglian.common;

import com.binglian.message.MessagePro;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * User data carried by a session: user id, device id, token, user name,
 * platform and the id of the session the user is attached to.
 *
 * <p>Getters, setters, {@code equals}/{@code hashCode} and {@code toString}
 * are generated by Lombok's {@code @Data}.
 */
@Slf4j
@Data
public class UserTransmission {

    /** User id. */
    private String uid;
    /** Device id; initialised to a random UUID until a real value is set. */
    private String devId = UUID.randomUUID().toString();
    /** Login token; initialised to a random UUID until a real value is set. */
    private String token = UUID.randomUUID().toString();
    /** Display name of the user. */
    private String username;
    /** Client platform; defaults to {@link PLATTYPE#MAC}. */
    PLATTYPE platform = PLATTYPE.MAC;

    /**
     * Client platforms. The declaration order matters: {@link #setPlatform(int)}
     * maps an integer to the constant with that ordinal.
     */
    public enum PLATTYPE {
        WINDOWS, MAC, ANDROID, IOS, WEB, OTHER;
    }

    /** Id of the session this user is attached to. */
    private String sessionId;

    /**
     * Sets the platform from its ordinal value.
     *
     * @param platform ordinal of the wanted {@link PLATTYPE}; a value outside
     *                 the valid range leaves the current platform unchanged
     */
    public void setPlatform(int platform) {
        PLATTYPE[] values = PLATTYPE.values();
        if (platform >= 0 && platform < values.length) {
            this.platform = values[platform];
        }
    }

    /**
     * Builds a {@code UserTransmission} from a login request.
     *
     * @param info the login request received from the client
     * @return a new instance carrying the uid, device id, token and platform of the request
     */
    public static UserTransmission fromMsg(MessagePro.LoginRequest info) {
        UserTransmission userTransmission = new UserTransmission();
        // Strings are immutable, so the values are used directly instead of being copied.
        userTransmission.uid = info.getUid();
        userTransmission.devId = info.getDeviceId();
        userTransmission.token = info.getToken();
        userTransmission.setPlatform(info.getPlatform());
        // The token is a credential and is deliberately kept out of the log.
        log.info("登录中: uid={}, devId={}, platform={}",
                userTransmission.uid, userTransmission.devId, userTransmission.platform);
        return userTransmission;
    }

}

ServerSession

这里用 Netty 自带的 AttributeKey 定义了一个名为 SESSION_KEY 的键(同名的键是唯一的),用来把 session 绑定到通道上。这一篇不会用到,但是后面一篇会用到。

// Channel attribute key under which a session is stored on its channel.
public static final AttributeKey SESSION_KEY =
        AttributeKey.valueOf("SESSION_KEY");

// Read the value stored on the channel under SESSION_KEY (the bound session).
channel.attr(ServerSession.SESSION_KEY).get();

// Store the current session (this) on the channel under SESSION_KEY.
channel.attr(ServerSession.SESSION_KEY).set(this);

package com.binglian.session;

import com.binglian.common.UserTransmission;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Server-side session that ties a Netty {@link Channel} to a user.
 *
 * <p>The session stores itself on the channel under {@link #SESSION_KEY} and
 * registers itself in {@link SessionMap}, so it can be found either from the
 * channel or from its session id.
 */
@Slf4j
@Data
public class ServerSession {

    /** Channel attribute key under which the session is stored on its channel. */
    public static final AttributeKey<ServerSession> SESSION_KEY =
            AttributeKey.valueOf("SESSION_KEY");

    /** The channel wrapped by this session. */
    private Channel channel;

    /** User attached to this session; {@code null} until {@link #setUserTransmission} is called. */
    private UserTransmission userTransmission;

    /** Identifier generated once when the session is constructed. */
    private final String sessionId;

    /** Set to {@code true} by {@link #bind()} and back to {@code false} by {@link #unbind()}. */
    private boolean isLogin = false;

    /** Free-form per-session attributes, read and written through the synchronized {@code set}/{@code get}. */
    private Map<String, Object> map = new HashMap<>();

    /**
     * Creates a session for the given channel and assigns it a new session id.
     *
     * @param channel the channel this session belongs to
     */
    public ServerSession(Channel channel) {
        this.channel = channel;
        this.sessionId = buildNewSessionId();
    }

    /**
     * Looks up the session stored on the channel of the given context.
     *
     * @param ctx the handler context whose channel is inspected
     * @return the session stored under {@link #SESSION_KEY}, or {@code null} if none was set
     */
    public static ServerSession getSession(ChannelHandlerContext ctx) {
        return ctx.channel().attr(ServerSession.SESSION_KEY).get();
    }

    /**
     * Closes the session stored on the channel of the given context and removes
     * it from {@link SessionMap}. Does nothing when no session is stored or the
     * stored session has no user.
     *
     * @param ctx the handler context whose channel's session is closed
     */
    public static void closeSession(ChannelHandlerContext ctx) {
        ServerSession session = getSession(ctx);
        if (session != null && session.isValid()) {
            session.close();
            SessionMap.inst().removeSession(session.getSessionId());
        }
    }

    /**
     * Binds this session and its channel to each other: stores the session on
     * the channel, registers it in {@link SessionMap} and marks it as logged in.
     *
     * @return this session
     */
    public ServerSession bind() {
        log.info(" ServerSession 绑定会话 {}", channel.remoteAddress());
        channel.attr(ServerSession.SESSION_KEY).set(this);
        SessionMap.inst().addSession(this);
        isLogin = true;
        return this;
    }

    /**
     * Marks the session as logged out, removes it from {@link SessionMap} and
     * closes the channel.
     *
     * @return this session
     */
    public ServerSession unbind() {
        isLogin = false;
        SessionMap.inst().removeSession(getSessionId());
        this.close();
        return this;
    }

    /** @return the id generated for this session at construction */
    public String getSessionId() {
        return sessionId;
    }

    /** Builds a session id: a random UUID with the dashes removed. */
    private static String buildNewSessionId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /** Closes the channel and logs an error if the close operation fails. */
    public synchronized void close() {
        ChannelFuture closing = channel.close();
        closing.addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                log.error("CHANNEL_CLOSED error ");
            }
        });
    }

    /**
     * Stores a session attribute.
     *
     * @param key   attribute name
     * @param value attribute value
     */
    public synchronized void set(String key, Object value) {
        map.put(key, value);
    }

    /**
     * Reads a session attribute.
     *
     * @param key attribute name
     * @param <T> type the caller expects; the cast is unchecked, so a wrong
     *            expectation fails at the call site with a {@code ClassCastException}
     * @return the stored value, or {@code null} if the key is absent
     */
    @SuppressWarnings("unchecked")
    public synchronized <T> T get(String key) {
        return (T) map.get(key);
    }

    /** @return {@code true} when a user has been attached to this session */
    public boolean isValid() {
        return getUser() != null;
    }

    /**
     * Writes a message to the channel if the channel is currently writable.
     *
     * <p>When the channel is not writable (outbound buffer above its high-water
     * mark) nothing is sent, to avoid piling up the send queue.
     * NOTE(review): in that case the message is only logged and then dropped;
     * storing it for a later retry is not implemented yet.
     *
     * @param pkg the message to write
     */
    public synchronized void writeAndFlush(Object pkg) {
        if (channel.isWritable()) {
            channel.writeAndFlush(pkg);
        } else {
            log.debug("通道很忙,消息被暂存了");
        }
    }

    /** @return the user attached to this session, or {@code null} if none */
    public UserTransmission getUser() {
        return userTransmission;
    }

    /**
     * Attaches a user to this session and copies the session id onto the user.
     *
     * @param user the user to attach; {@code null} detaches the current user
     */
    public void setUserTransmission(UserTransmission user) {
        this.userTransmission = user;
        if (user != null) {
            user.setSessionId(sessionId);
        }
    }
}

SessionMap 以 sessionId 为键保存所有会话。这里用 ConcurrentHashMap 来存放,因为它是线程安全的。

package com.binglian.session;

import com.binglian.common.UserTransmission;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;


import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * Process-wide registry of {@link ServerSession}s, keyed by session id.
 * Obtain the single instance through {@link #inst()}.
 */
@Slf4j
@Data
public final class SessionMap {
    private SessionMap() {
    }

    private static final SessionMap singleInstance = new SessionMap();

    /** All registered sessions, keyed by session id. */
    private ConcurrentHashMap<String, ServerSession> map =
            new ConcurrentHashMap<>();

    /** @return the single {@code SessionMap} instance */
    public static SessionMap inst() {
        return singleInstance;
    }

    /**
     * Registers a session under its session id and logs the new total.
     *
     * @param s the session to register
     */
    public void addSession(ServerSession s) {
        map.put(s.getSessionId(), s);
        // uidOf tolerates a session without a user, so the log line cannot
        // fail after the session has already been stored.
        log.info("用户登录:id= {}   在线总数: {}", uidOf(s), map.size());
    }

    /**
     * Looks up a session by id.
     *
     * @param sessionId the session id
     * @return the registered session, or {@code null} if the id is unknown or {@code null}
     */
    public ServerSession getSession(String sessionId) {
        return sessionId == null ? null : map.get(sessionId);
    }

    /**
     * Finds every registered session that belongs to the given user.
     *
     * @param userId the user id to match
     * @return the matching sessions; empty when there are none. Sessions without
     *         a user or without a uid never match.
     */
    public List<ServerSession> getSessionsBy(String userId) {
        return map.values()
                .stream()
                .filter(s -> userId != null && userId.equals(uidOf(s)))
                .collect(Collectors.toList());
    }

    /**
     * Removes a session and logs the new total. Unknown or {@code null} ids are ignored.
     *
     * @param sessionId id of the session to remove
     */
    public void removeSession(String sessionId) {
        if (sessionId == null) {
            return;
        }
        // remove() returns the previous value in one atomic step, so two
        // concurrent removals of the same id cannot both see the session.
        ServerSession removed = map.remove(sessionId);
        if (removed == null) {
            return;
        }
        log.info("用户下线:id= {}   在线总数: {}", uidOf(removed), map.size());
    }

    /** Returns the uid of the session's user, or {@code null} when no user is attached. */
    private static String uidOf(ServerSession s) {
        UserTransmission user = s.getUser();
        return user == null ? null : user.getUid();
    }

}
二、编码和解码

Netty有自带的一些编码,下面,只列举两个

StringEncoder,对字符串数据进行编码
ObjectEncoder,对 Java 对象进行编码

对应

StringDecoder, 对字符串数据进行解码

ObjectDecoder,对 Java 对象进行解码

| 编码类 | 解释 | 解码类 | 解释 |
| --- | --- | --- | --- |
| StringEncoder | 对字符串数据进行编码 | StringDecoder | 对字符串数据进行解码 |
| ObjectEncoder | 对 Java 对象进行编码 | ObjectDecoder | 对 Java 对象进行解码 |
| MessageToByteEncoder | 自定义编码器 | ByteToMessageDecoder | 对应自定义解码器 |

这里我们用的是自定义的

编码信息

extends MessageToByteEncoder就用到了,我们上面表格所说的。

这里用到了魔数、版本、长度和内容,客户端按照此规范编码后传输到服务端,服务端也会按照同样的规范解析并校验信息。

package com.binglian.common;

import com.binglian.message.MessagePro;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;


/**
 * Encodes a {@link MessagePro.Message} into a length-prefixed frame:
 *
 * <pre>
 * | magic (2 bytes) | version (2 bytes) | body length (4 bytes) | body |
 * </pre>
 */
@Slf4j
public class ProtobufEncoder extends MessageToByteEncoder<MessagePro.Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx,
                          MessagePro.Message msg, ByteBuf out)
            throws Exception {
        encode0(msg, out);
    }

    /**
     * Writes one framed message into the given buffer.
     *
     * @param msg the message to serialize
     * @param out the buffer that receives the header followed by the body
     */
    public static void encode0(
            MessagePro.Message msg, ByteBuf out) {

        // Serialize first so that a serialization failure cannot leave a
        // half-written header in the output buffer.
        byte[] body = msg.toByteArray();

        // Log the body size; printing the array itself would only show its identity hash.
        log.info("message:--- {} bytes", body.length);

        // Header: magic number, protocol version, then the body length.
        out.writeShort(ProtoInstant.MAGIC_CODE);
        out.writeShort(ProtoInstant.VERSION_CODE);
        out.writeInt(body.length);

        // Body: the serialized message, written as-is (no encryption is applied).
        out.writeBytes(body);
    }

}

解码信息

这里继承了 ByteToMessageDecoder 来实现自定义解码器,解码时按照编码规定的格式进行:

先读取魔数,并校验是否与 ProtoInstant 中定义的一致;再读取版本号和消息长度;最后读取消息内容。

package com.binglian.common;

import com.binglian.message.MessagePro;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;

import java.util.List;


/**
 * Decodes frames of the form
 *
 * <pre>
 * | magic (2 bytes) | version (2 bytes) | body length (4 bytes) | body |
 * </pre>
 *
 * into {@link MessagePro.Message} objects.
 */
@Slf4j
public class ProtobufDecoder extends ByteToMessageDecoder {

    /** Size of the fixed header: magic (2) + version (2) + body length (4). */
    private static final int HEADER_LENGTH = 8;

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

        Object outmsg = decode0(channelHandlerContext, byteBuf);
        if (outmsg != null) {
            // Hand the decoded business message to the next handler.
            list.add(outmsg);
        }

    }

    /**
     * Tries to decode one message from the buffer.
     *
     * @param ctx the handler context; used for the remote address and to close
     *            the channel on illegal data
     * @param in  the accumulated inbound bytes
     * @return the decoded message, or {@code null} when a complete frame is not
     *         available yet or the frame was rejected
     * @throws Exception if the magic number does not match or the body cannot be parsed
     */
    public static Object decode0(ChannelHandlerContext ctx,
                                 ByteBuf in) throws Exception, InvalidProtocolBufferException {

        // Remember where the frame starts so we can rewind if it is incomplete.
        in.markReaderIndex();
        if (in.readableBytes() < HEADER_LENGTH) {
            // Not even a full header yet.
            return null;
        }

        short magic = in.readShort();
        if (magic != ProtoInstant.MAGIC_CODE) {
            String error = "客户端口令不对:" + ctx.channel().remoteAddress();
            throw new Exception(error);
        }

        // Protocol version: consumed to advance the reader, currently not validated.
        in.readShort();

        int length = in.readInt();

        if (length < 0) {
            // Illegal frame: discard whatever was buffered and close the
            // connection. Returning here is essential; falling through would
            // try to allocate an array of negative size.
            in.skipBytes(in.readableBytes());
            ctx.close();
            return null;
        }

        if (length > in.readableBytes()) {
            // The body has not fully arrived: rewind to the frame start and wait for more data.
            in.resetReaderIndex();
            return null;
        }
        log.info("decoder length=" + in.readableBytes());

        // Copy the body out of the buffer; readBytes works the same for heap and direct buffers.
        byte[] array = new byte[length];
        in.readBytes(array, 0, length);

        // Bytes back into a message object.
        MessagePro.Message outmsg =
                MessagePro.Message.parseFrom(array);

        log.info("接收的数据" + outmsg);
        return outmsg;
    }
}

 
三、请求handler、响应handler 

server服务端接收到客户端的请求,进行一个业务处理,进行绑定通道,通道绑定的代码在上面的

服务端LoginRequesthandler

解码之后,进入handler中, 在执行相关逻辑

这地方,把传过来的对象进行一个转换,转换成Protobuf对象。(像不像对象转换和获取)

				MessagePro.Message message = (MessagePro.Message) msg;

        // Read the request type through the getter of the protobuf message.
        MessagePro.Type Type = message.getType();
package com.binglian.handler;

import com.binglian.common.ProtoInstant;
import com.binglian.common.UserTransmission;
import com.binglian.message.MessagePro;
import com.binglian.session.ServerSession;
import com.binglian.session.SessionMap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

/**
 * Server-side inbound handler for login requests.
 *
 * <p>Messages that are not {@link MessagePro.Message}s, or whose type is not
 * {@code LOGIN_REQUEST}, are passed on to the next handler unchanged.
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class LoginRequestHandler extends ChannelInboundHandlerAdapter {


    /** Logs the new, not yet logged-in connection and continues the pipeline. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        log.info("收到一个新的连接,但是没有登录 {}", ctx.channel().id());
        super.channelActive(ctx);
    }

    /**
     * Handles a login request.
     *
     * @param ctx the handler context
     * @param msg the inbound object produced by the decoder
     * @throws Exception propagated from the next handler in the pipeline
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // instanceof is false for null, so no separate null check is needed.
        if (!(msg instanceof MessagePro.Message)) {
            super.channelRead(ctx, msg);
            return;
        }

        MessagePro.Message message = (MessagePro.Message) msg;

        // Compare against the enum constant. LOGIN_REQUEST_VALUE is the int
        // wire number of that constant, so equals() against it was always
        // false and every login request was forwarded instead of handled.
        MessagePro.Type type = message.getType();
        if (type != MessagePro.Type.LOGIN_REQUEST) {
            super.channelRead(ctx, msg);
            return;
        }

        // Wrap the channel of this context in a new server session. At this
        // point the session has no user attached and is not bound yet.
        ServerSession session = new ServerSession(ctx.channel());

        // Placeholder result; the real login check is to be added later.
        boolean result = true;
        if (result) {
            log.info("登录成功:" + session.getUser());
        } else {
            log.info("登录失败:" + session.getUser());
        }

    }

}

客户端的LoginResponseHandler

客户端获取解码后的信息,判断服务器发送过来的登录结果是成功还是失败。

package com.binglian.handler;



import com.binglian.common.ProtoInstant;
import com.binglian.message.MessagePro;
import com.binglian.session.ClientSession;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Client-side inbound handler for login responses.
 *
 * <p>Only {@link MessagePro.Message}s of type {@code LOGIN_RESPONSE} are
 * consumed here; everything else is forwarded to the next handler.
 */
@Slf4j
@ChannelHandler.Sharable
@Component
public class LoginResponseHandler extends ChannelInboundHandlerAdapter {

    /**
     * Picks login responses out of the inbound stream.
     *
     * @param ctx the handler context
     * @param msg the inbound object produced by the decoder
     * @throws Exception propagated from the next handler in the pipeline
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {

        log.info("LoginResponseHandler");

        if (msg instanceof MessagePro.Message) {
            MessagePro.Message pkg = (MessagePro.Message) msg;
            MessagePro.Type type = pkg.getType();
            if (type.equals(MessagePro.Type.LOGIN_RESPONSE)) {
                // Login response payload; evaluating success or failure is
                // left for a later extension.
                MessagePro.LoginResponse info = pkg.getLoginResponse();
                return;
            }
        }

        // Not a login response: let the next handler deal with it.
        super.channelRead(ctx, msg);
    }

}

四、登录前的准备,protobuf使用

这两段代码,就是使用之前创建的 protobuf。

第一段,就是封装 LoginRequest 对象,把 user 传输对象中的信息放到其中;第二段,就是把 LoginRequest 放到 Message 中,再放进其他内容。

						log.info("构建登录信息");
            // Step 1: copy the user's device id, platform ordinal, token and
            // uid into a LoginRequest builder.
            MessagePro.LoginRequest.Builder lb = MessagePro.LoginRequest.newBuilder()
                    .setDeviceId(user.getDevId())
                    .setPlatform(user.getPlatform().ordinal())
                    .setToken(user.getToken())
                    .setUid(user.getUid());

            // Step 2: wrap the login request in a Message of type
            // LOGIN_REQUEST, together with the session id and a serial number of -1.
            MessagePro.Message message = MessagePro.Message
                    .newBuilder()
                    .setType(MessagePro.Type.LOGIN_REQUEST)
                    .setLoginRequest(lb)
                    .setSessionId(session.getSessionId())
                    .setSerialNumber(-1).buildPartial();
登录连接的工具类
package com.binglian.client;

import com.binglian.common.UserTransmission;
import com.binglian.entity.NettyUser;
import com.binglian.message.MessagePro;
import com.binglian.session.ClientSession;
import com.binglian.task.FutureTaskScheduler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.concurrent.TimeUnit;


@Component
@Slf4j
public class ClientUtils implements Client{


    //  连接标记,是否连接成功
    private boolean connectFlag = false;

    //  通道
    private Channel channel;

    //会话类
    private ClientSession session;

    @Autowired
    private NettyClient nettyClient;


    public void login(NettyUser nettyUser) {
        //  判断是否连接了
        while (connectFlag == false) {


            //  开始连接
            startConnectServer();
            waitCommandThread();
        }
        if (session != null) {
            UserTransmission user = new UserTransmission();
            user.setUid(String.valueOf(nettyUser.getUserId()));
            user.setUsername(nettyUser.getUsername());
            session.setUser(user);

            log.info("构建登录信息");
            MessagePro.LoginRequest.Builder lb = MessagePro.LoginRequest.newBuilder()
                    .setDeviceId(user.getDevId())
                    .setPlatform(user.getPlatform().ordinal())
                    .setToken(user.getToken())
                    .setUid(user.getUid());

            MessagePro.Message message = MessagePro.Message
                    .newBuilder()
                    .setType(MessagePro.Type.LOGIN_REQUEST)
                    .setLoginRequest(lb)
                    .setSessionId(session.getSessionId())
                    .setSerialNumber(-1).buildPartial();

            log.info("发送信息");

            sendMsg(message);
        }
    }


    public void startConnectServer() {
        FutureTaskScheduler.add(()->{
            
            nettyClient.setConnectedListener(connectedListener);
            nettyClient.doConnect();
        });
    }


    public void sendMsg(MessagePro.Message message) {


        if (null ==  this.session || !session.isConnected()) {
            log.info("连接还没成功");
            return;
        }

        Channel channel = session.getChannel();
        //  writeAndFlush write+flush 写进去
        ChannelFuture f = channel.writeAndFlush(message);
        f.addListener(new GenericFutureListener>() {
            @Override
            public void operationComplete(Future future)
                    throws Exception {
                // 回调
                if (future.isSuccess()) {
                    log.info("发送成功");
                } else {
                    log.info("发送失败");
                }
            }

        });
    }

    public synchronized void notifyCommandThread() {
        //唤醒,命令收集程
        this.notify();

    }

    public synchronized void waitCommandThread() {

        //休眠,命令收集线程
        try {
            this.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }


    GenericFutureListener closeListener = (ChannelFuture f) ->
    {


        log.info(new Date() + ": 连接已经断开……");
        channel = f.channel();

        // 创建会话
        ClientSession session =
                channel.attr(ClientSession.SESSION_KEY).get();
        session.close();

        //唤醒用户线程
        notifyCommandThread();
    };

    GenericFutureListener connectedListener = (ChannelFuture f)->{
        final EventLoop eventLoop = f.channel().eventLoop();

        if (!f.isSuccess()) {
            log.info("连接失败!在10s之后准备尝试重连!");
            eventLoop.schedule(() -> nettyClient.doConnect(),
                    10, TimeUnit.SECONDS);

            connectFlag = false;
        } else {

            connectFlag = true;

            log.info("服务器 连接成功");
            channel = f.channel();

            //  创建回话
            session = new ClientSession(channel);
            session.setConnected(true);
            channel.closeFuture().addListener(closeListener);



            //唤醒用户线程
            notifyCommandThread();

        }

    };
}

其他需要的工具类 枚举
package com.binglian.common;

/**
 * Protocol-level constants shared by the encoder and the decoder.
 */
public class ProtoInstant {

    /** Magic number written at the start of every frame. */
    public static final short MAGIC_CODE = 0x86;

    /** Protocol version written after the magic number. */
    public static final short VERSION_CODE = 0x01;

    /**
     * Integer codes for client platforms.
     *
     * <p>NOTE(review): these codes start at 1, whereas
     * {@code UserTransmission.PLATTYPE} ordinals start at 0; confirm which
     * numbering the wire format is meant to use.
     */
    public interface Platform {

        int WINDOWS = 1;

        int MAC = 2;

        int ANDROID = 3;

        int IOS = 4;

        int WEB = 5;

        int UNKNOWN = 6;

    }

    /** Result codes with their descriptions. */
    public enum ResultCodeEnum {

        SUCCESS(0, "Success"),
        AUTH_FAILED(1, "登录失败"),
        NO_TOKEN(2, "没有授权码"),
        UNKNOW_ERROR(3, "未知错误"),
        ;

        // Enum constants are shared singletons, so their state must not change.
        private final Integer code;
        private final String desc;

        ResultCodeEnum(Integer code, String desc) {
            this.code = code;
            this.desc = desc;
        }

        /** @return the numeric result code */
        public Integer getCode() {
            return code;
        }

        /** @return the description of the result */
        public String getDesc() {
            return desc;
        }

    }

}

任务调度

每次调用 add 方法,都会把任务提交到线程池中异步执行。

package com.binglian.task;


import com.binglian.utils.ThreadUtil;

import java.util.concurrent.ThreadPoolExecutor;


/**
 * Static entry point for running tasks asynchronously on the shared thread
 * pool obtained from {@code ThreadUtil.getMixedTargetThreadPool()}.
 */
public class FutureTaskScheduler {

    static ThreadPoolExecutor mixPool = null;
    static {
        mixPool = ThreadUtil.getMixedTargetThreadPool();
    }

    private FutureTaskScheduler() {

    }

    /**
     * Hands a task to the pool for asynchronous execution.
     *
     * <p>The task is passed to {@code execute} rather than {@code submit}:
     * {@code submit} stores any exception in the returned {@code Future},
     * which nobody reads here, so failures would vanish silently. With
     * {@code execute} an exception thrown by the task reaches the worker
     * thread's uncaught-exception handler.
     *
     * @param executeTask the task to run; must not be {@code null}
     */
    public static void add(ExecuteTask executeTask) {
        mixPool.execute(executeTask::execute);
    }

    /** A unit of work that can be handed to {@link #add(ExecuteTask)}. */
    @FunctionalInterface
    public interface ExecuteTask {

        /** Runs the task. */
        void execute();
    }
}

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号