0
点赞
收藏
分享

微信扫一扫

7.Netty框架与Marshalling解编码框架结合的入门案例


博客概述

解编码技术说白了就是java的序列化技术,序列化的目的有两个,第一是进行网络传输,第二是对象持久化。虽然我们可以使用java进行对象序列化,netty去传输。但是java序列化的硬伤太多,比如java序列化没办法跨语言,序列化后码流太大,序列化性能太低等等。
目前主流的解编码框架有google的protobuf,jboos的Marshalling包。本文就是对后者进行了简单的整合实现。Marshalling对jdk默认的序列化框架进行了优化,但是又保持了兼容同时又增加了一些可调的参数和附加的特性。

服务器端代码

package marshalling;

import com.oracle.webservices.internal.api.message.MessageContextFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
* @Auther: ;李泽
* @Date: 2019/3/4 22:04
* @Description:
*/
public class NettyServer {
private int port;

public NettyServer(int port) {
this.port = port;
}

public void run() throws Exception {
/*
bossGroup负责接受网络连接以及建立连接
workerGroup负责具体的网络通讯
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
/*
ServerBootstrap 是一个启动服务器端NIO服务的辅助启动类。
*/
ServerBootstrap b = new ServerBootstrap();
//给启动器配置两个线程组,前一个是接受连接,后一个具体干活
b.group(bossGroup, workerGroup)
//这里我们指定使用NioServerSocketChannel类来说明服务器端接受的tcp连接是nio模式的。
.channel(NioServerSocketChannel.class)
//设置日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//设置Marshalling解编码器。
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
ch.pipeline().addLast(new ServerHandler());
}
})

.option(ChannelOption.SO_BACKLOG, 128) //tcp缓冲区
/*
保持连接不断开
*/
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_SNDBUF,32*1024) //设置发送缓冲的大小。
.option(ChannelOption.SO_RCVBUF,32*1024); //设置接受数据缓冲的大小。

// 绑定并且启动服务来接受到来的请求
ChannelFuture f = b.bind(port).sync(); //

// 起到的作用就是阻塞在这不让server关闭。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new NettyServer(port).run();
}
}

服务器端handler

package marshalling;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;


/**
* @Auther: ;李泽
* @Date: 2019/3/4 22:05
* @Description: 继承自ChannelHandlerAdapter,这个类实现了ChannelHandler接口,ChannelHandler提供了许多事件处理
* 的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承ChannelHandlerAdapter类而不是你自己去实
* 现接口方法。
*/
public class ServerHandler extends ChannelHandlerAdapter {
/**
* 功能描述: exceptionCaught()事件处理方法是当出现Throwable对象才会被调用,即当Netty由于IO错误或者处理器在处理事件
* 时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的channel给关闭掉。然而这个方法的处
* 理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
*
* @auther: 李泽
* @date: 2019/3/4 22:24
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
/**
* 功能描述: 这里我们覆盖了chanelRead()事件处理方法。每当从客户端收到新的数据时,这个方法会在收到消息时被调用,
* 这个例子中,收到的消息的类型是Request,因为我们使用了解编码框架
*
* @auther: 李泽
* @date: 2019/3/4 22:23
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收数据
Request request = (Request) msg;
System.out.println("request = " + request);
//回写数据
Response response = new Response();
response.setId(request.getId());
response.setName("resp "+request.getName());
response.setResponseMessage("resp "+request.getRequestMessage());
ctx.writeAndFlush(response);
}
}

客户端

package marshalling;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.UnsupportedEncodingException;

/**
* @Auther: ;李泽
* @Date: 2019/3/4 22:05
* @Description:
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//设置Marshalling解编码器。
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
//这里面的是接受服务器端反馈时,才会触发的handler
socketChannel.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture channelFuture1 = bootstrap.connect("127.0.0.1",8080).sync();
//客户端发送数据,因为用了解编码框架
for (int i = 0; i < 5; i++) {
Request request = new Request();
request.setId(i+"");
request.setName("item"+i);
request.setRequestMessage("i am msg"+i);
channelFuture1.channel().writeAndFlush(request);
}


//可以理解为阻塞在这
channelFuture1.channel().closeFuture().sync();
eventLoopGroup.shutdownGracefully();
}
}

客户端handler

package marshalling;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
* @Auther: ;李泽
* @Date: 2019/3/4 22:05
* @Description:
*/
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
cause.printStackTrace();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//super.channelRead(ctx, msg);
try {
Response response = (Response) msg;
System.out.println("resp = " + response);
//buffer.release();
} finally {
ReferenceCountUtil.release(msg);
}
}
}

Marshalling解编码器工厂

package marshalling;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
* Marshalling工厂
* @author(李泽)
* @since 2019-3-10
*/
public final class MarshallingCodeCFactory {

/**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
//根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
return decoder;
}

/**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}

通讯实体

package marshalling;

import java.io.Serializable;

public class Request implements Serializable{

private static final long SerialVersionUID = 1L;

private String id ;
private String name ;
private String requestMessage ;

public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRequestMessage() {
return requestMessage;
}
public void setRequestMessage(String requestMessage) {
this.requestMessage = requestMessage;
}

@Override
public String toString() {
return "Request{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", requestMessage='" + requestMessage + '\'' +
'}';
}
}
package marshalling;

import java.io.Serializable;

public class Response implements Serializable{

private static final long serialVersionUID = 1L;

private String id;
private String name;
private String responseMessage;

public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}

@Override
public String toString() {
return "Response{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", responseMessage='" + responseMessage + '\'' +
'}';
}
}


举报

相关推荐

0 条评论