Netty: The Definitive Guide (Netty权威指南), 2nd Edition, Study Notes 7: MessagePack Encoding/Decoding and LengthFieldBasedFrameDecoder

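MessagePack is an efficient binary serialization framework. Like JSON, it supports data exchange between different languages, but it is faster and produces a smaller serialized byte stream.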

MessagePack features

  • Efficient encoding and decoding, with high performance
  • Small serialized byte stream
  • Cross-language support
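
To make the "small byte stream" point concrete, here is a minimal sketch that hand-encodes a short list of strings using two type tags from the MessagePack format (fixarray, 0x90 to 0x9f, and fixstr, 0xa0 to 0xbf) and compares the size with the equivalent JSON text. The class MsgpackSizeSketch and its methods are hypothetical names made up for this illustration. The sketch only handles lists of at most 15 strings of at most 31 UTF-8 bytes each, and real code should use the msgpack library instead.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class, not part of msgpack-java.
class MsgpackSizeSketch {

    // Encodes a short list of short strings as a MessagePack fixarray of fixstr values.
    static byte[] encodeStringList(List<String> items) {
        if (items.size() > 15) {
            throw new IllegalArgumentException("fixarray holds at most 15 elements");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x90 | items.size());          // fixarray header: 1001xxxx
        for (String item : items) {
            byte[] utf8 = item.getBytes(StandardCharsets.UTF_8);
            if (utf8.length > 31) {
                throw new IllegalArgumentException("fixstr holds at most 31 bytes");
            }
            out.write(0xa0 | utf8.length);       // fixstr header: 101xxxxx
            out.write(utf8, 0, utf8.length);     // raw UTF-8 bytes, no quotes or separators
        }
        return out.toByteArray();
    }

    // Builds the JSON text for the same list (assumes no characters that need escaping).
    static String toJson(List<String> items) {
        return items.stream()
                .map(s -> "\"" + s + "\"")
                .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("msgpack", "kumofs", "viver");
        int packedSize = encodeStringList(items).length;
        int jsonSize = toJson(items).getBytes(StandardCharsets.UTF_8).length;
        System.out.println("MessagePack: " + packedSize + " bytes"); // MessagePack: 22 bytes
        System.out.println("JSON: " + jsonSize + " bytes");          // JSON: 28 bytes
    }
}
```

The saving comes from replacing quotes, commas, and brackets with one-byte length-carrying headers.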

Supported languages include:
Java, Python, Ruby, Haskell, C#, OCaml, Lua, Go, C, C++, and others.

Official website:
http://msgpack.org/
Git repository:
https://github.com/msgpack/msgpack-java

MessagePack Java API overview

Maven dependency:

<dependencies>
  ...
  <dependency>
    <groupId>org.msgpack</groupId>
    <artifactId>msgpack</artifactId>
    <version>${msgpack.version}</version>
  </dependency>
  ...
</dependencies>

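Gradle dependency (note that this snippet references the msgpack-core artifact, whose API differs from the org.msgpack:msgpack 0.6.x API used in the examples in these notes):

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.msgpack:msgpack-core:(version)'
}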

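Building and installing on Windows

msgpack-java uses sbt as its build tool. A short introduction to sbt:
http://xerial.org/blog/2014/03/24/sbt/

Using sbt on Windows:

  1. Download the sbt package
  2. Copy it to a directory
  3. Add the directory containing sbt.bat to the PATH variable
  4. Open a command prompt and run sbt; if it prints output, the setup works

For the build steps, see
https://github.com/msgpack/msgpack-java

Official API example: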

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.msgpack.MessagePack;
import org.msgpack.template.Templates;

public class TestMessagePack {

    public static void main(String[] args) {
        // Create serialize objects
        List<String> src = new ArrayList<String>();
        src.add("msgpack");
        src.add("kumofs");
        src.add("viver");
        MessagePack msgpack = new MessagePack();
        // Serialize
        byte[] raw;
        try {
            raw = msgpack.write(src);
            // Deserialize directly using a template
            List<String> dst1 = msgpack.read(raw, Templates.tList(Templates.TString));
            System.out.println(dst1.get(0));
            System.out.println(dst1.get(1));
            System.out.println(dst1.get(2));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

Developing the MessagePack encoder and decoder

Add msgpack-0.6.6.jar and javassist-3.9.0.jar to the project.

Creating the encoder

import org.msgpack.MessagePack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MsgpackEncoder extends MessageToByteEncoder<Object> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        MessagePack msgpack = new MessagePack();
        // Serialize the outbound object into MessagePack bytes
        byte[] raw = msgpack.write(msg);
        // Write the serialized bytes into the outbound buffer
        out.writeBytes(raw);
    }
}

Decoder

import java.util.List;

import org.msgpack.MessagePack;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Copy the readable bytes of the frame into a byte array.
        // getBytes does not move readerIndex; the buffer is released
        // by MessageToMessageDecoder after decode returns.
        final int length = in.readableBytes();
        final byte[] array = new byte[length];
        in.getBytes(in.readerIndex(), array, 0, length);
        // Deserialize the bytes and pass the result to the next handler
        MessagePack msgpack = new MessagePack();
        out.add(msgpack.read(array));
    }

}

EchoClient.java example

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

public class EchoClient {
    private final String host;
    private final int port;
    private final int sendNumber;

    public EchoClient(int port, String host, int sendNumber) {
        this.host = host;
        this.port = port;
        this.sendNumber = sendNumber;
    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // LengthFieldBasedFrameDecoder handles half-packet messages,
                        // so the MsgPackDecoder after it always receives complete messages.
                        // Arguments: max frame length 65535, length field at offset 0,
                        // length field of 2 bytes, no length adjustment, strip the first 2 bytes.
                        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                        ch.pipeline().addLast("msgpack decoder", new MsgPackDecoder());
                        // Prepend a 2-byte message length field to the outbound ByteBuf
                        ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                        ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
                        ch.pipeline().addLast(new EchoClientHandler(sendNumber));
                    }
                });
            // Connect, then block until the channel is closed
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoClient(port, "127.0.0.1", 1).run();
    }
}
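
The framing done by LengthFieldPrepender(2) and LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2) can be illustrated without Netty. The following is a simplified sketch using only the JDK, not Netty's implementation: frame prepends a 2-byte big-endian body length, and Deframer buffers incoming chunks until a whole frame is available, then returns the body with the length field stripped. The names LengthFrameSketch, frame, and Deframer are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; a simplified model of length-field framing.
class LengthFrameSketch {
    static final int LENGTH_FIELD_BYTES = 2;
    static final int MAX_BODY_LENGTH = 65535;

    // Prepends a 2-byte big-endian length to the body.
    static byte[] frame(byte[] body) {
        if (body.length > MAX_BODY_LENGTH) {
            throw new IllegalArgumentException("body does not fit a 2-byte length field");
        }
        return ByteBuffer.allocate(LENGTH_FIELD_BYTES + body.length)
                .putShort((short) body.length)
                .put(body)
                .array();
    }

    // Accumulates received chunks and emits only complete message bodies.
    static final class Deframer {
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

        List<byte[]> feed(byte[] chunk) {
            pending.write(chunk, 0, chunk.length);
            ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
            List<byte[]> bodies = new ArrayList<>();
            while (buf.remaining() >= LENGTH_FIELD_BYTES) {
                buf.mark();
                int bodyLength = buf.getShort() & 0xFFFF; // read as unsigned 16-bit
                if (buf.remaining() < bodyLength) {
                    buf.reset();                          // half-packet: wait for more bytes
                    break;
                }
                byte[] body = new byte[bodyLength];
                buf.get(body);                            // length field is already skipped
                bodies.add(body);
            }
            // Keep the unconsumed tail for the next call
            pending.reset();
            pending.write(buf.array(), buf.position(), buf.remaining());
            return bodies;
        }
    }

    public static void main(String[] args) {
        byte[] first = frame("hello".getBytes(StandardCharsets.UTF_8));
        byte[] second = frame("netty".getBytes(StandardCharsets.UTF_8));
        byte[] stream = ByteBuffer.allocate(first.length + second.length)
                .put(first).put(second).array();

        // Deliver the stream in chunks that do not align with frame boundaries
        Deframer deframer = new Deframer();
        int[] cuts = {0, 3, 9, stream.length};
        for (int i = 0; i + 1 < cuts.length; i++) {
            byte[] chunk = Arrays.copyOfRange(stream, cuts[i], cuts[i + 1]);
            for (byte[] body : deframer.feed(chunk)) {
                System.out.println("frame: " + new String(body, StandardCharsets.UTF_8));
            }
        }
    }
}
```

The first chunk contains only part of a frame, so nothing is emitted until later chunks complete it. This is the guarantee the msgpack decoder relies on.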

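EchoClientHandler itself does not deal with half-packets; it relies on the LengthFieldBasedFrameDecoder configured in the pipeline to deliver complete messages.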

EchoClientHandler.java

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

// ChannelHandlerAdapter exposes channelActive/channelRead in Netty 5.0 alpha;
// on Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class EchoClientHandler extends ChannelHandlerAdapter {
    private final int sendNumber;
    private int counter;

    public EchoClientHandler(int sendNumber) {
        this.sendNumber = sendNumber;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Send all messages once the connection is established
        UserInfo[] infos = buildUserInfos();
        for (UserInfo info : infos) {
            ctx.write(info);
        }
        ctx.flush();
    }

    // Builds sendNumber UserInfo objects to send to the server
    private UserInfo[] buildUserInfos() {
        UserInfo[] userInfos = new UserInfo[sendNumber];
        for (int i = 0; i < sendNumber; i++) {
            UserInfo userInfo = new UserInfo();
            userInfo.setAge(i);
            userInfo.setName("ABCDEFG --->" + i);
            userInfos[i] = userInfo;
        }
        return userInfos;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("This is " + ++counter + " times receive server : [" + msg + "]");
        // Echo the decoded message back to the server
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
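
The handler uses a UserInfo class that is not shown in these notes. A minimal sketch of what it might look like follows; the field names and toString are assumptions inferred from the setAge and setName calls above. With msgpack-java 0.6.x, a user-defined class also has to be made known to MessagePack, either by annotating it with @org.msgpack.annotation.Message or by calling msgpack.register(UserInfo.class). The annotation appears here only as a comment so that the sketch compiles without msgpack on the classpath.

```java
// Hypothetical sketch of the POJO sent by EchoClientHandler.
// In a real project, declare it public in its own file and add
// @org.msgpack.annotation.Message above the class declaration.
class UserInfo {
    private String name;
    private int age;

    // A no-argument constructor, as used by the handler
    UserInfo() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "UserInfo[name=" + name + ", age=" + age + "]";
    }
}
```

Note that MsgPackDecoder calls msgpack.read(array) without a target type, so the object received in channelRead is a generic MessagePack value rather than a UserInfo instance.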

