springboot整合SSE技术开发经验总结及心得
作者:mmseoamin日期:2023-12-11

springboot整合SSE技术开发经验总结及心得

  • 一、开发背景
  • 二、快速了解SSE
    • 1、概念
    • 2、特性
    • 三、开发思路
    • 四、代码演示
      • 1、引入依赖
      • 2、服务端代码
      • 3、后端定时任务代码
      • 4、解决乱码的实体类
        • 4、前端代码
        • 五、核心代码分析

          一、开发背景

          公司需要开发一个大屏界面,大屏页面的数据是实时更新的,由后端主动实时推送数据给大屏页面。此时会立刻联想到:websocket 技术。当然使用websocket,确实可以解决这个场景。但是今天本文的主角是 :SSE,他和websocket略有不同,SSE只能由服务端主动发消息,而websocket前后端都可以推送消息。

          二、快速了解SSE

          1、概念

          SSE全称 Server Sent Event,顾名思义,就是服务器发送事件,所以也就注定了他 只能由服务端发送信息。

          2、特性

          • 主动从服务端推送消息的技术
          • 本质是一个HTTP的长连接
          • 发送的是一个stream流,格式为text/event-stream

            三、开发思路

            要实现后端的实时推送消息,前台实时更新数据,思路如下:

            • 1、前后端需要建立连接
            • 2、后端如何做到实时推送信息呢?可以采用定时调度

              四、代码演示

              1、引入依赖

              原则上是不需要引入的,因为springboot底层已经整合了SSE

              
                  org.springframework.boot
                  spring-boot-starter-web
              
              

              2、服务端代码

              controller层

              @RestController
              @CrossOrigin
              @RequestMapping("/sse")
              public class SseEmitterController extends BaseController {
                  @Autowired
                  private SseEmitterService sseEmitterService;
                  /**
                   * 创建SSE连接
                   *
                   * @return
                   */
                  @GetMapping("/connect/{type}")
                  public SseEmitter connect(@PathVariable("type") String type) {
                      return sseEmitterService.connect(type);
                  }
              }
              

              service层

              public interface SseEmitterService {
                  SseEmitter connect(String type);
                  void volumeOverview();
                  void sysOperation();
                  void monitor();
                  ........
              }
              

              service实现层

              @Service
              public class SseEmitterServiceImpl implements SseEmitterService {
                  private final Logger logger = LoggerFactory.getLogger(this.getClass());
                  private static Map sseCache = new ConcurrentHashMap<>();
                  /**
                   * 创建连接sse
                   * @param type
                   * @return
                   */
                  @Override
                  public SseEmitter connect(String type) {
                      final String clientId = UUID.randomUUID().toString().replace("-", "");
                      SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L);
                      try {
                          sseEmitter.send(SseEmitter.event().comment("创建连接成功 !!!"));
                      } catch (IOException e) {
                          logger.error("创建连接失败 , {} " , e.getMessage());
                      }
                      sseEmitter.onCompletion(() -> {
                          logger.info("connect onCompletion , {} 结束连接 ..." , clientId);
                          removeClient(clientId);
                      });
                      sseEmitter.onTimeout(() -> {
                          logger.info("connect onTimeout , {} 连接超时 ..." , clientId);
                          removeClient(clientId);
                      });
                      sseEmitter.onError((throwable) -> {
                          logger.error("connect onError , {} 连接异常 ..." , clientId);
                          removeClient(clientId);
                      });
                      sseCache.put(clientId, sseEmitter);
                      //立即推送
                      volumeOverview();
                      dealResp();
                      monitor();
                      if (type.equals(SseEmitterConstant.OVER_VIEW)){
                          sysOperation();
                          mileStone();
                      }
                      logger.info("当前用户总连接数 : {} " , sseCache.size());
                      return sseEmitter;
                  }
                  /**
                   * 交易量概览
                   */
                  @Override
                  public void volumeOverview() {
                      Map map = new HashMap<>();
                      map.put("latest_tps",440.3);
                      map.put("total_cics_trans",341656001);
                      map.put("total_zjcx_trans",391656001);
                      map.put("zjcx_tps",23657);
                      map.put("day10",48388352);
                      map.put("history",105013985);
                      SseEmitter.SseEventBuilder data = SseEmitter.event().name(SseEmitterConstant.VOLUME_OVERVIEW).data(map, MediaType.APPLICATION_JSON);
                      for (Map.Entry entry : sseCache.entrySet()) {
                          SseEmitterUTF8 sseEmitter = entry.getValue();
                          if (sseEmitter == null) {
                              continue;
                          }
                          try {
                              sseEmitter.send(data);
                          } catch (IOException e) {
                              String body = "SseEmitterServiceImpl[volumeOverview  ]";
                              logger.error(body + ": 向客户端 {} 推送消息失败 , 尝试进行重推 : {}", entry.getKey() ,e.getMessage());
                              messageRepush(entry.getKey(),data,body);
                          }
                      }
                  }
              		private void messageRepush(String type, SseEmitter.SseEventBuilder data,String body){
                      for (int i = 0; i < 3; i++) {
                          try {
                              Thread.sleep(2000);
                              SseEmitterUTF8 sseEmitter = sseCache.get(type);
                              if (sseEmitter == null) {
                                  logger.error(body + " :向客户端{} 第{}次消息重推失败,未创建长链接", type, i + 1);
                                  continue;
                              }
                              sseEmitter.send(data);
                          } catch (Exception ex) {
                              logger.error(body + " :向客户端{} 第{}次消息重推失败", type, i + 1, ex);
                              continue;
                          }
                          logger.info(body + " :向客户端{} 第{}次消息重推成功", type, i + 1);
                          return;
                      }
                  }
              

              常量类

              public class SseEmitterConstant {
                  /**
                   * 创建连接的客户端类型
                   */
                  public static final String OVER_VIEW = "overview";
                  /**
                   * even 数据类型
                   */
                  public static final String VOLUME_OVERVIEW = "vw";
                  public SseEmitterConstant(){}
              }
              

              3、后端定时任务代码

              采用注解的方式实现:@Scheduled,使用该注解时,需要增加这个注解@EnableScheduling,相当于来开启定时调度功能,如果不加@EnableScheduling注解,那么定时调度会不生效的。

              启动类增加注解@EnableScheduling

              package com.hidata;
              import org.mybatis.spring.annotation.MapperScan;
              import org.springframework.boot.SpringApplication;
              import org.springframework.boot.autoconfigure.SpringBootApplication;
              import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
              import org.springframework.context.annotation.ComponentScan;
              import org.springframework.scheduling.annotation.EnableScheduling;
              @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
              @EnableScheduling
              public class HidataApplication {
                  public static void main(String[] args)
                  {
                      SpringApplication.run(HidataApplication.class, args);
                      System.out.println("[HiUrlShorter platform startup!]");
                  }
              }
              

              创建 定时任务调度类,在该类上加上@Scheduled注解,

              @Configuration
              public class SendMessageTask{
                  private final Logger logger = LoggerFactory.getLogger(this.getClass());
                  @Autowired
                  private SseEmitterService sseEmitterService;
                  @Scheduled(cron = "0/40 * * * * ?}")
                  public void volumeOverviewTask() {
                      try {
                          sseEmitterService.volumeOverview();
                      } catch (Exception e) {
                          logger.error("SendMessageTask [volumeOverviewTask]: {} ",e.getMessage());
                      }
                  }
              .......
              }
              

              4、解决乱码的实体类

              如果发送中文数据的时候,会出现乱码的现象。此时需要做对应的处理

              package com.hidata.devops.lagrescreen.domain;
              import org.springframework.http.HttpHeaders;
              import org.springframework.http.MediaType;
              import org.springframework.http.server.ServerHttpResponse;
              import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
              import java.nio.charset.StandardCharsets;
              public class SseEmitterUTF8 extends SseEmitter {
                  public SseEmitterUTF8(Long timeout) {
                      super(timeout);
                  }
                  @Override
                  protected void extendResponse(ServerHttpResponse outputMessage) {
                      super.extendResponse(outputMessage);
                      HttpHeaders headers = outputMessage.getHeaders();
                      headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
                  }
              }
              

              4、前端代码

                  // 连接服务器
                  var sseSource = new EventSource("http://localhost:8080/sse/connect");
                  // 连接打开
                  sseSource.onopen = function () {
                      console.log("连接打开");
                  }
                  // 连接错误
                  sseSource.onerror = function (err) {
                      console.log("连接错误:", err);
                  }
                  
              	//接收信息
                  eventSource.addEventListener("vw", function (event) {
                  console.log(event.data);
                  .....
                });
              

              五、核心代码分析

              先看代码片段

              SseEmitter.event().name("vw").data(map, MediaType.APPLICATION_JSON);
              

              分析:

              后端不会把所有数据一起发送给前端,而是会把页面分成多个模块,然后发给前端,此时前端需要区分哪一块数据对应哪一块页面。所以我们可以给各个模块的数据起个名字。也就是上述的代码

              SseEmitter.event().name("vw")
              

              这样,前端就知道怎么渲染页面了,类似于这样

              springboot整合SSE技术开发经验总结及心得,在这里插入图片描述,第1张

              关于even()的属性,可以查看源码,

              public interface SseEventBuilder {
                      SseEmitter.SseEventBuilder id(String var1);
                      SseEmitter.SseEventBuilder name(String var1);
                      SseEmitter.SseEventBuilder reconnectTime(long var1);
                      SseEmitter.SseEventBuilder comment(String var1);
                      SseEmitter.SseEventBuilder data(Object var1);
                      SseEmitter.SseEventBuilder data(Object var1, @Nullable MediaType var2);
                      Set build();
                  }
              

              springboot整合SSE技术开发经验总结及心得,在这里插入图片描述,第2张