目录
大都会(Metropolis)
前期准备
基础配置
消息规范
Netty封装
服务器封装
用户通道封装
消息处理封装
消息客户端
前两篇我们完成了基础工具的封装和HttpClient工具的封装,实际上我们已经可以开始使用它了,但是为了达到数据可视化的效果,我希望的爬到的每一条信息都能马上让我知道它是有效的,所以我采用了消息推送的方法,实时告诉我我抓到了哪条信息,所以呢,我们需要再封装一个实时消息的工具,这里我们使用Netty作为消息管理器,开始Netty的封装。
大都会(Metropolis)
Netty的特性我们先来了解一下:
Netty的特点具有:
- 高并发:Netty 是一款基于 NIO(Nonblocking IO,非阻塞IO)开发的网络通信框架,对比于 BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。
- 传输快:Netty 的传输依赖于零拷贝特性,尽量减少不必要的内存拷贝,实现了更高效率的传输。
- 封装好:Netty 封装了 NIO 操作的很多细节,提供了易于使用调用接口。
所以市面上有很多工具是基于Netty进行二次封装开发的,这里我们仅对Netty做一个基本封装。
先来说一下Netty封装要注意的几点:
1.Netty是一个独立的服务,因此需要独立启动
2.客户端连接到Netty服务器端,客户标记需要我们自己标记
3.Netty的消息转发,就是将A客户端的消息发送到B客户端,从而实现A向B发送消息
4.Netty用户连接服务器时,有一定的处理次序,断开连接时一样会有处理次序,因此我们可以监听到用户上线和下线状态
5.我们将Netty服务不独立启动,而是交由SpringBoot去管理,即Springboot服务启动时自动启动Netty服务,Springboot服务关闭时,先关闭Netty服务
6.消息服务为长连接服务,不像Http请求那样请求结束后通道自动关闭,长连接是会跟服务器保持长久连接,用户不关闭,通道不关闭,因此必须使用Socket请求。
了解了上面的原理,我们开始着手准备Netty的封装吧。
前期准备
基础配置
首先我们将Netty的配置写到application.properties中,方便集中管理基础配置(这里我们在第一篇时已经加入进去了)
#Netty的自定义配置 netty.websocket.ip=0.0.0.0 netty.websocket.port=7251 netty.websocket.max-size=10240 netty.websocket.path=/channel
这样Netty的连接就表示任何IP均可访问我们服务器,连接方式为ws://127.0.0.1:7251/channel,ws://表示本请求为websocket服务,为长连接服务。
另外,我们需要定义一些规范,方便我们对消息的处理
消息规范
消息格式规范,方便我们统一消息的解析
Message.java
package com.vtarj.pythagoras.message.entity;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import java.util.HashMap;
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Message {
private Integer type;
private String source;
private String target;
private String message;
private HashMap ext;
}
消息异常规范,方便我们对异常消息的跟踪和反馈
ErrorType.java
package com.vtarj.pythagoras.message.entity;
public enum ErrorType {
NO_STANDARD("消息格式不标准",1001),
NOT_STANDARDIZED("消息内容不规范",1002),
NO_TARGET("未找到消息接收目标",2001),
NOT_ONLINE("消息接收人不在线",2002),
BIND_USER_STANDARD("用户绑定失败,无效用户",9001);
private final String value;
public String getValue() {
return value;
}
private final int key;
public int getKey() {
return key;
}
ErrorType(String s, int i) {
this.value = s;
this.key = i;
}
}
错误规范我们采用枚举的方式,方便加入错误内容,也方便管理错误信息
前期准备工作完成,接下来我们开始封装消息工具。
Netty封装
首先,我们要封装的就是消息服务器, 如何让Netty服务启动,启动时Netty服务进行初始配置等。
服务器封装
NettyRunner.java
package com.vtarj.pythagoras.message.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
@Component("Netty消息中心")
public class NettyRunner implements ApplicationRunner, ApplicationListener, ApplicationContextAware {
@Value("${netty.websocket.ip}")
private String ip;
@Value("${netty.websocket.port}")
private int port;
@Value("${netty.websocket.path}")
private String path;
@Value("${netty.websocket.max-size}")
private long maxSize;
//日志管理
private static final Logger logger = LoggerFactory.getLogger(NettyRunner.class);
//定义上下文服务
private ApplicationContext applicationContext;
//消息服务通道
private Channel serverChannel;
//主线程组,接收请求
private EventLoopGroup serverGroup;
//从线程组,处理主线程分配的IO操作
private EventLoopGroup clientGroup;
@Override
public void run(ApplicationArguments args) {
//创建主线程组,接收请求
serverGroup = new NioEventLoopGroup();
//创建从线程组,处理主线程分配的IO操作
clientGroup = new NioEventLoopGroup();
//创建Netty服务器,配置消息中心
ServerBootstrap server = new ServerBootstrap();
server.group(serverGroup,clientGroup);
server.channel(NioServerSocketChannel.class);
server.localAddress(new InetSocketAddress(this.ip,this.port));
server.childHandler(new ChannelInitializer() {
//初始化客户端连接通道
@Override
protected void initChannel(SocketChannel socketChannel) {
//配置消息过滤器
ChannelPipeline pi = socketChannel.pipeline();
//支持Http解码器,HttpRequestDecoder和HttpResponseEncoder的一个组合,针对http协议进行编解码
pi.addLast(new HttpServerCodec());
//支持大数据流,将大数据流分块发送客户端,防止大文件发送内存溢出
pi.addLast(new ChunkedWriteHandler());
//支持Http聚合器,将HttpMessage和HttpContents聚合到一个完成的 FullHttpRequest或FullHttpResponse中,具体是FullHttpRequest对象还是FullHttpResponse对象取决于是请求还是响应,需要放到HttpServerCodec这个处理器后面
pi.addLast(new HttpObjectAggregator(65536));
//支持入站事件控制器
pi.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception {
if(msg instanceof FullHttpRequest){
FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
String uri = fullHttpRequest.uri();
//判断请求链接非WebSocket指定的端点地址,直接响应404给客户端并关闭消息监听
//正常消息则继续处理
if(!uri.equals(path)){
ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.NOT_FOUND)).addListener(ChannelFutureListener.CLOSE);
return;
}
}
super.channelRead(ctx,msg);
}
});
//支持WebSocket数据压缩扩展
pi.addLast(new WebSocketServerCompressionHandler());
//设置WebSocket向外暴露的站点信息,当启动数据压缩扩展时,第三个参数必须为true
pi.addLast(new WebSocketServerProtocolHandler(path,null,true,maxSize));
//控制反转,自定义消息处理机制,将消息交由ChatHandler处理
pi.addLast(applicationContext.getBean(ChatHandler.class));
}
});
//启动消息中心
try {
//服务器绑定监听端口,开始接收连接
this.serverChannel = server.bind().sync().channel();
logger.info("Netty消息中心服务启动,ip={},port={}", this.ip, this.port);
} catch (InterruptedException e){
e.printStackTrace();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
//关闭主线程信道
if(this.serverGroup != null){
try {
this.serverGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//关闭从线程信道
if(this.clientGroup != null){
try {
this.clientGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//关闭消息服务通道
if(this.serverChannel != null){
this.serverChannel.close();
}
logger.info("Netty消息中心服务关闭!");
}
}
消息服务启动后,接下来就是建立消息通道。客户端要向服务器发送消息,肯定得有通道存在,如何管理通道,就成了我们接下来要考虑的事情(这里我们用用户来标记通道)。
用户通道封装
UserChannel.java
package com.vtarj.pythagoras.message.netty;
import io.netty.channel.Channel;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class UserChannel {
//定义全局用户通道管理器
private static final ConcurrentHashMap USER_CHANNEL_MAP = new ConcurrentHashMap<>();
public static void add(String userKey,Channel channel){
USER_CHANNEL_MAP.put(userKey,channel);
}
public static void remove(String userKey){
USER_CHANNEL_MAP.remove(userKey);
}
public static void removeByChannelId(String channelId){
if(isNotNull(channelId)){
for (String key:
USER_CHANNEL_MAP.keySet()) {
Channel channel = USER_CHANNEL_MAP.get(key);
if(channelId.equals(channel.id().asLongText())){
remove(key);
break;
}
}
}
}
public static Channel get(String userKey){
if(isNotNull(userKey)){
return USER_CHANNEL_MAP.get(userKey);
}
return null;
}
private static boolean isNotNull(String str){
return str != null && !str.trim().isEmpty();
}
}
现在服务和用户都有了,那如何让用户通过通道进入服务器,并且让服务器记录用户信息呢?这里就涉及我们要设计用户交互方式了。
消息处理封装
ChatHandler.java
package com.vtarj.pythagoras.message.netty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.vtarj.pythagoras.message.entity.ErrorType; import com.vtarj.pythagoras.message.entity.Message; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.util.HashMap; @ChannelHandler.Sharable @Component public class ChatHandler extends SimpleChannelInboundHandler{ //日志管理 private static final Logger logger = LoggerFactory.getLogger(ChatHandler.class); private static final ChannelGroup CLIENTS = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static int online; private static final int SYSTEM_CHANNEL = 0; private static final int TO_PERSON = 1; private static final int TO_GROUP = 2; //定义系统用户 private static final String SYSTEM_USER = "admin"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); logger.info("【连接】 通信地址:{},时间:{}",ctx.channel().remoteAddress(),System.currentTimeMillis()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //移除信道 super.channelInactive(ctx); logger.info("【断开】 通信地址:{},时间:{}",ctx.channel().remoteAddress(),System.currentTimeMillis()); } @Override public void handlerAdded(ChannelHandlerContext ctx){ CLIENTS.add(ctx.channel()); online = CLIENTS.size(); } @Override public void handlerRemoved(ChannelHandlerContext ctx){ CLIENTS.remove(ctx.channel()); online = CLIENTS.size(); //移除用户通道 UserChannel.removeByChannelId(ctx.channel().id().asLongText()); } @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame info) { //文本消息处理 if (info instanceof TextWebSocketFrame){ TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) info; Message message = decrypt(textWebSocketFrame.text()); if(message != null){ //绑定用户,激活用户通道(用户在未发送消息前通道不记录,表示该通道处于休眠状态,用户通道变更后自动更新通道) UserChannel.add(message.getSource(),ctx.channel()); //转发消息 sendTextMessage(message); }else{ ctx.channel().writeAndFlush(new TextWebSocketFrame(ErrorType.BIND_USER_STANDARD.getValue())).addListener(ChannelFutureListener.CLOSE); } } else { //不接受文本以外的数据帧类型,通道创建失败 ctx.channel().writeAndFlush(WebSocketCloseStatus.INVALID_MESSAGE_TYPE).addListener(ChannelFutureListener.CLOSE); } } public static void sendTextMessage(Message m) { if(m.getType() == TO_GROUP){ sendTextMessageToAll(m); }else if(m.getType() == TO_PERSON){ sendTextMessageToPerson(m); }else{ if(m.getType() != SYSTEM_CHANNEL){ sendError(m,ErrorType.NO_STANDARD); } } } public static void sendTextMessageToPerson(Message m){ if(m.getTarget().isEmpty()) { logger.info("{} 【消息发送失败】: {}【消息来源:{}】",System.currentTimeMillis(),ErrorType.NO_TARGET.getValue(),m.getSource()); sendError(m, ErrorType.NO_TARGET); return; } if(m.getSource().equals(m.getTarget()) || m.getTarget().equals(SYSTEM_USER)){ logger.info("{} 【消息发送失败】: {}【消息来源:{}】【消息目标:{}】",System.currentTimeMillis(),"消息发送目标异常",m.getSource(),m.getTarget()); return; } Channel channel = UserChannel.get(m.getTarget()); if (channel != null){ channel.writeAndFlush(new TextWebSocketFrame(encryption(m))); } else { logger.info("{} 【消息发送失败】: {}【消息来源:{}】【消息目标:{}】",System.currentTimeMillis(),ErrorType.NOT_ONLINE.getValue(),m.getSource(),m.getTarget()); sendError(m, ErrorType.NOT_ONLINE); } } public static void sendTextMessageToAll(Message m){ for (Channel channel: CLIENTS) { //排除消息发送人 Channel channel1 = UserChannel.get(m.getSource()); if(channel1 != null && !channel1.id().asLongText().equals(channel.id().asLongText())){ channel.writeAndFlush(new TextWebSocketFrame(encryption(m))); } } } public static void sendError(Message m,ErrorType e){ if(m.getTarget().equals(SYSTEM_USER)){ System.out.println("系统消息无法送达!"); return; } Message r = new Message(); r.setSource(SYSTEM_USER); r.setTarget(m.getSource()); r.setType(SYSTEM_CHANNEL); r.setMessage(e.getValue()); HashMap ext = new HashMap<>(); ext.put("resTime",System.currentTimeMillis()); ext.put("code",e.getKey()); ext.put("data",m); r.setExt(ext); sendTextMessageToPerson(r); } private static String encryption(Message message){ ObjectMapper mapper = new ObjectMapper(); String result = null; try{ result = mapper.writeValueAsString(message); } catch (JsonProcessingException e){ e.printStackTrace(); } return result; } private static Message decrypt(String context){ Message message = null; if(!context.isEmpty()){ ObjectMapper mapper = new ObjectMapper(); mapper.enable(SerializationFeature.INDENT_OUTPUT); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false); try { message = mapper.readValue(context,Message.class); } catch (JsonProcessingException e){ e.printStackTrace(); } } return message; } }
这里我们定义了一个admin用户为系统用户,用于系统给客户端发送消息(S—>A或S—>B),而非用户给用户发送模式(A—>S—>B)。
另外,服务器不会自己识别用户标记,服务器只会认识自己的CLIENTS,所以我们将用户标记存放到CLIENTS中,这样就起到我们通过用户识别CLIENTS中的Channel(通道)的目的。
消息发送原理很简单,用户跟服务器建立通道之后,用户只需要将消息放入通道,消息就可以抵达服务器,同样服务器将消息放入通道,用户就可以接收到消息,因为是长连接,所以通道会一直打开,直到用户或服务器自己关闭。向QQ等离线消息,就是先将消息暂存到服务器中,等用户上线后重新建立通道后,再将消息发送到通道中,以此实现消息离线发送的目的,这里就不具体介绍离线服务了。
用户与服务器建立连接时,需要经过(handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete())这5个阶段,相当于客户端连接一次,服务器实际要处理5次,因此我们只需要抓住某一次进行标记即可,这里我们通常监听到channelActive时就表示该通道已建立(消息通道关闭同理)。
我们本次封装会有全体消息和个人消息的区别,上面其实我们已经说了,个人消息就是发送到指定通道,全体消息就是发送给除了系统自身外的所有通道,其实由此可以衍生出群组消息、组织消息等,这里就不过多发散了。
另外,我们只封装了文本消息,图片、文件等消息支持,如感兴趣请自行摸索。
消息通道管理基本封装完成,现在我们要做的就是封装一个消息快捷发送接口,避免其他业务调用时还需要梳理一大堆关系。
NettyHelper.java
package com.vtarj.pythagoras.message;
import com.vtarj.pythagoras.message.entity.Message;
import com.vtarj.pythagoras.message.netty.ChatHandler;
public class NettyHelper {
public static void send(String from, String to, int type, String content){
Message message = new Message();
message.setMessage(content);
message.setType(type);
message.setSource(from);
message.setTarget(to);
ChatHandler.sendTextMessage(message);
}
}
消息服务端处理就此封装完成,那就有人问了,那客户端怎么连接服务器建立长连接呢?哈哈,其实我们上面已经说过,通过ws://127.0.0.1:7251/channel就可以建立通道,所以客户端采用哪种客户端就写对应的请求即可。
消息客户端
这里我们写一个JS的例子供大家参考,毕竟我们刚开始就说要在web上实现监听嘛。
message.js
let MsgAdapter = {
"user":"",
"send":function (target,message){},
"receive":function (data){}
}
$(function(){
//判断浏览器是否支持WebSocket
if(window.WebSocket){
const websocket = new WebSocket('ws://127.0.0.1:7251/channel');
websocket.onopen = () => {
if(MsgAdapter.user){
websocket.send(JSON.stringify({
"source" : MsgAdapter.user,
"type" : 0,
"target" : "",
"message" : "Hello Server!"
}));
}else {
console.log("您还未指定用户标识,消息通道无法建立!");
}
}
MsgAdapter.send = (t,m) => {
if(!t){
console.log("未指定消息接收人,消息发送失败!");
return;
}
websocket.send(JSON.stringify({
"source" : MsgAdapter.user,
"type" : 1,
"target" : t,
"message" : m
}));
}
websocket.onmessage = (e) => {
MsgAdapter.receive(e.data)
}
websocket.onclose = (e) => {
console.log("消息通道连接已断开:",e);
}
websocket.onerror = (e) => {
console.log("消息中心连接异常:",e);
}
}else{
console.log("您的浏览器不支持WebSocket服务,及时消息服务无法启用");
}
})
这里我们依赖了jquery哦,毕竟我们用了人家的$符,所以大家自行加载依赖。我们定义了一个MsgAdapter来管理消息。通过MsgAdapter.send发送消息,通过MsgAdapter.receive来接收消息。
至此,消息服务Netty的封装就完成了,接下来完事具备,我们开始搞爬虫吧!
未完待续~~~
上一篇:实战:纯手工打造Java爬虫——基于JDK11原生HttpClient(三)
下一篇:实战:纯手工打造Java爬虫——基于JDK11原生HttpClient(五)



