栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Springboot+Netty优雅的创建websocket客户端 (附源码下载)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Springboot+Netty优雅的创建websocket客户端 (附源码下载)

Springboot-cli 开发脚手架系列

Netty系列:Springboot+Netty优雅的创建websocket客户端 (附源码下载)


文章目录
  • Springboot-cli 开发脚手架系列
  • 前言
    • 1. 环境
    • 2. 引入websocket编码解码器
    • 3. 编写websocket处理器
    • 4. http测试接口编写
    • 5. 效果演示
    • 6. 源码分享


前言

首先我们需要使用Netty搭建基础的tcp框架,参考Springboot使用Netty优雅的创建TCP客户端(附源码),接下来我们开始集成websocket。
结尾有完整源码下载地址

1. 环境
  • pom.xml

            io.netty
            netty-all
            ${netty-all.version}
        
  • yml开启日记debug级别打印
# 日记配置
logging:
  level:
    # 开启debug日记打印
    com.netty: debug
2. 引入websocket编码解码器

这里我们需要加入websocket编码解码器,因为websocket的握手是通过http完成的,所以我们还需要加入http的编码器。

@Component
@RequiredArgsConstructor
public class ChannelInit extends ChannelInitializer {

    private final MessageHandler messageHandler;

    @Override
    protected void initChannel(SocketChannel channel) {
        channel.pipeline()
                // 每隔60s的时间触发一次userEventTriggered的方法,并且指定IdleState的状态位是WRITER_IDLE,事件触发给服务器发送ping消息
                .addLast("idle", new IdleStateHandler(0, 60, 0, TimeUnit.SECONDS))
                // 添加解码器
                .addLast(new HttpClientCodec())
                .addLast(new ChunkedWriteHandler())
                .addLast(new HttpObjectAggregator(1024 * 1024 * 10))
                .addLast(new WebSocketFrameAggregator(1024 * 62))
                // 添加消息处理器
                .addLast("messageHandler", messageHandler);
    }
}
3. 编写websocket处理器
  • 修改消息处理器MessageHandler.java
  • 增加两个全局变量,保存当前连接
    
    private WebSocketClientHandshaker handShaker;
    
    private ChannelPromise handshakeFuture;
  • 连接成功开始握手
 @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // 保存Promise
        this.handshakeFuture = ctx.newPromise();
    }
@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.debug("n");
        log.debug("握手开始,channelId:{}", ctx.channel().id());
        handShaker = WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://" + WebsocketClient.connectedIp + ":" + WebsocketClient.connectedPort + "/ws"), WebSocketVersion.V13, null, false, new DefaultHttpHeaders());
        handShaker.handshake(ctx.channel());
        super.channelActive(ctx);
    }
  • 完整代码
@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class MessageHandler extends SimpleChannelInboundHandler {

    private WebSocketClientHandshaker handShaker;

    private ChannelPromise handshakeFuture;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // 连接前执行
        this.handshakeFuture = ctx.newPromise();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object message) {
        log.debug("n");
        log.debug("channelId:" + ctx.channel().id());
        // 判断是否正确握手
        if (!this.handShaker.isHandshakeComplete()) {
            try {
                this.handShaker.finishHandshake(ctx.channel(), (FullHttpResponse) message);
                log.debug("websocket Handshake 完成!");
                this.handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                log.debug("websocket连接失败!");
                this.handshakeFuture.setFailure(e);
            }
            return;
        }
        // 握手失败响应
        if (message instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) message;
            log.error("握手失败!code:{},msg:{}", response.status(), response.content().toString(CharsetUtil.UTF_8));
        }
        WebSocketFrame frame = (WebSocketFrame) message;
        // 消息处理
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            log.debug("收到消息: " + textFrame.text());
        }
        if (frame instanceof PongWebSocketFrame) {
            log.debug("pong消息");
        }
        if (frame instanceof CloseWebSocketFrame) {
            log.debug("服务器主动关闭连接");
            ctx.close();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.debug("n");
        log.debug("连接断开");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.debug("n");
        log.debug("握手开始,channelId:{}", ctx.channel().id());
        handShaker = WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://" + WebsocketClient.connectedIp + ":" + WebsocketClient.connectedPort + "/ws"), WebSocketVersion.V13, null, false, new DefaultHttpHeaders());
        handShaker.handshake(ctx.channel());
        super.channelActive(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.debug("超时事件时触发");
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            // 当我们长时间没有给服务器发消息时,发送ping消息,告诉服务器我们还活跃
            if (event.state().equals(IdleState.WRITER_IDLE)) {
                log.debug("发送心跳");
                ctx.writeAndFlush(new PingWebSocketFrame());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
 
4. http测试接口编写 
  • pom.xml加入web依赖

        org.springframework.boot
        spring-boot-starter-web

  • HttpApi .java
@RequiredArgsConstructor
@RestController
@Slf4j
public class HttpApi {

    private final WebsocketClient websocketClient;

    
    @GetMapping("/send")
    public String send(String message) {
        websocketClient.getSocketChannel().writeAndFlush(new TextWebSocketFrame(message));
        return "发送成功";
    }

    
    @PostMapping("/send/json")
    public String send(@RequestBody JSONObject body) {

        websocketClient.getSocketChannel().writeAndFlush(new TextWebSocketFrame(body.toJSONString()));
        return "发送成功";
    }

    
    @GetMapping("connect")
    public String connect(String ip, Integer port) throws Exception {
        websocketClient.connect(ip, port);
        return "重启指令发送成功";
    }

    
    @GetMapping("reconnect")
    public String reconnect() throws Exception {
        websocketClient.reconnect();
        return "重启指令发送成功";
    }
}
5. 效果演示
  • 客户端服务类和启动类基础模块的搭建参考开头提供的连接进行搭建即可,这里就不重复了

  • 我们这里通过Springboot+Netty优雅的开发websocket高性能服务器搭建的服务器配合测试

  • 测试接口

基础接口 http://localhost:9999

# 1. 发送消息
/send?message=hello
# 2. 连接
/connect?ip=192.168.0.99&port=20000
# 3. 重连
/reconnect
# 5. 发送json
```json
Request URL:  http://localhost:9999/send/json
Request Method: POST
Request Headers:
{
   "Content-Type":"application/json"
}
Request Body:
{
   "msgId": 1,
   "type": 1,
   "data": {
            "message":"hello"
           }
}
  • 效果

6. 源码分享
  • Springboot-cli开发脚手架,集合各种常用框架使用案例,完善的文档,致力于让开发者快速搭建基础环境并让应用跑起来。
  • 项目源码github地址
  • 项目源码国内gitee地址
转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号