- 背景
- 实现步骤
- 工程结构
- 服务端
- 配置类
- 配置文件
- socket核心类
- 前端HTML
- 效果演示
最近在做大数据平台项目,需要将相关处理过程的日志实时展现给前端。目前想到的方案是通过 WebSocket 的方式:日志收集端把日志生产到 Kafka,服务端实时消费,然后推送给前端 HTML 实时展现。
实现步骤
工程结构
服务端
配置类:Kafka 配置
/**
 * Exposes the Kafka bootstrap-server list from application.properties.
 *
 * <p>The value is injected through an instance setter and copied into a static
 * field so that code the Spring container does not instantiate (the JSR-356
 * websocket endpoint, created by the servlet container) can still read it.
 */
@Configuration
@Validated
public class KafkaConfig {

    /** Kafka bootstrap servers, populated by Spring during startup. */
    public static String kafkaBootStrapServers;

    /**
     * Setter-injection target for {@code spring.kafka.bootstrap-servers}.
     *
     * <p>The static field is assigned through the class name: the original
     * {@code this.kafkaBootStrapServers = ...} compiled, but writing a static
     * field through {@code this} is misleading and flagged by linters.
     *
     * @param kafkaBootStrapServers comma-separated host:port list
     */
    @Value("${spring.kafka.bootstrap-servers}")
    public void setKafkaBootStrapServers(String kafkaBootStrapServers) {
        KafkaConfig.kafkaBootStrapServers = kafkaBootStrapServers;
    }
}
WebSocket配置类
/**
 * WebSocket configuration.
 *
 * <p>Registers a {@link ServerEndpointExporter} bean so that classes annotated
 * with {@code @ServerEndpoint} are picked up and exposed by the embedded
 * servlet container.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {

    /** Exports every {@code @ServerEndpoint}-annotated bean as a websocket endpoint. */
    @Bean
    public ServerEndpointExporter serverEndpoint() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
配置文件
application.properties
# app
spring.application.name=wskafka
server.port=7000
# kafka
spring.kafka.bootstrap-servers=kafka2:9092,kafka3:9092,kafka4:9092
spring.kafka.consumer.group-id=testGroup
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
#spring.kafka.consumer.bootstrap-servers=kafka2:9092,kafka3:9092,kafka4:9092
#spring.kafka.consumer.auto-offset-reset=earliest
#spring.kafka.consumer.properties.spring.json.trusted.packages=sample.kafka
#spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
#kafka默认生产者配置
spring.kafka.producer.bootstrap-servers=kafka2:9092,kafka3:9092,kafka4:9092
spring.kafka.producer.acks=-1
spring.kafka.client-id=kafka-producer
spring.kafka.producer.batch-size=5

socket核心类
@ServerEndpoint("/msg")
@Component
@Slf4j
public class WsServerEndpoint {
@OnError
public void onError(Session session, Throwable error) {
log.info("发生错误");
error.printStackTrace();
}
@OnOpen
public void onOpen(Session session) throws UnknownHostException {
log.info("连接成功");
Properties properties = new Properties();
properties.put("client.id", InetAddress.getLocalHost().getHostName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.kafkaBootStrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Collections.singletonList("test"));// 订阅消息
while (true) {
ConsumerRecords records = kafkaConsumer.poll(100);
for (ConsumerRecord record : records) {
// log.info(String.format("topic:%s,offset:%d,消息:%s", //
// record.topic(), record.offset(), record.value()));
try {
TimeUnit.MILLISECONDS.sleep(1000L);
session.getBasicRemote().sendText(record.value());
kafkaConsumer.commitSync();
} catch (CommitFailedException | InterruptedException | IOException e) {
// application-specific rollback of processed records
log.error(e.getMessage());
}
}
}
// try {
// while (true) {
// TimeUnit.MILLISECONDS.sleep(1000L);
// session.getBasicRemote().sendText("xxxxxxxx" + System.currentTimeMillis());
// }
// } catch (IOException | InterruptedException e) {
// e.printStackTrace();
// }
}
@OnClose
public void onClose(Session session) {
log.info("连接关闭");
}
@OnMessage
public String onMsg(String text) throws IOException, InterruptedException {
return "client say:" + text;
}
}
前端HTML
Title
效果演示



