栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

基于corundumstudio建立websocket长连接

基于corundumstudio建立websocket长连接

依赖


        
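        <dependency>
            <groupId>io.socket</groupId>
            <artifactId>socket.io-client</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>${netty-socketio.version}</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>${kafka.version}</version>
        </dependency>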
        

代码

@Slf4j
@Component
public class SocketIOClientManager {

    @Autowired
    private KafkaConnectionManager kafkaConnectionManager;

    @Autowired
    private MqttConnectionManager mqttConnectionManager;

    @Resource
    private WebSocketEventHandler webSocketEventHandler;

    // 用来存已连接的客户端唯一ID, >
    private Map> clientMap = Collections.synchronizedMap(new HashMap<>());


    public void addClient(SocketIOClient client) {
        String sessionID = client.getSessionId().toString();
        String resourceID = getParamsByClient(client);
        if (resourceID == null) {
            log.error("客户端未配置参数");
            client.sendEvent("fail", 403, "Type" + splitStr + "address" + splitStr + "topic" + splitStr + "topic" + splitStr + "topic...");
        }
        if (clientMap.containsKey(resourceID)) {
            Map subMap = clientMap.get(resourceID);
            if (!subMap.containsKey(sessionID)) {
                subMap.put(sessionID, client);
                clientMap.put(resourceID, subMap);
            }
        } else {
            final HashMap subMap = new HashMap<>();
            subMap.put(sessionID, client);
            clientMap.put(resourceID, subMap);
        }
        log.info("在线客户端: " + clientMap.toString());
    }


    public void removeClient(SocketIOClient client) {
        String sessionID = client.getSessionId().toString();
        String resourceID = getParamsByClient(client);
        if (resourceID == null) {
            log.error("客户端未配置参数");
            client.sendEvent("fail", 403, "Type" + splitStr + "address" + splitStr + "topic" + splitStr + "topic" + splitStr + "topic...");
            return;
        }
        if (clientMap.containsKey(resourceID)) {
            final Map subMap = clientMap.get(resourceID);
            final Iterator> iterator = subMap.entrySet().iterator();
            while (iterator.hasNext()) {
                final Map.Entry clientEntry = iterator.next();
                if (clientEntry.getKey().equals(sessionID)) {
                    iterator.remove();
                    log.info("移除客户端: {}", sessionID);
                    // 如果移除session后对应url没有对应session,那么移除url
                    if (subMap.size() == 0) {
                        clientMap.remove(resourceID);
                        log.info("移除ID: {}", resourceID);
                        if (resourceID.startsWith(String.valueOf(ResourceType.KAFKA))) {
                            kafkaConnectionManager.removeConnection(resourceID);
                        }
                        if (resourceID.startsWith(String.valueOf(ResourceType.MQTT))) {
                            mqttConnectionManager.removeConnection(resourceID);
                        }
                    } else {
                        clientMap.put(resourceID, subMap);
                    }
                }
            }
        } else {
            log.info("没有 {} 对应的{} 客户端", resourceID, sessionID);
        }
    }

    public void pushClientMesg2Kafka(SocketIOClient client, String topic, String mesg) throws ExecutionException, InterruptedException {
        String resourceID = getParamsByClient(client);
        KafkaPubSubServer kafkaServer = (KafkaPubSubServer) kafkaConnectionManager.getServerByResourceID(resourceID);
        if (kafkaServer == null) {
            throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");
        }
        if (clientMap.containsKey(resourceID)) {
            kafkaServer.pushMesg(topic, mesg);
        }
    }

    public void pushKafkaMesg2Client(String resourceID, String mesg) {
        if (clientMap.containsKey(resourceID)) {
            Map subMap = clientMap.get(resourceID);
            for (SocketIOClient ioClient : subMap.values()) {
                ioClient.sendEvent(webSocketEventHandler.getClientSubKafkaEvent(), mesg.toString());
            }
        }
    }

    public void pushClientMesg2MQTT(SocketIOClient client, String topic, String mesg) throws MqttException {
        String resourceID = getParamsByClient(client);
        MqttPubSubServer mqttServer = (MqttPubSubServer) mqttConnectionManager.getServerByResourceID(resourceID);
        if (mqttServer == null) {
            throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");
        }
        if (clientMap.containsKey(resourceID)) {
            mqttServer.pushMesg(topic, mesg);
        }
    }

    public void pushMQTTMesg2Client(String resourceID, String mesg) {
        if (clientMap.containsKey(resourceID)) {
            Map subMap = clientMap.get(resourceID);
            for (SocketIOClient ioClient : subMap.values()) {
                ioClient.sendEvent(webSocketEventHandler.getClientSubEmqEvent(), mesg.toString());
            }
        }
    }


    
    private String getParamsByClient(SocketIOClient client) {
        // 从请求的连接中拿出参数(这里的loginUserNum必须是唯一标识)
        final String resourceID = client.getHandshakeData().getSingleUrlParam("resourceID");
        return resourceID;
    }

}

/**
 * Spring configuration that exposes the netty-socketio server and its
 * annotation scanner as beans, using the {@code socket-io.host} and
 * {@code socket-io.port} properties.
 */
@Configuration
public class SocketIOConfig {

    @Value("${socket-io.host}")
    private String host;

    @Value("${socket-io.port}")
    private int port;

    public SocketIOConfig() {
    }

    /**
     * Builds the HTTP URL of the socket.io endpoint from the configured host and port.
     *
     * @return {@code http://<host>:<port>}
     */
    public String getUrl() {
        final StringBuilder url = new StringBuilder("http://");
        url.append(host).append(":").append(port);
        return url.toString();
    }

    /**
     * Creates the socket.io server bound to the configured host and port.
     *
     * @return a configured, not yet started, {@code SocketIOServer}
     */
    @Bean
    public SocketIOServer socketIOServer() {
        // Protocol upgrade timeout in milliseconds (default 10000): the time
        // allowed for the HTTP handshake to be upgraded to the ws protocol.
        final int upgradeTimeoutMillis = 10000;
        // Ping interval in milliseconds (default 25000): how often the client
        // sends a heartbeat message to the server.
        final int pingIntervalMillis = 25000;
        // Ping timeout in milliseconds (default 60000): a timeout event is
        // raised when no heartbeat is received within this window.
        final int pingTimeoutMillis = 60000;

        final com.corundumstudio.socketio.Configuration settings =
                new com.corundumstudio.socketio.Configuration();
        // Host name to bind to (default is 0.0.0.0) and the listening port.
        settings.setHostname(host);
        settings.setPort(port);
        settings.setUpgradeTimeout(upgradeTimeoutMillis);
        settings.setPingInterval(pingIntervalMillis);
        settings.setPingTimeout(pingTimeoutMillis);

        return new SocketIOServer(settings);
    }

    /**
     * Registers the scanner that wires annotated socket.io handler methods to the server.
     *
     * @param socketServer the server bean the scanner is created for
     * @return the annotation scanner bean
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}
@Component
@Slf4j
public class WebSocketEventHandler {

    @Autowired
    private SocketIOClientManager socketIOClientManager;

    public String getClientSubKafkaEvent() {
        return clientSubKafkaEvent;
    }

    public String getClientPubKafkaEvent() {
        return clientPubKafkaEvent;
    }

    public String getClientSubEmqEvent() {
        return clientSubEmqEvent;
    }

    public String getClientPubEmqEvent() {
        return clientPubEmqEvent;
    }


    private final String clientSubKafkaEvent = "subKafka";

    private final String clientPubKafkaEvent = "pubKafka";

    private final String clientSubEmqEvent = "subEmq";

    private final String clientPubEmqEvent = "pubEmq";

    @OnConnect
    public void onConnect(SocketIOClient client) {
        log.info("客户端发起连接. sessionId->{}", client.getSessionId());
        socketIOClientManager.addClient(client);
    }

    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        final String sessionID = client.getSessionId().toString();
        log.info("客户端断开连接, sessionId->{}" + sessionID);
        socketIOClientManager.removeClient(client);
        client.disconnect();
    }

    // kafka消息接收入口
    @OnEvent(value = clientPubKafkaEvent)
    public void pushKafka(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {
        if (StrUtil.isEmpty(topic)) {
            ackRequest.sendAckData(400, "topic不能为空");
        }
        if (StrUtil.isEmpty(mesg)) {
            ackRequest.sendAckData(400, "mesg不能为空");
        }
        try {
            socketIOClientManager.pushClientMesg2Kafka(client, topic, mesg);
            ackRequest.sendAckData(200, "id");
        } catch (Exception e) {
            e.printStackTrace();
            ackRequest.sendAckData(500, e.getMessage());
        }
    }

    // emq信息接收入口
    @OnEvent(value = clientPubEmqEvent)
    public void pushEmq(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {
        if (StrUtil.isEmpty(topic)) {
            ackRequest.sendAckData(400, "topic不能为空");
        }
        if (StrUtil.isEmpty(mesg)) {
            ackRequest.sendAckData(400, "mesg不能为空");
        }
        try {
            socketIOClientManager.pushClientMesg2MQTT(client, topic, mesg);
            ackRequest.sendAckData(200, "id");
        } catch (Exception e) {
            e.printStackTrace();
            ackRequest.sendAckData(500, e.getMessage());
        }
    }


}
/**
 * Starts the socket.io server once the Spring application has finished
 * starting up.
 */
@Component
@Order(1)
public class ServerRunner implements CommandLineRunner {
    private static final Logger LOG = LoggerFactory.getLogger(ServerRunner.class);

    private final SocketIOServer socketIOServer;

    /**
     * @param server the socket.io server bean to start
     */
    @Autowired
    public ServerRunner(SocketIOServer server) {
        socketIOServer = server;
    }

    /**
     * Logs the start-up message and starts the socket.io server.
     *
     * @param args command-line arguments passed to the application (unused)
     */
    @Override
    public void run(String... args) {
        LOG.info("SocketIO 启动...");
        socketIOServer.start();
    }
}
@Slf4j
public class SocketClientEMQTest {
    public static void main(String[] args) {
        final SocketClientEMQTest socketClientTest = new SocketClientEMQTest();
        try {
            socketClientTest.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run(String... args) throws Exception {
        URI uri = URI.create("http://127.0.0.1:9201");
        IO.Options options = new IO.Options();
        options.transports = new String[]{"websocket"};
        options.reconnectionAttempts = 2;
        options.query = "resourceID=" + "mqtt$$tcp://localhost:1883$$test";
        Socket socket = IO.socket(uri, options);
        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("connect: {}", args);
            }
        });
        socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("disconnect: {}", args);
            }
        });

        socket.on("subEmq", new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("push_mqtt {}", args);
            }
        });
      

        final ArrayList arrayList = new ArrayList<>();
//        arrayList.add("")
        int i = 0;
        while (true) {
            i += 1;
            socket.emit("pubEmq", "test", "testmesg" + i, new Ack() {
                @Override
                public void call(Object... objects) {
                    log.info("userChat ack:{}|{}", objects[0], objects[1]);
                }
            });
            if (i >= 10) {
                break;
            }
            Thread.sleep(2000);
        }
        socket.connect();
        LockSupport.park();
    }
}
@Slf4j
public class SocketClientKAFKATest {
    public static void main(String[] args) {
        final SocketClientKAFKATest socketClientTest = new SocketClientKAFKATest();
        try {
            socketClientTest.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run(String... args) throws Exception {
        URI uri = URI.create("http://127.0.0.1:9201");
        IO.Options options = new IO.Options();
        options.transports = new String[]{"websocket"};
        options.reconnectionAttempts = 2;
        options.query = "resourceID=" + "kafka$$localhost:9092$$test12399";
//        options.query = "loginUserNum=" + "mqtt$$tcp://localhost:1883$$test";
        Socket socket = IO.socket(uri, options);
        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("connect: {}", args);
            }
        });
        socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("disconnect: {}", args);
            }
        });
        socket.on("subKafka", new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("push_kafka {}", args);
            }
        });

      

        final ArrayList arrayList = new ArrayList<>();
//        arrayList.add("")
        int i = 0;
        while (true) {
            i += 1;
            socket.emit("pubKafka", "TEST", "testmesg" + i, new Ack() {
                @Override
                public void call(Object... objects) {
                    log.info("userChat ack:{}|{}", objects[0], objects[1]);
                }
            });
            if (i >= 10) {
                break;
            }
            Thread.sleep(2000);
        }
        socket.connect();
        LockSupport.park();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583479.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号