栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Springboot之整合Socket连接案例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Springboot之整合Socket连接案例

Socket连接与硬件通信

一、如何让socket随着springboot项目一起启动

SpringBoot中CommandLineRunner的作用:平常开发中有可能需要实现在项目启动后执行的功能,SpringBoot提供的一种简单的实现方案就是添加一个model并实现CommandLineRunner接口,实现功能的代码放在实现的run方法中

具体实现

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class TestRunner implements CommandLineRunner {
  @Autowired
  private SocketProperties properties;
  @Override
  public void run(String... args) throws Exception {
    ServerSocket server = null;
    Socket socket = null;
    server = new ServerSocket(properties.getPort());
    System.out.println("设备服务器已经开启, 监听端口:" + properties.getPort());
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
 properties.getPoolCore(),
 properties.getPoolMax(),
 properties.getPoolKeep(),
 TimeUnit.SECONDS,
 new ArrayBlockingQueue(properties.getPoolQueueInit()),
 new ThreadPoolExecutor.DiscardOldestPolicy()
    );
    while (true) {
      socket = server.accept();
      pool.execute(new ServerConfig(socket));
    }
  }
}

此处使用了自定义的线程池,提高对于socket的客户端处理能力。

二、自定义配置并使用

此处将socket的端口和线程池的一些配置放到 application.yml中使用,方便使用和修改

# Socket配置
socket:
 # 监听端口 2323
 port: 2323
 # 线程池 - 保持线程数 20
 pool-keep: 20
 # 线程池 - 核心线程数 10
 pool-core: 10
 # 线程池 - 最大线程数 20
 pool-max: 30
 # 线程队列容量 10
 pool-queue-init: 10
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

@Setter
@Getter
@ToString
@Component
@Configuration
@PropertySource("classpath:application.yml")
@ConfigurationProperties(prefix = "socket")
public class SocketProperties {
  private Integer port;
  private Integer poolKeep;
  private Integer poolCore;
  private Integer poolMax;
  private Integer poolQueueInit;
}
三、Socket对于客户端发来的信息的处理和重发机制

当客户端端连接之后发送信息,如果超时未发送,将会关闭,发送数据有异常将会返回给客户端一个error,让客户端在发送一次数据。

import com.farm.config.socket.resolve.MessageChain;
import com.farm.service.EnvironmentService;
import com.farm.service.impl.EnvironmentServiceImpl;
import java.io.*;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Map;

public class ServerConfig extends Thread {
  private Socket socket;
  public ServerConfig(Socket socket) {
    this.socket = socket;
  }
	// 获取spring容器管理的类,可以获取到sevrice的类
  private EnvironmentService service = SpringUtil.getBean(EnvironmentServiceImpl.class);
  private String handle(InputStream inputStream) throws IOException, DataFormException {
    byte[] bytes = new byte[1024];
    int len = inputStream.read(bytes);
    if (len != -1) {
      StringBuffer request = new StringBuffer();
      request.append(new String(bytes, 0, len, "UTF-8"));
      System.out.println("接受的数据: " + request);
      System.out.println("from client ... " + request + "当前线程" + Thread.currentThread().getName());
      Map map = MessageChain.out(request.toString());
      System.out.println("处理的数据" + map);
      Integer res = service.addEnvironment(map);
      if (res == 1) {
 return "ok";
      } else {
 throw new DataFormException("数据处理异常");
      }
    } else {
      throw new DataFormException("数据处理异常");
    }
  }
  @Override
  public void run() {
    BufferedWriter writer = null;
    try {
      // 设置连接超时9秒
      socket.setSoTimeout(9000);
      System.out.println("客户 - " + socket.getRemoteSocketAddress() + " -> 机连接成功");
      InputStream inputStream = socket.getInputStream();
      writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
      String result = null;
      try {
 result = handle(inputStream);
 writer.write(result);
 writer.newline();
 writer.flush();
      } catch (IOException | DataFormException | IllegalArgumentException e) {
 writer.write("error");
 writer.newline();
 writer.flush();
 System.out.println("发生异常");
 try {
   System.out.println("再次接受!");
   result = handle(inputStream);
   writer.write(result);
   writer.newline();
   writer.flush();
 } catch (DataFormException | SocketTimeoutException ex) {
   System.out.println("再次接受, 发生异常,连接关闭");
 }
      }
    } catch (SocketException socketException) {
      socketException.printStackTrace();
      try {
 writer.close();
      } catch (IOException ioException) {
 ioException.printStackTrace();
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      try {
 writer.close();
      } catch (IOException e) {
 e.printStackTrace();
      }
    }
  }
}

在此处有一个坑,如果客户端是用C/C++编写的,必须使用如下方法:

byte[] bytes = new byte[1024];
int len = inputStream.read(bytes);

如果使用readLine或者 DataInputStream dataInputStream =new DataInputStream(socket.getInputStream())这样会出现使用TCP连接助手,客户端发送数据收不到。

四、如何在普通类中使用Spring注入类

这里需要使用一个工具类。

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringUtil implements ApplicationContextAware {
  private static ApplicationContext applicationContext;
  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    if (SpringUtil.applicationContext == null) {
      SpringUtil.applicationContext = applicationContext;
    }
  }
  
  public static ApplicationContext getApplicationContext() {
    return applicationContext;
  }
  
  public static Object getBean(String name){
    return getApplicationContext().getBean(name);
  }
  
  public static  T getBean(Class clazz){
    return getApplicationContext().getBean(clazz);
  }
  
  public static  T getBean(String name,Class clazz){
    return getApplicationContext().getBean(name, clazz);
  }
}

补充:springboot下websocket前台后端数据长连接

首先导入依赖

 
      org.springframework.boot
      spring-boot-starter-websocket
    
    
      org.springframework.security
      spring-security-messaging
    

spring-security-messaging 是后面继承 AbstractSecurityWebSocketMessageBrokerConfigurer需要用到的依赖

WebSocketConfig

@Configuration
@EnableWebSocketMessageBroker //此注解表示使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping 
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
     
    registry.addEndpoint("/websocket/tracker")   //物流消息通道,
      .setAllowedOrigins("*")   //允许跨域,里面路径可以设定
      .withSockJS()   //指定协议
      .setInterceptors(httpSessionHandshakeInterceptor()) ;    //设置拦截器()
  }
  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {
     
    registry.enableSimpleBroker("/topic","/user");
  }
 //拦截器
 @Bean
  public HandshakeInterceptor httpSessionHandshakeInterceptor() {
    return new HandshakeInterceptor() {
      @Override
      public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception {
 //可以在这里先判断登录是否合法
 return true;
      }
      @Override
      public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
  //握手成功后,
      }
    };
  }
}

WebsocketSecurityConfiguration

@Configuration
public class WebsocketSecurityConfiguration extends AbstractSecurityWebSocketMessageBrokerConfigurer {
  @Override
  protected void configureInbound(MessageSecuritymetadataSourceRegistry messages) {
    messages
      .nullDestMatcher().authenticated()
      .simpDestMatchers("/topic
  @Override
  protected boolean sameOriginDisabled() {
    return true;
  }
}

WebSocketResource

package com.gleam.shopmall.web.rest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageMappingInfo;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Controller;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
@Controller
public class WebSocketResource {
  private static final Logger log = LoggerFactory.getLogger(WebSocketResource.class);
  @Autowired
  SimpMessageSendingOperations messagingTemplate;
 //此方法适用于网页聊天室,从前端接收数据,返回订阅者(前端)
  @MessageMapping("/welcome") //指定要接收消息的地址,类似@RequestMapping
  @SendTo("/topic/getResponse")  //默认消息将被发送到与传入消息相同的目的地,但是目的地前面附加前缀(默认情况下为“/topic”}
  public String say(String message) throws Exception {
    return message;
  }
 //发送指定用户(直接从后端发送数据到前端)
  public void sendToUser(String login,String channel, String info) {
    log.debug("[ToUser]WEBSOCKET发送消息, username={}, info={}", login, info);
    this.messagingTemplate.convertAndSendToUser(login, channel, info);
    log.debug("[ToUser]WEBSOCKET发送消息:完成");
  }
 //发送所有订阅的(直接从后端发送数据到前端)
  public void send(String channel, String info) {
    log.debug("[ToAll]WEBSOCKET发送消息, info={}", info);
    // this.messagingTemplate.convertAndSend(channel, info);
    this.messagingTemplate.convertAndSend("/topic/getResponse", "接收到了吗?");
    log.debug("[ToAll]WEBSOCKET发送消息:完成");
  }
}

前端html




  
  
  
  
  
  Spring Boot+WebSocket+广播式
  


以上为个人经验,希望能给大家一个参考,也希望大家多多支持考高分网。如有错误或未考虑完全的地方,望不吝赐教。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/129184.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号