Netty官网:Netty: Home
前面我们介绍了Netty的基本用法以及基本知识,但是在我们的实际开发中要用到SpringBoot,下面我们来看看SpringBoot的整合与简单的文件传输吧
🌈🌈依赖
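<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.65.Final</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.17.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.17.0</version>
</dependency>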
🌈🌈yaml配置
server:
  port: 8080
netty:
  host: 127.0.0.1
  port: 7397
package com.shu; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; /** * @description: * @author: shu * @createDate: 2023/5/7 10:27 * @version: 1.0 */ @Component("nettyServer") public class NettyServer { private Logger logger = LoggerFactory.getLogger(NettyServer.class); private final EventLoopGroup parentGroup = new NioEventLoopGroup(); private final EventLoopGroup childGroup = new NioEventLoopGroup(); private Channel channel; /** * 绑定端口 * @param address * @return */ public ChannelFuture bind(InetSocketAddress address) { ChannelFuture channelFuture = null; try { ServerBootstrap b = new ServerBootstrap(); b.group(parentGroup, childGroup) .channel(NioServerSocketChannel.class) //非阻塞模式 .option(ChannelOption.SO_BACKLOG, 128) .childHandler(new MyChannelInitializer()); channelFuture = b.bind(address).syncUninterruptibly(); channel = channelFuture.channel(); } catch (Exception e) { logger.error(e.getMessage()); } finally { if (null != channelFuture && channelFuture.isSuccess()) { logger.info("netty server start done."); } else { logger.error("netty server start error."); } } return channelFuture; } /** * 销毁 */ public void destroy() { if (null == channel) return; channel.close(); parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); } /** * 获取通道 * @return */ public Channel getChannel() { return channel; } }
package com.shu; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.nio.charset.Charset; /** * @description: * @author: shu * @createDate: 2023/5/7 10:31 * @version: 1.0 */ public class MyChannelInitializer extends ChannelInitializer{ /** * 初始化channel * @param channel * @throws Exception */ @Override protected void initChannel(SocketChannel channel) throws Exception { // 日志打印 channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); // 基于换行符号 channel.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 解码转String,注意调整自己的编码格式GBK、UTF-8 channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK"))); // 解码转String,注意调整自己的编码格式GBK、UTF-8 channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK"))); // 在管道中添加我们自己的接收数据实现方法 channel.pipeline().addLast(new MyServerHandler()); } }
package com.shu; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.socket.SocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Date; /** * @description: * @author: shu * @createDate: 2023/5/7 10:33 * @version: 1.0 */ public class MyServerHandler extends ChannelInboundHandlerAdapter { private Logger logger = LoggerFactory.getLogger(MyServerHandler.class); /** * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { SocketChannel channel = (SocketChannel) ctx.channel(); //通知客户端链接建立成功 String str = "通知客户端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n"; ctx.writeAndFlush(str); } /** * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.info("客户端断开链接{}", ctx.channel().localAddress().toString()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接收msg消息{与上一章节相比,此处已经不需要自己进行解码} logger.info(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 服务端接收到消息:" + msg); //通知客户端链消息发送成功 String str = "服务端收到:" + new Date() + " " + msg + "\r\n"; ctx.writeAndFlush(str); } /** * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); logger.info("异常信息:\r\n" + cause.getMessage()); } }
package com.shu; import io.netty.channel.ChannelFuture; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; import java.net.InetSocketAddress; /** * @description: * @author: shu * @createDate: 2023/5/7 10:36 * @version: 1.0 */ @SpringBootApplication @ComponentScan("com.shu") public class NettyApplication implements CommandLineRunner { @Value("${netty.host}") private String host; @Value("${netty.port}") private int port; @Autowired private NettyServer nettyServer; public static void main(String[] args) { SpringApplication.run(NettyApplication.class, args); } @Override public void run(String... args) throws Exception { InetSocketAddress address = new InetSocketAddress(host, port); ChannelFuture channelFuture = nettyServer.bind(address); Runtime.getRuntime().addShutdownHook(new Thread(() -> nettyServer.destroy())); channelFuture.channel().closeFuture().syncUninterruptibly(); } }
import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.nio.charset.Charset; import java.text.SimpleDateFormat; import java.util.Date; /** * @description: * @author: shu * @createDate: 2023/5/7 10:41 * @version: 1.0 */ public class ApiTest { public static void main(String[] args) { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.AUTO_READ, true); b.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel channel) throws Exception { // 日志打印 channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); // 基于换行符号 channel.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 解码转String,注意调整自己的编码格式GBK、UTF-8 channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK"))); // 解码转String,注意调整自己的编码格式GBK、UTF-8 channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK"))); // 在管道中添加我们自己的接收数据实现方法 channel.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接收msg消息{与上一章节相比,此处已经不需要自己进行解码} System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 客户端接收到消息:" + msg); } }); } }); ChannelFuture f = b.connect("127.0.0.1", 7397).sync(); System.out.println(" client start done"); //向服务端发送信息 f.channel().writeAndFlush("你好,SpringBoot启动的netty服务端,“我的结尾是一个换行符,用于传输半包粘包处理”\r\n"); f.channel().writeAndFlush("你好,SpringBoot启动的netty服务端,“我的结尾是一个换行符,用于传输半包粘包处理”\r\n"); 
f.channel().writeAndFlush("你好,SpringBoot启动的netty服务端,“我的结尾是一个换行符,用于传输半包粘包处理”\r\n"); f.channel().writeAndFlush("你好,SpringBoot启动的netty服务端,“我的结尾是一个换行符,用于传输半包粘包处理”\r\n"); f.channel().writeAndFlush("你好,SpringBoot启动的netty服务端,“我的结尾是一个换行符,用于传输半包粘包处理”\r\n"); f.channel().closeFuture().syncUninterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } } }
我们仔细来观察一下这个日志打印,并注意一下这些字节的意思
客户端
服务端
看到这里我就很疑惑,它到底是如何把我们的文字转换成16进制的?
通过代码我们知道我们传递的文字是:通知客户端链接建立成功 Mon May 08 20:30:04 CST 2023 127.0.0.1,但是它是如何转换成16进制的?下面我们来具体分析一下。
前提知识?
分析
这里我们采用的是GBK编码,那我们怎么看懂这些字节表示的是什么?这里还需要一些进制转换的基本知识:1字节等于8bit。有了基础知识我们再来分析:如上面所说,GBK用两个字节表示一个中文字符,那么"通"这个汉字对应的两个字节就是cd a8,那怎么验证呢?
参考网站:GB2312简体中文编码表 - 常用参考表对照表 - 脚本之家在线工具
我们可以发现CDA8确实代表汉字"通",这也解答了我心中的疑惑。这里只是汉字,下面我们看一下字母
字母分析?
当用GBK解码时,若高字节最高位为0,则用ASCII码表解码;若高字节最高位为1,则用GBK编码表解码。
那啥是高字节与低字节?
存放最低的8位有效位的字节被称为最低有效位字节或低位字节,而存放最高的8位有效位的字节被称为最高有效位字节或高位字节。
高位字节 低位字节
↓------------------------------↓ ↓-----------------------------↓
15 14 13 12 11 10  9  8  7  6  5  4  3  2  1  0
参考链接:ASCII码一览表,ASCII码对照表
我们通过ASCII码表可以发现,空格对应16进制的20
而M的十六进制:4D
相信经过上面的讲解,应该可以看懂这段报文了,这也为自己看懂协议做了很好的铺垫,下面我们继续看服务端与客户端的连接过程
文件传输在我们的实际开发中非常常见,下面我们来个简单的案例
注意:必须实现序列化
package com.shu.file01;

import lombok.Data;

import java.io.File;
import java.io.Serializable;

/**
 * @description: Serializable response object carrying a {@link File} reference
 *               and a length value.
 * @author: shu
 * @createDate: 2023/5/7 20:28
 * @version: 1.0
 */
@Data
public class FileResponse implements Serializable {

    /** Length value supplied by the creator of this response. */
    private long length;

    /** File reference supplied by the creator of this response. */
    private File file;

    /** Creates an empty response ({@code length} 0, {@code file} null). */
    public FileResponse() {
        this(0L, null);
    }

    /**
     * Creates a response with both fields set.
     *
     * @param length the length value to carry
     * @param file   the file reference to carry
     */
    public FileResponse(long length, File file) {
        this.file = file;
        this.length = length;
    }

    /** @return the file reference, possibly {@code null} */
    public File getFile() {
        return this.file;
    }

    /** @return the length value */
    public long getLength() {
        return this.length;
    }
}
package com.shu.file01;

import lombok.Data;

import java.io.Serializable;

/**
 * @description: Serializable request object carrying the name of the file being
 *               asked for.
 * @author: shu
 * @createDate: 2023/5/7 20:27
 * @version: 1.0
 */
@Data
public class FileRequest implements Serializable {

    /** Name of the requested file. */
    private String fileName;

    /** Creates a request with no file name set. */
    public FileRequest() {
        this(null);
    }

    /**
     * Creates a request for the given file name.
     *
     * @param fileName name of the requested file
     */
    public FileRequest(String fileName) {
        this.fileName = fileName;
    }

    /** @return the requested file name, possibly {@code null} */
    public String getFileName() {
        return this.fileName;
    }
}
package com.shu.file01; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; /** * @description: * @author: shu * @createDate: 2023/5/7 19:57 * @version: 1.0 */ @Component("fileServer") public class FileServer { private Logger logger = LoggerFactory.getLogger(FileServer.class); /** * 绑定端口 * * @param port * @return */ public void bind(int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new ObjectEncoder()) .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null))) .addLast(new FileServerHandler()); } }); ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); System.out.println("服务端启动成功,端口:" + port); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.shu.file01; import io.netty.channel.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.RandomAccessFile; /** * @description: * @author: shu * @createDate: 2023/5/7 20:19 * @version: 1.0 */ public class FileServerHandler extends ChannelInboundHandlerAdapter { private static final String FILE_PATH = "D:\\coreconfig.txt"; private Logger logger = LoggerFactory.getLogger(FileServerHandler.class); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); logger.info("channelActive"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("channelRead"); if (msg instanceof FileRequest) { FileRequest request = (FileRequest) msg; if (request.getFileName().equals(FILE_PATH)) { File file = new File(FILE_PATH); if (file.exists()) { RandomAccessFile raf = new RandomAccessFile(file, "r"); long length = raf.length(); FileResponse response = new FileResponse(length, file); ctx.writeAndFlush(response); ChannelFuture sendFileFuture = ctx.writeAndFlush(new DefaultFileRegion(raf.getChannel(), 0, length), ctx.newProgressivePromise()); sendFileFuture.addListener(new ChannelProgressiveFutureListener() { @Override public void operationComplete(ChannelProgressiveFuture future) throws Exception { System.out.println("File transfer completed."); raf.close(); } @Override public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception { if (total < 0) { System.err.println("File transfer progress: " + progress); } else { System.err.println("File transfer progress: " + progress + " / " + total); } } }); } else { System.err.println("File not found: " + FILE_PATH); } } else { System.err.println("Invalid file name: " + request.getFileName()); } } } }
package com.shu.file01; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @description: * @author: shu * @createDate: 2023/5/7 20:42 * @version: 1.0 */ public class FileClient { private static final int PORT = 8080; private static final String HOST = "localhost"; private Logger logger = LoggerFactory.getLogger(FileClient.class); private final EventLoopGroup parentGroup = new NioEventLoopGroup(); /** * 连接服务端 */ public void connect() { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new ObjectEncoder()) .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null))) .addLast(new FileClientHandler()); } }); ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { throw new RuntimeException(e); } finally { group.shutdownGracefully(); } } }
package com.shu.file01; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.io.File; import java.io.FileOutputStream; import java.nio.channels.FileChannel; /** * @description: * @author: shu * @createDate: 2023/5/7 20:47 * @version: 1.0 */ public class FileClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { FileRequest request = new FileRequest("D:\\coreconfig.txt"); ctx.writeAndFlush(request); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FileResponse) { FileResponse response = (FileResponse) msg; File file = response.getFile(); long fileLength = response.getLength(); FileOutputStream fos = new FileOutputStream(file); FileChannel channel = fos.getChannel(); // channel.transferFrom(channel, 0, fileLength); System.out.println("File " + file.getName() + " received."); } else { System.err.println("Invalid response type: " + msg.getClass()); } } }
这就是一个简单的测试吧,实际的开发中我们需要考虑很多,比如大文件的传输、断点续传、文件传输的加密等一系列问题,这个我正在研究中,以后再说