Netty 本身没有 session 的概念,所以需要我们自己创建。
因此需要自己定义一个对象来存储用户信息,这里把它叫做用户传输对象(UserTransmission)。
package com.binglian.common;
import com.binglian.message.MessagePro;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Holds the identity of a connected user (uid, device, token, platform) together with the id
 * of the session it is attached to.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}. Note that the generated {@code toString()} includes the token, so avoid
 * logging whole instances.
 */
@Slf4j
@Data
public class UserTransmission {

    private String uid;

    /** Device id; defaults to a random UUID until a real one is supplied. */
    private String devId = UUID.randomUUID().toString();

    /** Authentication token; defaults to a random UUID until a real one is supplied. */
    private String token = UUID.randomUUID().toString();

    private String username;

    PLATTYPE platform = PLATTYPE.MAC;

    /** Client platforms. The ordinal of each constant is the value carried in login messages. */
    public enum PLATTYPE {
        WINDOWS, MAC, ANDROID, IOS, WEB, OTHER;
    }

    private String sessionId;

    /**
     * Sets the platform from its ordinal value.
     *
     * @param platform ordinal of a {@link PLATTYPE} constant; values outside the valid range
     *                 are ignored and the current platform is kept
     */
    public void setPlatform(int platform) {
        PLATTYPE[] values = PLATTYPE.values();
        if (platform >= 0 && platform < values.length) {
            this.platform = values[platform];
        }
    }

    /**
     * Builds a user object from a decoded login request.
     *
     * @param info the login request carrying uid, device id, token and platform ordinal
     * @return a new instance populated from {@code info}
     */
    public static UserTransmission fromMsg(MessagePro.LoginRequest info) {
        UserTransmission userTransmission = new UserTransmission();
        // Strings are immutable, so the values can be shared without defensive copies.
        userTransmission.uid = info.getUid();
        userTransmission.devId = info.getDeviceId();
        userTransmission.token = info.getToken();
        userTransmission.setPlatform(info.getPlatform());
        // Log identifying fields only: toString() would write the token to the log.
        log.info("登录中: uid={}, devId={}, platform={}",
                userTransmission.uid, userTransmission.devId, userTransmission.platform);
        return userTransmission;
    }
}
ServerSession
这里用了一个容器存放session_key,这个是Netty自带的,唯一性,这里用来绑定通道的,这一篇不会用到,但是后面一篇会用到。
public static final AttributeKey<ServerSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY");
// 获取通道上绑定的 session
channel.attr(ServerSession.SESSION_KEY).get();
// this 就是把当前 session 绑定到 key 中
channel.attr(ServerSession.SESSION_KEY).set(this);
package com.binglian.session;
import com.binglian.common.UserTransmission;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Slf4j
@Data
public class ServerSession {
public static final AttributeKey SESSION_KEY = AttributeKey.valueOf("SESSION_KEY");
// 通道
private Channel channel;
private UserTransmission userTransmission;
private final String sessionId;
private boolean isLogin = false;
private Map map = new HashMap();
public ServerSession(Channel channel) {
this.channel = channel;
this.sessionId = buildNewSessionId();
}
//反向导航
public static ServerSession getSession(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
return channel.attr(ServerSession.SESSION_KEY).get();
}
//关闭连接
public static void closeSession(ChannelHandlerContext ctx) {
ServerSession session =
ctx.channel().attr(ServerSession.SESSION_KEY).get();
if (null != session && session.isValid()) {
session.close();
SessionMap.inst().removeSession(session.getSessionId());
}
}
//和channel 通道实现双向绑定
public ServerSession bind() {
log.info(" ServerSession 绑定会话 " + channel.remoteAddress());
channel.attr(ServerSession.SESSION_KEY).set(this);
SessionMap.inst().addSession( this);
isLogin = true;
return this;
}
public ServerSession unbind() {
isLogin = false;
SessionMap.inst().removeSession(getSessionId());
this.close();
return this;
}
public String getSessionId() {
return sessionId;
}
private static String buildNewSessionId() {
String uuid = UUID.randomUUID().toString();
return uuid.replaceAll("-", "");
}
//关闭连接
public synchronized void close() {
ChannelFuture future = channel.close();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("CHANNEL_CLOSED error ");
}
}
});
}
public synchronized void set(String key, Object value) {
map.put(key, value);
}
public synchronized T get(String key) {
return (T) map.get(key);
}
public boolean isValid() {
return getUser() != null ? true : false;
}
//写Protobuf数据帧
public synchronized void writeAndFlush(Object pkg) {
//当系统水位过高时,系统应不继续发送消息,防止发送队列积压
//写Protobuf数据帧
if (channel.isWritable()) //低水位
{
channel.writeAndFlush(pkg);
} else { //高水位时
log.debug("通道很忙,消息被暂存了");
//写入消息暂存的分布式存储,如果mongo
//等channel空闲之后,再写出去
}
}
public UserTransmission getUser() {
return userTransmission;
}
public void setUserTransmission(UserTransmission user) {
this.userTransmission = user;
user.setSessionId(sessionId);
}
}
SessionMap 用来存放所有会话:以 sessionId 为键把会话放进 map。这里用 ConcurrentHashMap 存放,因为它是线程安全的。
package com.binglian.session;
import com.binglian.common.UserTransmission;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
 * Process-wide registry of active {@link ServerSession}s, keyed by session id.
 *
 * <p>Singleton; obtain the instance through {@link #inst()}. The registry is backed by a
 * {@link ConcurrentHashMap}, and each method performs a single map operation so concurrent
 * callers cannot interleave between a check and the action that follows it.
 */
@Slf4j
@Data
public final class SessionMap {

    private SessionMap() {
    }

    private static final SessionMap singleInstance = new SessionMap();

    /** All registered sessions, keyed by session id. */
    private ConcurrentHashMap<String, ServerSession> map =
            new ConcurrentHashMap<>();

    /** @return the single registry instance */
    public static SessionMap inst() {
        return singleInstance;
    }

    /**
     * Registers a session under its session id, replacing any previous entry with that id.
     *
     * @param s the session to register
     */
    public void addSession(ServerSession s) {
        map.put(s.getSessionId(), s);
        log.info("用户登录:id= {} 在线总数: {}", uidOf(s), map.size());
    }

    /**
     * Looks up a session by id.
     *
     * @param sessionId the session id
     * @return the session, or {@code null} if no session is registered under that id
     */
    public ServerSession getSession(String sessionId) {
        // A single get() replaces containsKey()+get(), which could race with a removal.
        return map.get(sessionId);
    }

    /**
     * Finds every session that belongs to the given user.
     *
     * @param userId the uid to match
     * @return the matching sessions; empty if there are none. Sessions without a user
     *         are skipped.
     */
    public List<ServerSession> getSessionsBy(String userId) {
        return map.values()
                .stream()
                .filter(s -> {
                    String uid = uidOf(s);
                    return uid != null && uid.equals(userId);
                })
                .collect(Collectors.toList());
    }

    /**
     * Removes a session from the registry. Does nothing if the id is not registered.
     *
     * @param sessionId id of the session to remove
     */
    public void removeSession(String sessionId) {
        // remove() returns the previous value atomically, so two concurrent removals of the
        // same id cannot both pass a containsKey() check and then dereference null.
        ServerSession removed = map.remove(sessionId);
        if (removed == null) {
            return;
        }
        log.info("用户下线:id= {} 在线总数: {}", uidOf(removed), map.size());
    }

    /** Returns the uid of the session's user, or {@code null} if no user is attached. */
    private static String uidOf(ServerSession s) {
        UserTransmission user = s.getUser();
        return user == null ? null : user.getUid();
    }
}
二、编码和解码
Netty有自带的一些编码,下面,只列举两个
StringEncoder,对字符串数据进行编码
ObjectEncoder,对 Java 对象进行编码
对应
StringDecoder, 对字符串数据进行解码
ObjectDecoder,对 Java 对象进行解码
| 编码类 | 解释 | 解码类 | 解释 |
|---|---|---|---|
| StringEncoder | 对字符串数据进行编码 | StringDecoder | 对字符串数据进行解码 |
| ObjectEncoder | 对 Java 对象进行编码 | ObjectDecoder | 对 Java 对象进行解码 |
| MessageToByteEncoder | 自定义编码器 | ByteToMessageDecoder | 对应自定义解码器 |
这里我们用的是自定义的
编码信息extends MessageToByteEncoder
这里用到了魔数、版本、长度和内容四部分,客户端按照此规范编码后传输到服务端,服务端也会按照同样的规范解析并校验信息。
package com.binglian.common; import com.binglian.message.MessagePro; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import lombok.extern.slf4j.Slf4j; @Slf4j public class ProtobufEncoder extends MessageToByteEncoder解码信息{ @Override protected void encode(ChannelHandlerContext ctx, MessagePro.Message msg, ByteBuf out) throws Exception { encode0(msg, out); } public static void encode0( MessagePro.Message msg, ByteBuf out) { // 将魔数和版本都写进去 out.writeShort(ProtoInstant.MAGIC_CODE); out.writeShort(ProtoInstant.VERSION_CODE); // 将对象转换为byte byte[] bytes = msg.toByteArray(); // 加密消息体 int length = bytes.length;// 读取消息的长度 log.info("message:---" + bytes.toString()); // 先将消息长度写入,也就是消息头 out.writeInt(length); // 消息体中包含我们要发送的数据 out.writeBytes(bytes); } }
这里继承了 ByteToMessageDecoder 来自定义解码器,解码时按照编码规定的格式依次处理:
先读取魔数并判断是否与 ProtoInstant 中定义的一致,再读取版本并判断是否匹配,然后读取长度,最后再读取内容。
package com.binglian.common;
import com.binglian.message.MessagePro;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
public class ProtobufDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
三、请求handler、响应handler
server服务端接收到客户端的请求,进行一个业务处理,进行绑定通道,通道绑定的代码在上面的
服务端 LoginRequestHandler:消息解码之后进入该 handler,再执行相关逻辑
这地方,把传过来的对象进行一个转换,转换成Protobuf对象。(像不像对象转换和获取)
MessagePro.Message message = (MessagePro.Message) msg;
// 获取请求类型,像不像封装的get和set
MessagePro.Type Type = message.getType();
package com.binglian.handler;
import com.binglian.common.ProtoInstant;
import com.binglian.common.UserTransmission;
import com.binglian.message.MessagePro;
import com.binglian.session.ServerSession;
import com.binglian.session.SessionMap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
 * Server-side inbound handler for login requests.
 *
 * <p>Messages that are not a {@link MessagePro.Message} of type {@code LOGIN_REQUEST} are
 * passed on to the next handler unchanged.
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class LoginRequestHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("收到一个新的连接,但是没有登录 {}", ctx.channel().id());
        super.channelActive(ctx);
    }

    /**
     * Handles a decoded message if it is a login request; otherwise forwards it.
     *
     * @param ctx the handler context
     * @param msg the decoded inbound object
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // instanceof is false for null, so no separate null check is needed.
        if (!(msg instanceof MessagePro.Message)) {
            super.channelRead(ctx, msg);
            return;
        }
        MessagePro.Message message = (MessagePro.Message) msg;

        // Compare against the enum constant. The *_VALUE constant is the numeric wire value,
        // and an enum never equals() a number, so that check forwarded every login request.
        MessagePro.Type type = message.getType();
        if (type != MessagePro.Type.LOGIN_REQUEST) {
            super.channelRead(ctx, msg);
            return;
        }

        // Create a session for the channel this request arrived on.
        // NOTE(review): the session has no user attached and is not bound yet, so
        // getUser() below is null until the login logic is extended.
        ServerSession session = new ServerSession(ctx.channel());

        // Placeholder for the real credential check, to be extended later.
        boolean result = true;
        if (result) {
            log.info("登录成功:{}", session.getUser());
        } else {
            log.info("登录失败:{}", session.getUser());
        }
    }
}
客户端的LoginResponseHandler
客户端获取解码后的信息,判断服务器发送过来的结果是成功还是失败
package com.binglian.handler;
import com.binglian.common.ProtoInstant;
import com.binglian.message.MessagePro;
import com.binglian.session.ClientSession;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Client-side inbound handler for login responses.
 *
 * <p>Only a {@link MessagePro.Message} whose type is {@code LOGIN_RESPONSE} is consumed here;
 * everything else is forwarded to the next handler in the pipeline.
 */
@Slf4j
@ChannelHandler.Sharable
@Component
public class LoginResponseHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        log.info("LoginResponseHandler");

        if (msg instanceof MessagePro.Message) {
            MessagePro.Message envelope = (MessagePro.Message) msg;
            boolean isLoginResponse = envelope.getType().equals(MessagePro.Type.LOGIN_RESPONSE);
            if (isLoginResponse) {
                // Extract the payload; acting on success or failure is to be added later.
                MessagePro.LoginResponse loginResponse = envelope.getLoginResponse();
                return;
            }
        }

        // Not a login response (or not a protocol message at all): let later handlers see it.
        super.channelRead(ctx, msg);
    }
}
四、登录前的准备,protobuf使用
这两段代码,就是使用之前创建的 protobuf 消息:
第一段,把 user 传输对象里的信息封装进 LoginRequest 对象;第二段,把 LoginRequest 放到 Message 中,再放进其他内容。
// Excerpt: builds the login message. `user` and `session` come from the surrounding code.
log.info("构建登录信息");
// Step 1: copy the user's device id, platform ordinal, token and uid into a LoginRequest builder.
MessagePro.LoginRequest.Builder lb = MessagePro.LoginRequest.newBuilder()
.setDeviceId(user.getDevId())
.setPlatform(user.getPlatform().ordinal())
.setToken(user.getToken())
.setUid(user.getUid());
// Step 2: wrap the login request in the outer Message, tagged with its type, the session id
// and a serial number of -1.
// NOTE(review): buildPartial() is used rather than build() — confirm that skipping the
// required-field check is intended.
MessagePro.Message message = MessagePro.Message
.newBuilder()
.setType(MessagePro.Type.LOGIN_REQUEST)
.setLoginRequest(lb)
.setSessionId(session.getSessionId())
.setSerialNumber(-1).buildPartial();
登录连接的工具类
package com.binglian.client;
import com.binglian.common.UserTransmission;
import com.binglian.entity.NettyUser;
import com.binglian.message.MessagePro;
import com.binglian.session.ClientSession;
import com.binglian.task.FutureTaskScheduler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class ClientUtils implements Client{
// 连接标记,是否连接成功
private boolean connectFlag = false;
// 通道
private Channel channel;
//会话类
private ClientSession session;
@Autowired
private NettyClient nettyClient;
public void login(NettyUser nettyUser) {
// 判断是否连接了
while (connectFlag == false) {
// 开始连接
startConnectServer();
waitCommandThread();
}
if (session != null) {
UserTransmission user = new UserTransmission();
user.setUid(String.valueOf(nettyUser.getUserId()));
user.setUsername(nettyUser.getUsername());
session.setUser(user);
log.info("构建登录信息");
MessagePro.LoginRequest.Builder lb = MessagePro.LoginRequest.newBuilder()
.setDeviceId(user.getDevId())
.setPlatform(user.getPlatform().ordinal())
.setToken(user.getToken())
.setUid(user.getUid());
MessagePro.Message message = MessagePro.Message
.newBuilder()
.setType(MessagePro.Type.LOGIN_REQUEST)
.setLoginRequest(lb)
.setSessionId(session.getSessionId())
.setSerialNumber(-1).buildPartial();
log.info("发送信息");
sendMsg(message);
}
}
public void startConnectServer() {
FutureTaskScheduler.add(()->{
nettyClient.setConnectedListener(connectedListener);
nettyClient.doConnect();
});
}
public void sendMsg(MessagePro.Message message) {
if (null == this.session || !session.isConnected()) {
log.info("连接还没成功");
return;
}
Channel channel = session.getChannel();
// writeAndFlush write+flush 写进去
ChannelFuture f = channel.writeAndFlush(message);
f.addListener(new GenericFutureListener>() {
@Override
public void operationComplete(Future super Void> future)
throws Exception {
// 回调
if (future.isSuccess()) {
log.info("发送成功");
} else {
log.info("发送失败");
}
}
});
}
public synchronized void notifyCommandThread() {
//唤醒,命令收集程
this.notify();
}
public synchronized void waitCommandThread() {
//休眠,命令收集线程
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
GenericFutureListener closeListener = (ChannelFuture f) ->
{
log.info(new Date() + ": 连接已经断开……");
channel = f.channel();
// 创建会话
ClientSession session =
channel.attr(ClientSession.SESSION_KEY).get();
session.close();
//唤醒用户线程
notifyCommandThread();
};
GenericFutureListener connectedListener = (ChannelFuture f)->{
final EventLoop eventLoop = f.channel().eventLoop();
if (!f.isSuccess()) {
log.info("连接失败!在10s之后准备尝试重连!");
eventLoop.schedule(() -> nettyClient.doConnect(),
10, TimeUnit.SECONDS);
connectFlag = false;
} else {
connectFlag = true;
log.info("服务器 连接成功");
channel = f.channel();
// 创建回话
session = new ClientSession(channel);
session.setConnected(true);
channel.closeFuture().addListener(closeListener);
//唤醒用户线程
notifyCommandThread();
}
};
}
其他需要的工具类
枚举
package com.binglian.common;
/**
 * Protocol-level constants shared by the encoder, the decoder and the handlers.
 */
public class ProtoInstant {

    /** Magic number written at the start of every frame. */
    public static final short MAGIC_CODE = 0x86;

    /** Protocol version written after the magic number. */
    public static final short VERSION_CODE = 0x01;

    /** Numeric platform identifiers. */
    public interface Platform {
        // Interface fields are implicitly public static final.
        int WINDOWS = 1;
        int MAC = 2;
        int ANDROID = 3;
        int IOS = 4;
        int WEB = 5;
        int UNKNOWN = 6;
    }

    /** Result codes, each with a numeric code and a description. */
    public enum ResultCodeEnum {

        /** The operation succeeded. */
        SUCCESS(0, "Success"),
        AUTH_FAILED(1, "登录失败"),
        NO_TOKEN(2, "没有授权码"),
        UNKNOW_ERROR(3, "未知错误");

        private final Integer code;
        private final String desc;

        ResultCodeEnum(Integer code, String desc) {
            this.code = code;
            this.desc = desc;
        }

        /** @return the numeric result code */
        public Integer getCode() {
            return this.code;
        }

        /** @return the description of the result */
        public String getDesc() {
            return this.desc;
        }
    }
}
任务调度
每次调用 add 方法,就把任务提交到线程池中异步执行
package com.binglian.task;
import com.binglian.utils.ThreadUtil;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Fire-and-forget task scheduler backed by the shared thread pool obtained from
 * {@code ThreadUtil.getMixedTargetThreadPool()}.
 */
public class FutureTaskScheduler {

    /** Pool that runs the submitted tasks. */
    static ThreadPoolExecutor mixPool = ThreadUtil.getMixedTargetThreadPool();

    private FutureTaskScheduler() {
    }

    /**
     * Queues a task for asynchronous execution on the shared pool.
     *
     * @param executeTask the task to run
     */
    public static void add(ExecuteTask executeTask) {
        // execute() rather than submit(): nothing keeps the Future that submit() returns,
        // so an exception thrown by the task would be captured in it and never seen.
        // With execute() the exception reaches the worker thread's uncaught-exception handler.
        mixPool.execute(() -> executeTask.execute());
    }

    /** A unit of work that can be scheduled with {@link #add(ExecuteTask)}. */
    @FunctionalInterface
    public interface ExecuteTask {
        void execute();
    }
}



