该文章提供客户端代码,如需服务器端代码,请看下篇文章,基于SpringBoot项目编写的。
支持运行项目时自动启动netty,支持断线无限重连,只需要修改配置文件中的IP和端口即可使用,可以直接复制代码,解码处理器需要自己编写逻辑,当然也可以使用提供的解码器,详细见下文。
没有提供Controller,要是需要,可以自己新建一个Controller,再ClientBoot类中写一个sendMsg()方法,方法中调用connect()方法,然后在你的Controller里注入ClientBoot,调用sendMsg()即可。
//客户端启动器 @Slf4j @Component public class ClientStarter { @Resource private NettyConfig nettyConfig; public void bootstrap() { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(new NioEventLoopGroup(nettyConfig.getWorker())) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyConfig.getTimeout()) .option(ChannelOption.SO_KEEPALIVE, true) // 避免意外断开 .channel(NioSocketChannel.class) // 指定通道 .handler(new ClientHandler()); // 指定处理器 //连接服务器 try { ClientBoot clientBoot = new ClientBoot(); clientBoot.connect(bootstrap, nettyConfig); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
//客户端链接 @Component @Slf4j public class ClientBoot { public void connect(Bootstrap bootstrap, NettyConfig nettyConfig) throws InterruptedException { // 连接 netty ChannelFuture future = bootstrap.connect(nettyConfig.getHost(), nettyConfig.getPort()); //连接失败无限重连,直到连接成功为止,重连时间为5秒/次 future.addListener((ChannelFutureListener) channelFuture -> { if (!channelFuture.isSuccess()) { log.info("连接失败,尝试重新连接!"); // 在连接失败后,5秒后尝试重新连接 channelFuture.channel().eventLoop().schedule(() -> { try { connect(bootstrap, nettyConfig); } catch (InterruptedException e) { throw new RuntimeException(e); } }, nettyConfig.getReconnect(), TimeUnit.SECONDS); } else { log.info("客户端连接服务器成功!"); } }); //给关闭通道进行监听 Channel channel = future.channel(); channel.closeFuture().sync(); } }
这个类中的解码器和消息处理器是你主要写的地方,解码器需要根据自己的业务进行编写,也可以使用提供好的解码器,当然还可以自行添加一些其他的Handler
netty 提供的解码器
DelimiterBasedFrameDecoder 解决TCP的粘包解码器 StringDecoder 消息转成String解码器 LineBasedFrameDecoder 自动完成标识符分隔解码器 FixedLengthFrameDecoder 固定长度解码器,二进制 Base64Decoder base64 解码器 对于 netty的数据传递都是ByteBuf,我们一般重写以上的解码器、编码器来实现自己的逻辑
//客户端处理器 public class ClientHandler extends ChannelInitializer{ private static final NettyConfig nettyConfig = ApplicationContextHelperUtil.getBean(NettyConfig.class); @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { ChannelPipeline pipeline = nioSocketChannel.pipeline(); //心跳检测处理器 //IdleStateHandler参数说明: 读空闲时间,写空闲时间,全部空闲时间,时间单位(默认秒) pipeline.addLast(new IdleStateHandler(nettyConfig.getReadTime(), nettyConfig.getWriteTime(), 0, TimeUnit.SECONDS)); pipeline.addLast(new AnalyzeMessageHandler());//自定义解码器 pipeline.addLast(new MonitorMessageHandler());//客户端消息处理器 } }
io.netty netty-all4.1.39.Final cn.hutool hutool-all5.5.4 com.google.code.gson gson2.8.5 org.springframework.boot spring-boot-starter-aoporg.apache.httpcomponents httpclient4.5.12 com.alibaba fastjson1.2.75 joda-time joda-time2.10.1 com.alibaba easyexcel2.2.10 com.google.guava guava30.1-jre org.apache.logging.log4j log4j-api2.17.0 org.springframework.boot spring-boot-starter-weblog4j-to-slf4j org.apache.logging.log4j
@SpringBootApplication public class NettyClientApplication { @Resource private ClientStarter clientStarter; public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(NettyClientApplication.class, args); try { InetAddress ip = Inet4Address.getLocalHost(); System.out.println("当前IP地址==>>:" + ip.getHostAddress()); } catch (UnknownHostException e) { e.printStackTrace(); } System.out.println("(♥◠‿◠)ノ゙ netty客户端启动成功 ლ(´ڡ`ლ)゙ "); NettyClientApplication application = context.getBean(NettyClientApplication.class); application.runClient(); } public void runClient() { // 异步启动 Netty Executors.newSingleThreadExecutor().execute(clientStarter::bootstrap); } }
# yml配置netty netty: client: boss: 1 # boss线程数量 默认为cpu线程数*2 负责 ServerSocketChannel 上的 accept 事件 worker: 4 # worker线程数量 默认为cpu线程数*2 负责 socketChannel 上的读写 timeout: 100000 # 连接超时时间(毫秒) port: 6999 # 服务器主端口 默认6999 host: 127.0.0.1 # 服务器地址 127.0.0.1 writeTime: 2 # 客户端写入时间 2秒 目前默认发送心跳用 readTime: 900 # 客户端读取时间 15分钟 900秒 reconnect: 5 # 重新连接时间 5秒
//Netty属性配置 @Data @Configuration public class NettyConfig { /** * boss线程数量 默认为cpu线程数*2 * 负责 ServerSocketChannel 上的 accept 事件 */ @Value("${netty.client.boss}") private Integer boss; /** * worker线程数量 默认为cpu线程数*2 * 负责 socketChannel 上的读写 */ @Value("${netty.client.worker}") private Integer worker; /** * 连接超时时间 默认为30s */ @Value("${netty.client.timeout}") private Integer timeout; /** * 服务器主端口 默认6999 */ @Value("${netty.client.port}") private Integer port; /** * 服务器地址 默认为本地 */ @Value("${netty.client.host}") private String host; /** * 客户端写入时间 2秒 * 目前默认发送心跳 */ @Value("${netty.client.writeTime}") private Integer writeTime; /** * 客户端读取时间 15分钟 */ @Value("${netty.client.readTime}") private Integer readTime; /** * 重新连接时间 5秒 */ @Value("${netty.client.reconnect}") private Integer reconnect; }
//接收到服务器的报文并解析 @Slf4j public class AnalyzeMessageHandler extends ByteToMessageDecoder { private static final NettyServiceImpl nettyService = ApplicationContextHelperUtil.getBean(NettyServiceImpl.class); @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List