基于Springboot+WebSocket+Netty实现在线聊天、群聊系统
作者:mmseoamin日期:2023-12-11

文章目录

    • 一、文章前言
    • 二、开发流程及工具准备
    • 三、开发步骤

      一、文章前言

      此文主要实现在好友添加、建群、聊天对话、群聊功能,使用Java作为后端语言进行支持,界面友好,开发简单。

      二、开发流程及工具准备

      2.1、下载安装IntelliJ IDEA(后端语言开发工具),Mysql数据库,微信Web开发者工具。

      三、开发步骤

      1.创建maven project

      先创建一个名为SpringBootDemo的项目,选择【New Project】

      在这里插入图片描述

      然后在弹出的下图窗口中,选择左侧菜单的【New Project】(注:和2022之前的idea版本不同,这里左侧没有【Maven】选项,不要选【Maven Archetype】!!!),输入Name(项目名):SpringBootDemo,language选择【java】,build system选择【maven】,然后选择jdk,我这里选的是jdk18.

      在这里插入图片描述然后点击【Create】

      在这里插入图片描述

      2.在project下创建module

      点击右键选择【new】—【Module…】

      在这里插入图片描述

      左侧选择【Spring initializr】,通过idea中集成的Spring initializr工具进行spring boot项目的快速创建。窗口右侧:name可根据自己喜好设置,group和artifact和上面一样的规则,其他选项保持默认值即可,【next】

      在这里插入图片描述

      Developer Tools模块勾选【Spring Boot DevTools】,web模块勾选【Spring Web】

      在这里插入图片描述

      此时,一个Springboot项目已经搭建完成,可开发后续功能

      3.编写一个消息实体类、Mapper、service(三层架构)

      @Data
      public class Chat {
          @TableId(type = IdType.AUTO)
          private Long id;
          private Long userId;
          private Long targetUserId;
          private LocalDateTime createTime;
          private String userName;
          private String targetUserName;
          private String content;
      }
      

      由于我们使用mybatis-plus,所以简单的增删改查不用自己写,框架自带了,只需要实现或者继承他的Mapper、Service

      在这里插入图片描述在这里插入图片描述

      4.编写WebSocket服务类

      @ServerEndpoint("/imserver/{userId}")
      @Component
      public class WebSocketService {
          /**
           * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
           */
          private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>();
          /**
           * 与某个客户端的连接会话,需要通过它来给客户端发送数据
           */
          private Session session;
          /**
           * 接收userId
           */
          private String userId = "";
          public static ChatMapper chatMapper = null;
          /**
           * 连接建立成功调用的方法
           * 

      * 1.用map存 每个客户端对应的MyWebSocket对象 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId = userId; if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); webSocketMap.put(userId, this); //加入set中 } else { webSocketMap.put(userId, this); //加入set中 } } /** * 报错 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { error.printStackTrace(); } /** * 实现服务器推送到对应的客户端 */ public void sendMessage(String message) { try { this.session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } /** * 自定义 指定的userId服务端向客户端发送消息 */ public static void sendInfo(Chat chat) { QueryWrapper queryWrapper = new QueryWrapper(); List chats=chatMapper.selectList(queryWrapper.lambda() .eq(Chat::getTargetUserId, chat.getTargetUserId()).or().eq(Chat::getUserId, chat.getTargetUserId()).or().eq(Chat::getTargetUserId, chat.getUserId()).or().eq(Chat::getUserId, chat.getUserId())); //log.info("发送消息到:"+userId+",报文:"+message); if (!StringUtils.isEmpty(chat.getTargetUserId().toString()) && webSocketMap.containsKey(chat.getTargetUserId().toString())) { webSocketMap.get(chat.getUserId().toString()).sendMessage(JSONObject.toJSONString(chats)); webSocketMap.get(chat.getTargetUserId().toString()).sendMessage(JSONObject.toJSONString(chats)); } else { webSocketMap.get(chat.getUserId().toString()).sendMessage(JSONObject.toJSONString(chats)); } } /** * 自定义关闭 * * @param userId */ public static void close(String userId) { if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); } } /** * 获取在线用户信息 * * @return */ public static Map getOnlineUser() { return webSocketMap; }

      5.创建控制器Controller

      先创建Controller Package

      在这里插入图片描述

      创建一个Controller

      在这里插入图片描述

      输入类名,选在【Class】

      在这里插入图片描述

      因为要编写Rest风格的Api,要在Controller上标注@RestController注解

      6.创建具体的Api接口

      @RestController
      public class DemoController {
          @Autowired
          private ChatService chatService;
          @PostMapping("/push")
          public ResponseEntity pushToWeb(@RequestBody Chat chat) throws IOException {
              chat.setCreateTime(LocalDateTime.now());
              chatService.save(chat);
              WebSocketService.sendInfo(chat);
              return ResponseEntity.ok("MSG SEND SUCCESS");
          }
          @GetMapping("/close")
          public String close(String userId) {
              WebSocketService.close(userId);
              return "ok";
          }
          @GetMapping("/getOnlineUser")
          public Map getOnlineUser() {
              return WebSocketService.getOnlineUser();
          }
          @GetMapping("/getMessage")
          public ResponseEntity> getMessage(String userId) {
              QueryWrapper queryWrapper = new QueryWrapper();
              List list = chatService.
                      list(queryWrapper.lambda().eq(Chat::getTargetUserId, userId).or().eq(Chat::getUserId, userId));
              return ResponseEntity.ok(list);
          }
      }
      

      7.编写netty配置

      package com.example.demo.config;
      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import io.netty.handler.codec.http.HttpObjectAggregator;
      import io.netty.handler.codec.http.HttpServerCodec;
      import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
      import io.netty.handler.stream.ChunkedWriteHandler;
      import org.springframework.stereotype.Component;
      public class NettyServer {
          public void start(){
              //创建两个线程组boosGroup和workerGroup,含有的子线程NioEventLoop的个数默认为cpu核数的两倍
              //boosGroup只是处理链接请求,真正的和客户端业务处理,会交给workerGroup完成
              EventLoopGroup boosGroup = new NioEventLoopGroup();
              EventLoopGroup workerGroup = new NioEventLoopGroup();
              try {
                  //创建服务器的启动对象
                  ServerBootstrap bootstrap = new ServerBootstrap();
                  //使用链式编程来配置参数
                  //设置两个线程组
                  bootstrap.group(boosGroup,workerGroup)
                          //使用NioSctpServerChannel作为服务器的通道实现
                          .channel(NioServerSocketChannel.class)
                          //初始化服务器链接队列大小,服务端处理客户端链接请求是顺序处理的,所以同一时间只能处理一个客户端链接
                          //多个客户端同时来的时候,服务端将不能处理的客户端链接请求放在队列中等待处理
                          .option(ChannelOption.SO_BACKLOG,1024)
                          //创建通道初始化对象,设置初始化参数
                          .childHandler(new ChannelInitializer() {
                              @Override
                              protected void initChannel(SocketChannel ch) throws Exception {
                                  System.out.println("收到到新的链接");
                                  //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
                                  ch.pipeline().addLast(new HttpServerCodec());
                                  //以块的方式来写的处理器
                                  ch.pipeline().addLast(new ChunkedWriteHandler());
                                  ch.pipeline().addLast(new HttpObjectAggregator(8192));
                                  ch.pipeline().addLast(new MessageHandler());//添加测试的聊天消息处理类
                                  ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));
                              }
                          });
                  System.out.println("netty server start..");
                  //绑定一个端口并且同步,生成一个ChannelFuture异步对象,通过isDone()等方法判断异步事件的执行情况
                  //启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
                  ChannelFuture cf = bootstrap.bind(1245).sync();
                  //给cf注册监听器,监听我们关心的事件
                  cf.addListener(new ChannelFutureListener() {
                      @Override
                      public void operationComplete(ChannelFuture channelFuture) throws Exception {
                          if (cf.isSuccess()){
                              System.out.println("监听端口成功");
                          }else {
                              System.out.println("监听端口失败");
                          }
                      }
                  });
                  //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
                  //通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭
                  cf.channel().closeFuture().sync();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }finally {
                  boosGroup.shutdownGracefully();
                  workerGroup.shutdownGracefully();
              }
          }
      }
      

      8.前端代码Websocket聊天功能

      	if (!window.WebSocket) {
      							window.WebSocket = window.MozWebSocket;
      						}
      						if (window.WebSocket) {
      							me.websocket = new WebSocket(me.ws + me.info.id);
      							me.websocket.onmessage = function(event) {
      								var json = JSON.parse(event.data);
      								me.msgListMethod();
      								console.log(json);
      							};
      							console.log(me.websocket)
      							me.websocket.onopen = function(event) {
      								console.log("Netty-WebSocket服务器。。。。。。连接");
      							};
      							me.websocket.onerror = function(evt) {
      								console.log('发生错误..., evt');
      							};
      							me.websocket.CONNECTING = function(evt) {
      								console.log('正在链接中');
      							};
      						} else {
      							alert("您的浏览器不支持WebSocket协议!");
      						}
      						//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
      						window.onbeforeunload = function() {
      							if (me.websocket != null) {
      								me.websocket.close();
      							}
      						};
      

      这里用到了很多消息发送功能,比如文件、图片。群聊还可查看群成员功能