0
点赞
收藏
分享

微信扫一扫

6.Netty框架TCP粘包拆包问题(2)

_阿瑶 2022-11-21 阅读 128


博客概述

本博客是上篇文章的续集,主要讲述netty实现定长数据包解码器的案例。

服务器端

package fixed;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

import java.nio.charset.Charset;

/**
* @Auther: ;李泽
* @Date: 2019/3/4 22:04
* @Description:
*/
public class NettyServer {
private int port;

public NettyServer(int port) {
this.port = port;
}

public void run() throws Exception {
/*
bossGroup负责接受网络连接以及建立连接
workerGroup负责具体的网络通讯
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
/*
ServerBootstrap 是一个启动服务器端NIO服务的辅助启动类。
*/
ServerBootstrap b = new ServerBootstrap();
//给启动器配置两个线程组,前一个是接受连接,后一个具体干活
b.group(bossGroup, workerGroup)
//这里我们指定使用NioServerSocketChannel类来说明服务器端接受的tcp连接是nio模式的。
.channel(NioServerSocketChannel.class)
/*
这里的事件处理类经常会被用来处理一个最近的已经接收的Channel。ChannelInitializer是一个特殊的处理类,
他的目的是帮助使用者配置一个新的Channel。也许你想通过增加一些处理类比如ServerHandler来配置一
个新的Channel或者其对应的ChannelPipeline来实现你的网络程序。当你的程序变的复杂时,可能你会增加更多
的处理类到pipline上,然后提取这些匿名类到最顶层的类上。
*/
.childHandler(new ChannelInitializer <SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new FixedLengthFrameDecoder(5));
//设置字符串解码器,将buf转化为字符串,默认是u8。handler接收到的就是string类型的
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
//最后的业务逻辑handler
ch.pipeline().addLast(new ServerHandler());
}
})

.option(ChannelOption.SO_BACKLOG, 128) //tcp缓冲区
/*
保持连接不断开
*/
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_SNDBUF,32*1024) //设置发送缓冲的大小。
.option(ChannelOption.SO_RCVBUF,32*1024); //设置接受数据缓冲的大小。

// 绑定并且启动服务来接受到来的请求
ChannelFuture f = b.bind(port).sync(); //

// 起到的作用就是阻塞在这不让server关闭。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new NettyServer(port).run();
}
}

服务器端handler

package fixed;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;


/**
* @Auther: ;李泽
* @Date: 2019/3/4 22:05
* @Description: 继承自ChannelHandlerAdapter,这个类实现了ChannelHandler接口,ChannelHandler提供了许多事件处理
* 的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承ChannelHandlerAdapter类而不是你自己去实
* 现接口方法。
*/
public class ServerHandler extends ChannelHandlerAdapter {
/**
* 功能描述: exceptionCaught()事件处理方法是当出现Throwable对象才会被调用,即当Netty由于IO错误或者处理器在处理事件
* 时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的channel给关闭掉。然而这个方法的处
* 理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
*
* @auther: 李泽
* @date: 2019/3/4 22:24
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
/**
* 功能描述: 这里我们覆盖了chanelRead()事件处理方法。每当从客户端收到新的数据时,这个方法会在收到消息时被调用,
* 这个例子中,收到的消息的类型是ByteBuf
*
* @auther: 李泽
* @date: 2019/3/4 22:23
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//super.channelRead(ctx, msg); 不注释掉会报错
try {
//获得发送过来的信息,直接强转为string。
String req = (String) msg;
//打印输出
System.out.println("req = " + req);
//给个反馈,遵循echo响应协议,发啥反馈啥,
ctx.writeAndFlush(Unpooled.copiedBuffer(req.getBytes()));
//不需要释放,因为冲刷之后,自动释放了
} finally {
//如果上面的业务出现了反馈的代码,就不需要释放空间,冲刷了之后,就自动释放了
ReferenceCountUtil.release(msg);
}
}
}

客户端

package fixed;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;

/**
* @Auther: ;李泽
* @Date: 2019/3/4 22:05
* @Description:
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5));
//设置字符串解码器,将buf转化为字符串,默认是u8。handler接收到的就是string类型的
socketChannel.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
//这里面的是接受服务器端反馈时,才会触发的handler
socketChannel.pipeline().addLast(new ClientHandler());

}
});
ChannelFuture channelFuture1 = bootstrap.connect("127.0.0.1",8080).sync();
//客户端发送数据,借用netty的buffer转化工具
channelFuture1.channel().write(Unpooled.copiedBuffer("aaaaaaaaa".getBytes("utf-8")));
channelFuture1.channel().write(Unpooled.copiedBuffer("bbbbbbbbbb".getBytes("utf-8")));
channelFuture1.channel().write(Unpooled.copiedBuffer("asdfsdf".getBytes("utf-8")));
channelFuture1.channel().flush();
//可以理解为阻塞在这
channelFuture1.channel().closeFuture().sync();
eventLoopGroup.shutdownGracefully();
}
}

客户端handler

package fixed;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
* @Auther: ;李泽
* @Date: 2019/3/4 22:05
* @Description:
*/
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
cause.printStackTrace();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//super.channelRead(ctx, msg);
try {
//获得反馈过来的信息,已经直接转化为string
String resp = (String)msg;
System.out.println("resp = " + resp);
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//super.channelActive(ctx);
System.out.println("channel active...");
}
}


举报

相关推荐

0 条评论