0
点赞
收藏
分享

微信扫一扫

Netty多协议(多数据结构)的解决方法一:编写通用的编码、解码器进行传统数据包传输

海滨公园 2022-01-27 阅读 24

文章目录

一、目标

  服务端实现同一端口监听两种协议(数据包)的通讯:一个是UserInfo,一个是OrderInfo。

二、实现

1、思路

定义协议;定义自定义帧编码、解码器;用根据包头部标识动态选择解码器。

2、定义协议

这里很简单地定义了协议:头标识的长度+头标识+数据长度+数据内容
header-length(1byte)+header(…不能大于127bytes)+data-length(4bytes)+data内容

3、通用编码器

编码器在encode方法里面就按协议协定逐步写入各项数据

/**
 * Created by rocklee on 2022/1/25 15:29
 */
public class CustomFrameEncoder<T> extends MessageToByteEncoder<T> {
  private byte[] header;
  private Function<T,byte[]> converter;

  public byte[] getHeader() {
    return header;
  }

  public CustomFrameEncoder<T> setHeader(byte[] header) {
    this.header = header;
    return  this;
  }

  public Function<T, byte[]> getConverter() {
    return converter;
  }

  public CustomFrameEncoder(Class<? extends T> outboundMessageType, byte[] header, Function<T, byte[]> converter) {
    super(outboundMessageType);
    this.header = header;
    this.converter = converter;
  }

  public CustomFrameEncoder() {
  }

  public CustomFrameEncoder<T> setConverter(Function<T, byte[]> converter) {
    this.converter = converter;
    return this;
  }

  @Override
  protected void encode(ChannelHandlerContext ctx, T msg, ByteBuf out) throws Exception {
      out.writeByte(header.length);
      out.writeBytes(header);
      byte[] bytes=converter.apply(msg);
      out.writeInt(bytes.length);
      out.writeBytes(bytes);
  }
}

4、通用解码器

/**
 * Created by rocklee on 2022/1/25 14:40
 * 自定义帧:header-length(byte)+header(byte[])+data-length(int)+data(byte[]);
 */
public class CustomFrameDecoder<T> extends ByteToMessageDecoder {
  private byte[] header;
  private Function<byte[],T> onCreateObject;
  private boolean throwExceptionOnInvalidFormat=false;
  public CustomFrameDecoder(byte[] header, Function<byte[],T> onCreateObject) {
    this.header = header;
    this.onCreateObject=onCreateObject;
  }
  public CustomFrameDecoder setThrowExceptionOnInvalidFormat(boolean throwExceptionOnInvalidFormat){
    this.throwExceptionOnInvalidFormat=throwExceptionOnInvalidFormat;
    return this;
  }
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
     for (;;) {
       int byteSize = in.readableBytes(); //这次有多少
       if (byteSize < 7) {//不足以形成一帧,则退出
         break;
       } else {
         //尝试看这篮子里面的数据够不够一帧
         in.markReaderIndex();
         byte headerSize = in.readByte();
         if (headerSize != header.length) {
           if (throwExceptionOnInvalidFormat) {
             throw new Exception("Invalid frame format");
           }
           else{
             in.resetReaderIndex();
             ctx.fireChannelReadComplete();
             return;
           }
         }
         byte[] inHeader = new byte[headerSize];
         in.readBytes(inHeader);
         if (!Arrays.equals(inHeader, header)) {
           if (throwExceptionOnInvalidFormat) {
             throw new Exception("Invalid frame format");
           }
           else{
             in.resetReaderIndex();
             ctx.fireChannelReadComplete();
             return;
           }
         }
         int dataLength = in.readInt();
         if (in.readableBytes() <  dataLength) {//不够一帧
           in.resetReaderIndex();
           ctx.fireChannelReadComplete();
           break;
         } else {
           byte[] data=new byte[dataLength];
           in.readBytes(data);
           T obj=onCreateObject.apply(data);
           out.add(obj);
           break;
         }

       }
     }

  }
}

5、抽象编码选择器

/**
 * 编码选择器
 * Created by rocklee on 2022/1/26 13:44
 */
public abstract class AbstractCodeSelector extends ByteToMessageDecoder {

  public abstract ChannelInboundHandlerAdapter determineDecode(ByteBuf in);
  public abstract void addHandler(ChannelPipeline pipeline, ChannelInboundHandlerAdapter channelInboundHandlerAdapter);
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    ChannelInboundHandlerAdapter channelInboundHandlerAdapter=determineDecode(in);
    addHandler(ctx.pipeline(),channelInboundHandlerAdapter);
    if (channelInboundHandlerAdapter!=null){
      ctx.pipeline().remove(this);
    }

  }
}

6、编码选择器

我们基于上面这个抽象类来个实现,在determineDecode里面根据头标识判断数据包是UserInfo还是OrderInfo,然后再动态返回相应的解码器实例

/**
 * Created by rocklee on 2022/1/26 13:57
 */
public class CustomerCodeSelector extends AbstractCodeSelector {
  @Override
  public ChannelInboundHandlerAdapter determineDecode(ByteBuf in) {
    int byteSize = in.readableBytes(); //这次有多少
    if (byteSize < 7) {//不足以形成一帧,则退出
      return null;
    } else {
      //尝试看这篮子里面的数据够不够一帧
      in.markReaderIndex();
      try {
        byte headerSize = in.readByte();
        if (headerSize != CodeConsts.UserHeader.length) {
          return null;
        }
        byte[] inHeader = new byte[headerSize];
        in.readBytes(inHeader);
        if (Arrays.equals(inHeader, CodeConsts.UserHeader)) {
          return new CustomFrameDecoder<UserInfo>(CodeConsts.UserHeader, b -> Utils.fromJsonBytes(b, UserInfo.class));
        } else if (Arrays.equals(inHeader, CodeConsts.OrderHeader)) {
          return new CustomFrameDecoder<OrderInfo>(CodeConsts.OrderHeader, b -> Utils.fromJsonBytes(b, OrderInfo.class));
        }
      }
      finally {
        in.resetReaderIndex();
      }
      return null;
    }
  }

  @Override
  public void addHandler(ChannelPipeline pipeline, ChannelInboundHandlerAdapter channelInboundHandlerAdapter) {
      if (channelInboundHandlerAdapter!=null) {
        pipeline.addLast(channelInboundHandlerAdapter);
      }
  }
}

7、实体类

**
 * Created by rocklee on 2022/1/25 15:46
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
@ToString
public class UserInfo implements Serializable {
  private String userId;
  private String userName;
  private int age;

}
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Data
public class OrderInfo {
  private String userId;
  private int Orderid;
}

8、建立Netty服务端

pipeline加入两编码器和一动态编码选择器(动态选择解码)

public class CustomCodeServerTest {
  public static void main(String[] args){
    IGeneralServer server= new GeneralNettyServerFactory().getGeneralServer(9900);
    try{
      server.getServerBootstrap().option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
      .childOption(ChannelOption.SO_KEEPALIVE, true);
      server.run(new Consumer<SocketChannel>() {
        @Override
        public void accept(SocketChannel ch) {
          ChannelPipeline pipeline = ch.pipeline();
          // 添加用于处理粘包和拆包问题的处理器
          pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
                  .addLast(new LengthFieldPrepender(4))
                  .addLast("orderEncoder",new CustomFrameEncoder<OrderInfo>(OrderInfo.class,CodeConsts.OrderHeader,o->Utils.toJsonBytes(o)))
                  .addLast("userEncoder",new CustomFrameEncoder<UserInfo>(UserInfo.class,CodeConsts.UserHeader,o->Utils.toJsonBytes(o)))
                  .addLast(new CustomerCodeSelector(){
                    @Override
                    public void addHandler(ChannelPipeline pipeline, ChannelInboundHandlerAdapter channelInboundHandlerAdapter) {
                       super.addHandler(pipeline,channelInboundHandlerAdapter);
                       pipeline.addLast(new SimpleChannelInboundHandler() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                          super.channelActive(ctx);
                          System.out.println("Connected:"+ctx.channel().remoteAddress());
                        }
                        @Override
                        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                          super.channelInactive(ctx);
                          System.out.println("DisConnected:"+ctx.channel().remoteAddress());
                        }
                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws
                                Exception {
                          System.out.println(cause.getMessage());
                          ctx.close();
                        }
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                          if (msg instanceof UserInfo) {
                            //System.out.println("Client<<<:"+msg.toString());
                            UserInfo userInfo=(UserInfo)msg;
                            userInfo.setUserName(userInfo.getUserName() + ",srv");
                            ctx.channel().writeAndFlush(msg);
                            if (userInfo.getUserId().equalsIgnoreCase("B001")) {
                              ctx.close();
                            }
                          }
                          else if (msg instanceof OrderInfo){
                            OrderInfo orderInfo=(OrderInfo) msg;
                            ctx.channel().writeAndFlush(orderInfo);
                            if (orderInfo.getUserId().equalsIgnoreCase("O999")){
                              ctx.close();
                            }
                          }
                        }
                      });
                    }
                  });           
        }
      });
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      server.close();
    }
  }
}

9、建立Netty UserInfo客户端传输


 * Created by rocklee on 2022/1/25 15:50
 */
public class CustomCodeClientUserInfoTest {

  public static void main(String[] args) {
    IGeneralClient client = new GeneralNettyClientFactory().getClient("localhost", 9900);
    try {
      client.run(false, ch -> {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4))
        .addLast("userDecoder",new CustomFrameDecoder<UserInfo>(CodeConsts.UserHeader, b -> Utils.fromJsonBytes(b, UserInfo.class)))
                .addLast("userEncoder",new CustomFrameEncoder<UserInfo>(UserInfo.class,CodeConsts.UserHeader,o->Utils.toJsonBytes(o)))
                  .addLast(new SimpleChannelInboundHandler() {
                  @Override
                  protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("Server<<" + msg.toString());
                  }
                });
      });
      Channel channel=client.getChannel();
      for (int i=1;i<=200;i++) {
        channel.writeAndFlush(new UserInfo("A00"+i, "陳大文", 20));
        Thread.sleep(200);
      }
      channel.writeAndFlush(new UserInfo("B001", "陳大文", 20)).sync();
      channel.closeFuture().sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      client.close();
    }
  }
}

10、建立Netty OrderInfo客户端传输

/**
 * Created by rocklee on 2022/1/25 15:50
 */
public class CustomCodeClientOrderInfoTest {

  public static void main(String[] args) {
    IGeneralClient client = new GeneralNettyClientFactory().getClient("localhost", 9900);
    try {
      client.run(false, ch -> {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4))
        .addLast("orderDecoder",new CustomFrameDecoder<OrderInfo>(CodeConsts.OrderHeader,b->Utils.fromJsonBytes(b,OrderInfo.class)))
                .addLast("orderEncoder",new CustomFrameEncoder<OrderInfo>(OrderInfo.class,CodeConsts.OrderHeader,o->Utils.toJsonBytes(o)))
                .addLast(new SimpleChannelInboundHandler() {
                  @Override
                  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                    super.userEventTriggered(ctx,evt);
                    System.out.println(evt);
                  }
                  @Override
                  protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("ord:Server<<" + msg.toString());
                  }
                });
      });
      Channel channel=client.getChannel();
      for (int i=1;i<=200;i++) {
        channel.writeAndFlush(new OrderInfo("O00"+i, i));
        Thread.sleep(200);
      }
      channel.writeAndFlush(new OrderInfo("O999", 20)).sync();
      channel.closeFuture().sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      client.close();
    }
  }
}

为了通用性,CustomFrameEncoder/CustomFrameDecoder对实体的序列化、反序列化框架里面没有实现,通过接口让用户自行实现,这里用的jackson .

运行结果
在这里插入图片描述

在这里插入图片描述

举报

相关推荐

0 条评论