文章目录
一、目标
服务端实现同一端口监听两种协议(数据包)的通讯:一个是UserInfo,一个是OrderInfo。
二、实现
1、思路
定义协议;定义自定义帧的编码器、解码器;根据数据包头部标识动态选择解码器。
2、定义协议
这里很简单地定义了协议:头标识的长度+头标识+数据长度+数据内容
header-length(1byte)+header(…不能大于127bytes)+data-length(4bytes)+data内容
3、通用编码器
编码器在encode方法里面就按协议协定逐步写入各项数据
/**
 * Generic encoder that writes a message as one custom frame:
 * header-length (1 byte) + header + data-length (4-byte int) + data.
 * <p>
 * Serialization of the message itself is delegated to the supplied converter.
 * <p>
 * Created by rocklee on 2022/1/25 15:29
 *
 * @param <T> type of the outbound messages handled by this encoder
 */
public class CustomFrameEncoder<T> extends MessageToByteEncoder<T> {
    /** Largest header length that fits the signed one-byte length prefix. */
    private static final int MAX_HEADER_LENGTH = Byte.MAX_VALUE;

    private byte[] header;
    private Function<T, byte[]> converter;

    /**
     * @param outboundMessageType message class this encoder accepts
     * @param header              frame header marker, at most 127 bytes
     * @param converter           turns a message into its payload bytes
     * @throws IllegalArgumentException if {@code header} is longer than 127 bytes
     */
    public CustomFrameEncoder(Class<? extends T> outboundMessageType, byte[] header, Function<T, byte[]> converter) {
        super(outboundMessageType);
        this.header = checkHeader(header);
        this.converter = converter;
    }

    /**
     * Creates an unconfigured encoder; header and converter must be supplied
     * through the setters before the first message is encoded.
     */
    public CustomFrameEncoder() {
    }

    public byte[] getHeader() {
        return header;
    }

    /**
     * @param header frame header marker, at most 127 bytes
     * @return this encoder, for chaining
     * @throws IllegalArgumentException if {@code header} is longer than 127 bytes
     */
    public CustomFrameEncoder<T> setHeader(byte[] header) {
        this.header = checkHeader(header);
        return this;
    }

    public Function<T, byte[]> getConverter() {
        return converter;
    }

    /**
     * @param converter turns a message into its payload bytes
     * @return this encoder, for chaining
     */
    public CustomFrameEncoder<T> setConverter(Function<T, byte[]> converter) {
        this.converter = converter;
        return this;
    }

    /**
     * Writes the frame fields in protocol order. The payload is produced first so
     * that a missing configuration or a failing converter is reported before any
     * byte is written.
     *
     * @throws IllegalStateException if header or converter is not set, or the
     *                               converter returns {@code null}
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, T msg, ByteBuf out) throws Exception {
        if (header == null) {
            throw new IllegalStateException("header must be set before encoding");
        }
        if (converter == null) {
            throw new IllegalStateException("converter must be set before encoding");
        }
        byte[] payload = converter.apply(msg);
        if (payload == null) {
            throw new IllegalStateException("converter returned null payload");
        }
        out.writeByte(header.length);
        out.writeBytes(header);
        out.writeInt(payload.length);
        out.writeBytes(payload);
    }

    /** Rejects headers whose length cannot be expressed in the one-byte prefix; null is passed through. */
    private static byte[] checkHeader(byte[] header) {
        if (header != null && header.length > MAX_HEADER_LENGTH) {
            throw new IllegalArgumentException(
                    "header length " + header.length + " exceeds " + MAX_HEADER_LENGTH + " bytes");
        }
        return header;
    }
}
4、通用解码器
/**
 * Created by rocklee on 2022/1/25 14:40
 * <p>
 * Decoder for the custom frame:
 * header-length (byte) + header (byte[]) + data-length (int) + data (byte[]).
 * <p>
 * Each call to {@code decode} emits at most one object; it relies on being
 * called again while more bytes remain, as the Netty ByteToMessageDecoder
 * documentation describes.
 *
 * @param <T> type of the objects produced from the frame payload
 */
public class CustomFrameDecoder<T> extends ByteToMessageDecoder {
    /** Size in bytes of the header-length prefix. */
    private static final int HEADER_LENGTH_FIELD_SIZE = 1;
    /** Size in bytes of the data-length field. */
    private static final int DATA_LENGTH_FIELD_SIZE = 4;

    private byte[] header;
    private Function<byte[], T> onCreateObject;
    private boolean throwExceptionOnInvalidFormat = false;

    /**
     * @param header         header marker every accepted frame must carry
     * @param onCreateObject builds the decoded object from the payload bytes
     */
    public CustomFrameDecoder(byte[] header, Function<byte[], T> onCreateObject) {
        this.header = header;
        this.onCreateObject = onCreateObject;
    }

    /**
     * @param throwExceptionOnInvalidFormat when true, a frame with a foreign header
     *                                      raises an exception; when false the bytes
     *                                      are left unread in the buffer
     * @return this decoder, for chaining
     */
    public CustomFrameDecoder<T> setThrowExceptionOnInvalidFormat(boolean throwExceptionOnInvalidFormat) {
        this.throwExceptionOnInvalidFormat = throwExceptionOnInvalidFormat;
        return this;
    }

    /**
     * Tries to read one complete frame. When the buffer does not yet hold a whole
     * frame the reader index is restored and nothing is emitted.
     *
     * @throws Exception if the header does not match and
     *                   {@code throwExceptionOnInvalidFormat} is set, or if the
     *                   data-length field is negative
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < HEADER_LENGTH_FIELD_SIZE) {
            return; // nothing to inspect yet
        }
        in.markReaderIndex();

        byte headerSize = in.readByte();
        if (headerSize != header.length) {
            rejectFrame(in);
            return;
        }

        // The required byte count depends on the configured header, not on a fixed constant.
        if (in.readableBytes() < header.length + DATA_LENGTH_FIELD_SIZE) {
            in.resetReaderIndex();
            return;
        }

        byte[] inHeader = new byte[header.length];
        in.readBytes(inHeader);
        if (!Arrays.equals(inHeader, header)) {
            rejectFrame(in);
            return;
        }

        int dataLength = in.readInt();
        if (dataLength < 0) {
            // Would otherwise surface as NegativeArraySizeException when allocating the payload.
            throw new Exception("Invalid frame format: negative data length " + dataLength);
        }
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex(); // payload incomplete, wait for more bytes
            return;
        }

        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(onCreateObject.apply(data));
    }

    /**
     * Handles a frame whose header is not the configured one: throws when strict
     * mode is on, otherwise rewinds to the marked position so the bytes stay unread.
     */
    private void rejectFrame(ByteBuf in) throws Exception {
        if (throwExceptionOnInvalidFormat) {
            throw new Exception("Invalid frame format");
        }
        in.resetReaderIndex();
    }
}
5、抽象编码选择器
/**
 * Codec selector: inspects the incoming bytes, asks the subclass which decoder
 * fits, installs it and then removes itself from the pipeline.
 * <p>
 * Created by rocklee on 2022/1/26 13:44
 */
public abstract class AbstractCodeSelector extends ByteToMessageDecoder {
    /**
     * Chooses the decoder for the buffered bytes.
     *
     * @param in buffered inbound bytes; implementations are expected to leave the
     *           reader index where they found it
     * @return the decoder to install, or {@code null} if no decision can be made yet
     */
    public abstract ChannelInboundHandlerAdapter determineDecode(ByteBuf in);

    /**
     * Installs the chosen decoder (and any follow-up handlers) into the pipeline.
     *
     * @param pipeline                     pipeline this selector belongs to
     * @param channelInboundHandlerAdapter decoder returned by {@link #determineDecode(ByteBuf)}
     */
    public abstract void addHandler(ChannelPipeline pipeline, ChannelInboundHandlerAdapter channelInboundHandlerAdapter);

    /**
     * Installs the decoder once one has been determined and then retires this
     * selector. While no decoder is determined nothing is installed, so
     * {@code addHandler} is never invoked with {@code null}.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        ChannelInboundHandlerAdapter selected = determineDecode(in);
        if (selected == null) {
            return; // undecided: keep buffering and try again on the next read
        }
        ChannelPipeline pipeline = ctx.pipeline();
        addHandler(pipeline, selected);
        // Per the Netty ByteToMessageDecoder documentation, removing the decoder
        // hands its still-buffered bytes to the next inbound handler.
        pipeline.remove(this);
    }
}
6、编码选择器
我们基于上面这个抽象类来个实现,在determineDecode里面根据头标识判断数据包是UserInfo还是OrderInfo,然后再动态返回相应的解码器实例
/**
 * Selector that recognises the UserInfo and OrderInfo header markers and returns
 * the matching {@link CustomFrameDecoder}.
 * <p>
 * Created by rocklee on 2022/1/26 13:57
 */
public class CustomerCodeSelector extends AbstractCodeSelector {
    /**
     * Peeks at the header-length byte and the header itself, then restores the
     * reader index so the chosen decoder sees the frame from its first byte.
     *
     * @param in buffered inbound bytes
     * @return a decoder for the recognised header, or {@code null} when the header
     *         is not yet fully available or matches neither known marker
     */
    @Override
    public ChannelInboundHandlerAdapter determineDecode(ByteBuf in) {
        if (in.readableBytes() < 1) {
            return null;
        }
        in.markReaderIndex();
        try {
            byte headerSize = in.readByte();
            // Accept the length of either known header, not just the user one.
            boolean knownLength = headerSize == CodeConsts.UserHeader.length
                    || headerSize == CodeConsts.OrderHeader.length;
            if (!knownLength || in.readableBytes() < headerSize) {
                return null;
            }
            byte[] inHeader = new byte[headerSize];
            in.readBytes(inHeader);
            if (Arrays.equals(inHeader, CodeConsts.UserHeader)) {
                return new CustomFrameDecoder<UserInfo>(CodeConsts.UserHeader, b -> Utils.fromJsonBytes(b, UserInfo.class));
            }
            if (Arrays.equals(inHeader, CodeConsts.OrderHeader)) {
                return new CustomFrameDecoder<OrderInfo>(CodeConsts.OrderHeader, b -> Utils.fromJsonBytes(b, OrderInfo.class));
            }
            return null;
        } finally {
            in.resetReaderIndex();
        }
    }

    /**
     * Appends the selected decoder to the end of the pipeline; a {@code null}
     * decoder is ignored.
     */
    @Override
    public void addHandler(ChannelPipeline pipeline, ChannelInboundHandlerAdapter channelInboundHandlerAdapter) {
        if (channelInboundHandlerAdapter != null) {
            pipeline.addLast(channelInboundHandlerAdapter);
        }
    }
}
7、实体类
/**
* Created by rocklee on 2022/1/25 15:46
*/
/**
 * User message exchanged between the sample client and server.
 * Constructors, accessors and toString are generated by the Lombok annotations.
 * The field order defines the parameter order of the all-args constructor.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class UserInfo implements Serializable {
// Identifier of the user; the sample server closes the connection when it equals "B001" (case-insensitive).
private String userId;
// Display name; the sample server appends ",srv" before echoing the message back.
private String userName;
private int age;
}
/**
 * Order message exchanged between the sample client and server.
 * Constructors, accessors and toString are generated by the Lombok annotations.
 * The field order defines the parameter order of the all-args constructor.
 */
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Data
public class OrderInfo {
// Identifier of the user; the sample server closes the connection when it equals "O999" (case-insensitive).
private String userId;
// NOTE(review): name deviates from lowerCamelCase; renaming would change the
// Lombok-generated accessor names (and presumably the serialized property name), so it is left as is.
private int Orderid;
}
8、建立Netty服务端
pipeline加入两编码器和一动态编码选择器(动态选择解码)
/**
 * Sample server listening on port 9900 that accepts both UserInfo and OrderInfo
 * frames. The pipeline holds one encoder per message type plus a selector that
 * installs the matching decoder and the business handler once the first frame
 * header has been recognised.
 */
public class CustomCodeServerTest {
    public static void main(String[] args) {
        IGeneralServer server = new GeneralNettyServerFactory().getGeneralServer(9900);
        boolean interrupted = false;
        try {
            server.getServerBootstrap()
                    .option(ChannelOption.SO_BACKLOG, 128) // maximum number of queued pending TCP connections
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            server.run(new Consumer<SocketChannel>() {
                @Override
                public void accept(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Length-prefixed framing to deal with TCP packet sticking and splitting
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
                            .addLast(new LengthFieldPrepender(4))
                            .addLast("orderEncoder", new CustomFrameEncoder<OrderInfo>(OrderInfo.class, CodeConsts.OrderHeader, o -> Utils.toJsonBytes(o)))
                            .addLast("userEncoder", new CustomFrameEncoder<UserInfo>(UserInfo.class, CodeConsts.UserHeader, o -> Utils.toJsonBytes(o)))
                            .addLast(new CustomerCodeSelector() {
                                @Override
                                public void addHandler(ChannelPipeline pipeline, ChannelInboundHandlerAdapter channelInboundHandlerAdapter) {
                                    if (channelInboundHandlerAdapter == null) {
                                        // No decoder chosen yet: installing the business handler now
                                        // would place it ahead of the decoder and repeat on every call.
                                        return;
                                    }
                                    super.addHandler(pipeline, channelInboundHandlerAdapter);
                                    pipeline.addLast(new SimpleChannelInboundHandler<Object>() {
                                        @Override
                                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                            super.channelActive(ctx);
                                            System.out.println("Connected:" + ctx.channel().remoteAddress());
                                        }

                                        @Override
                                        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                                            super.channelInactive(ctx);
                                            System.out.println("DisConnected:" + ctx.channel().remoteAddress());
                                        }

                                        @Override
                                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                            System.out.println(cause.getMessage());
                                            ctx.close();
                                        }

                                        /** Echoes the decoded message back and closes on the sentinel ids. */
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                                            if (msg instanceof UserInfo) {
                                                //System.out.println("Client<<<:"+msg.toString());
                                                UserInfo userInfo = (UserInfo) msg;
                                                userInfo.setUserName(userInfo.getUserName() + ",srv");
                                                ctx.channel().writeAndFlush(msg);
                                                // Literal first so a missing id cannot cause a NullPointerException
                                                if ("B001".equalsIgnoreCase(userInfo.getUserId())) {
                                                    ctx.close();
                                                }
                                            } else if (msg instanceof OrderInfo) {
                                                OrderInfo orderInfo = (OrderInfo) msg;
                                                ctx.channel().writeAndFlush(orderInfo);
                                                if ("O999".equalsIgnoreCase(orderInfo.getUserId())) {
                                                    ctx.close();
                                                }
                                            }
                                        }
                                    });
                                }
                            });
                }
            });
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        } finally {
            server.close();
            if (interrupted) {
                // Restore the flag only after cleanup so close() is not disturbed by it.
                Thread.currentThread().interrupt();
            }
        }
    }
}
9、建立Netty UserInfo客户端传输
/**
 * Created by rocklee on 2022/1/25 15:50
*/
/**
 * Sample client that sends 200 UserInfo messages to localhost:9900, then a final
 * message with id "B001", and waits for the channel to be closed.
 */
public class CustomCodeClientUserInfoTest {
    public static void main(String[] args) {
        IGeneralClient client = new GeneralNettyClientFactory().getClient("localhost", 9900);
        boolean interrupted = false;
        try {
            client.run(false, ch -> {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                pipeline.addLast(new LengthFieldPrepender(4))
                        .addLast("userDecoder", new CustomFrameDecoder<UserInfo>(CodeConsts.UserHeader, b -> Utils.fromJsonBytes(b, UserInfo.class)))
                        .addLast("userEncoder", new CustomFrameEncoder<UserInfo>(UserInfo.class, CodeConsts.UserHeader, o -> Utils.toJsonBytes(o)))
                        .addLast(new SimpleChannelInboundHandler<Object>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("Server<<" + msg.toString());
                            }
                        });
            });
            Channel channel = client.getChannel();
            for (int i = 1; i <= 200; i++) {
                channel.writeAndFlush(new UserInfo("A00" + i, "陳大文", 20));
                Thread.sleep(200);
            }
            // Final message carrying the id on which the sample server closes the connection
            channel.writeAndFlush(new UserInfo("B001", "陳大文", 20)).sync();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        } finally {
            client.close();
            if (interrupted) {
                // Restore the flag only after cleanup so close() is not disturbed by it.
                Thread.currentThread().interrupt();
            }
        }
    }
}
10、建立Netty OrderInfo客户端传输
/**
 * Sample client that sends 200 OrderInfo messages to localhost:9900, then a final
 * message with id "O999", and waits for the channel to be closed.
 * <p>
 * Created by rocklee on 2022/1/25 15:50
 */
public class CustomCodeClientOrderInfoTest {
    public static void main(String[] args) {
        IGeneralClient client = new GeneralNettyClientFactory().getClient("localhost", 9900);
        boolean interrupted = false;
        try {
            client.run(false, ch -> {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                pipeline.addLast(new LengthFieldPrepender(4))
                        .addLast("orderDecoder", new CustomFrameDecoder<OrderInfo>(CodeConsts.OrderHeader, b -> Utils.fromJsonBytes(b, OrderInfo.class)))
                        .addLast("orderEncoder", new CustomFrameEncoder<OrderInfo>(OrderInfo.class, CodeConsts.OrderHeader, o -> Utils.toJsonBytes(o)))
                        .addLast(new SimpleChannelInboundHandler<Object>() {
                            @Override
                            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                super.userEventTriggered(ctx, evt);
                                System.out.println(evt);
                            }

                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("ord:Server<<" + msg.toString());
                            }
                        });
            });
            Channel channel = client.getChannel();
            for (int i = 1; i <= 200; i++) {
                channel.writeAndFlush(new OrderInfo("O00" + i, i));
                Thread.sleep(200);
            }
            // Final message carrying the id on which the sample server closes the connection
            channel.writeAndFlush(new OrderInfo("O999", 20)).sync();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        } finally {
            client.close();
            if (interrupted) {
                // Restore the flag only after cleanup so close() is not disturbed by it.
                Thread.currentThread().interrupt();
            }
        }
    }
}
为了通用性,CustomFrameEncoder/CustomFrameDecoder 本身没有实现实体的序列化和反序列化,而是通过函数式接口交给使用者自行实现;本例使用的是 Jackson。
运行结果