Netty 使用 TCP/IP 协议传输数据。TCP 是面向字节流的协议,数据像水流一样连续传输,没有消息边界。因此多次发送的数据在接收端可能被合并或拆分,即出现粘包(拆包)问题。解决这种问题的方式如下:
定长数据流
客户端和服务器提前约定好,每个消息的长度固定(如:长度为 10 个字节)。如果客户端或服务器写出的数据不足 10 个字节,则使用空白字符补足(如:使用空格)。
/**
* 1. 单线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. connect连接服务,并发起请求
*/
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Console client for the fixed-length framing demo.
 * <p>
 * Every line typed on standard input is sent as one frame of exactly
 * {@link #REQUEST_FRAME_LENGTH} bytes (the length the demo server's
 * FixedLengthFrameDecoder is configured with). Replies are cut into frames of
 * {@link #RESPONSE_FRAME_LENGTH} bytes, decoded as UTF-8 and handed to
 * {@link Client4FixedLengthHandler}.
 */
public class Client4FixedLength {
    /** Size in bytes of every request frame written by this client. */
    private static final int REQUEST_FRAME_LENGTH = 5;
    /** Size in bytes of every response frame this client decodes. */
    private static final int RESPONSE_FRAME_LENGTH = 3;
    /** Filler used for frames shorter than the fixed length (a space). */
    private static final byte PADDING = (byte) ' ';

    // Event loop group that performs the connect and all channel I/O.
    private EventLoopGroup group = null;
    // Client bootstrap holding the connection configuration.
    private Bootstrap bootstrap = null;

    /** Creates the client and prepares its event loop group and bootstrap. */
    public Client4FixedLength() {
        init();
    }

    /** Builds the event loop group and configures the bootstrap for NIO sockets. */
    private void init() {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // Attach the event loop group.
        bootstrap.group(group);
        // Use the NIO socket transport.
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Installs the channel pipeline and connects to the server, blocking until
     * the connect attempt has finished.
     *
     * @param host server host name or address
     * @param port server port
     * @return the completed connect future; its channel is used for writing
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    public ChannelFuture doRequest(String host, int port) throws InterruptedException {
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelHandler[] handlers = new ChannelHandler[3];
                // Cuts the inbound byte stream into fixed-size frames.
                handlers[0] = new FixedLengthFrameDecoder(RESPONSE_FRAME_LENGTH);
                // Turns each ByteBuf frame into a String before it reaches channelRead.
                handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
                handlers[2] = new Client4FixedLengthHandler();
                ch.pipeline().addLast(handlers);
            }
        });
        return this.bootstrap.connect(host, port).sync();
    }

    /** Shuts down the event loop group. */
    public void release() {
        this.group.shutdownGracefully();
    }

    /**
     * Encodes a line as one request frame, padded with spaces.
     *
     * @param line text to send
     * @return a frame of exactly {@link #REQUEST_FRAME_LENGTH} bytes, or
     *         {@code null} when the UTF-8 form of the line does not fit
     */
    private static byte[] toFrame(String line) {
        byte[] payload = line.getBytes(Charset.forName("UTF-8"));
        if (payload.length > REQUEST_FRAME_LENGTH) {
            return null;
        }
        byte[] frame = new byte[REQUEST_FRAME_LENGTH];
        java.util.Arrays.fill(frame, PADDING);
        System.arraycopy(payload, 0, frame, 0, payload.length);
        return frame;
    }

    /**
     * Connects to localhost:9999 and sends each console line as one frame until
     * standard input ends, the connection closes, or an error occurs.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Client4FixedLength client = null;
        ChannelFuture future = null;
        try {
            client = new Client4FixedLength();
            future = client.doRequest("localhost", 9999);
            // One Scanner for the whole session: a new Scanner per line can
            // discard input the previous one had already buffered.
            Scanner console = new Scanner(System.in);
            while (true) {
                System.out.print("enter message send to server > ");
                if (!console.hasNextLine()) {
                    break; // end of input
                }
                byte[] frame = toFrame(console.nextLine());
                if (frame == null) {
                    // Previously an oversized line was silently sent as an empty frame.
                    System.out.println("message is longer than " + REQUEST_FRAME_LENGTH
                            + " bytes, not sent");
                    continue;
                }
                if (!future.channel().isActive()) {
                    System.out.println("connection is closed, exiting");
                    break;
                }
                future.channel().writeAndFlush(Unpooled.copiedBuffer(frame));
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != future) {
                // Close the channel ourselves; merely waiting on closeFuture()
                // would block forever while the connection is still open.
                future.channel().close().syncUninterruptibly();
            }
            if (null != client) {
                client.release();
            }
        }
    }
}
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/**
 * Inbound handler of the fixed-length demo client: prints every message read
 * from the server and closes the channel when an exception is reported.
 */
public class Client4FixedLengthHandler extends ChannelHandlerAdapter {

    /**
     * Prints the received message.
     *
     * @param ctx context of this handler
     * @param msg decoded message; printed through its {@code toString()}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            System.out.println("from server : " + msg.toString());
        } finally {
            // Hand the message to ReferenceCountUtil so that any buffer it
            // holds is released and memory is not leaked.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Logs that an error occurred and closes the channel; the cause itself is
     * not printed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        ctx.close();
    }
}
/**
* 1. 双线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. 绑定服务监听端口并启动服务
*/
import java.nio.charset.Charset;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Server for the fixed-length framing demo: inbound bytes are cut into 5-byte
 * frames, decoded as UTF-8 strings and passed to {@link Server4FixedLengthHandler}.
 */
public class Server4FixedLength {
    // Acceptor group: accepts incoming client connections.
    private EventLoopGroup acceptorGroup = null;
    // Client group: performs the I/O of the accepted connections.
    private EventLoopGroup clientGroup = null;
    // Server bootstrap holding the startup configuration.
    private ServerBootstrap bootstrap = null;

    /** Creates the server and prepares its event loop groups and bootstrap. */
    public Server4FixedLength() {
        init();
    }

    /** Builds both event loop groups and configures the bootstrap and socket options. */
    private void init() {
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // Attach the event loop groups.
        bootstrap.group(acceptorGroup, clientGroup);
        // Use the NIO server socket transport.
        bootstrap.channel(NioServerSocketChannel.class);
        // Options of the listening socket: pending-connection queue length and
        // receive buffer size.
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_RCVBUF, 16 * 1024);
        // Send buffer size and keep-alive belong to the accepted connections,
        // so they must be child options; option() only configures the
        // listening channel, which does not support them.
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 16 * 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Installs the per-connection pipeline and binds the listening port,
     * blocking until the bind has finished.
     *
     * @param port port to listen on
     * @return the completed bind future; its channel is the listening channel
     * @throws InterruptedException if the thread is interrupted while binding
     */
    public ChannelFuture doAccept(int port) throws InterruptedException {
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
                // Fixed-length frame decoder; the argument is the frame length
                // in bytes. Senders pad shorter messages (e.g. with spaces).
                acceptorHandlers[0] = new FixedLengthFrameDecoder(5);
                // Turns each ByteBuf frame into a String before it reaches channelRead.
                acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
                acceptorHandlers[2] = new Server4FixedLengthHandler();
                ch.pipeline().addLast(acceptorHandlers);
            }
        });
        return bootstrap.bind(port).sync();
    }

    /** Shuts down both event loop groups. */
    public void release() {
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /**
     * Starts the server on port 9999 and blocks until the listening channel is
     * closed or the thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ChannelFuture future = null;
        Server4FixedLength server = null;
        try {
            server = new Server4FixedLength();
            future = server.doAccept(9999);
            System.out.println("server started.");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the flag instead of waiting on closeFuture() a second time.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            if (null != future) {
                // No-op when the channel is already closed.
                future.channel().close().syncUninterruptibly();
            }
            if (null != server) {
                server.release();
            }
        }
    }
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Business handler of the fixed-length demo server: prints each received
 * frame without its padding and answers with a fixed acknowledgement.
 */
public class Server4FixedLengthHandler extends ChannelHandlerAdapter {

    /**
     * Prints the trimmed request and writes the acknowledgement back.
     *
     * @param ctx context used to write the reply
     * @param msg decoded request; read through its {@code toString()}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String received = msg.toString();
        System.out.println("from client : " + received.trim());
        byte[] reply = "ok ".getBytes("UTF-8");
        ctx.writeAndFlush(Unpooled.copiedBuffer(reply));
    }

    /**
     * Logs that an error occurred and closes the channel; the cause itself is
     * not printed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        ctx.close();
    }
}
特殊结束符
客户端和服务器协商定义一个特殊的分隔符号,分隔符号的长度可以自定义,如:"#"、"$_$"、"AA@"。在通讯的时候,只要还没有收到分隔符号,就代表一条数据尚未结束。
import java.nio.charset.Charset;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Server for the delimiter framing demo: inbound bytes are split at the
 * delimiter {@code $E$}, decoded as UTF-8 strings and passed to
 * {@link Server4DelimiterHandler}.
 */
public class Server4Delimiter {
    // Acceptor group: accepts incoming client connections.
    private EventLoopGroup acceptorGroup = null;
    // Client group: performs the I/O of the accepted connections.
    private EventLoopGroup clientGroup = null;
    // Server bootstrap holding the startup configuration.
    private ServerBootstrap bootstrap = null;

    /** Creates the server and prepares its event loop groups and bootstrap. */
    public Server4Delimiter() {
        init();
    }

    /** Builds both event loop groups and configures the bootstrap and socket options. */
    public void init() {
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // Attach the event loop groups.
        bootstrap.group(acceptorGroup, clientGroup);
        // Use the NIO server socket transport.
        bootstrap.channel(NioServerSocketChannel.class);
        // Options of the listening socket: pending-connection queue length and
        // receive buffer size.
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_RCVBUF, 16 * 1024);
        // Send buffer size and keep-alive belong to the accepted connections,
        // so they must be child options; option() only configures the
        // listening channel, which does not support them.
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 16 * 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Installs the per-connection pipeline and binds the listening port,
     * blocking until the bind has finished.
     *
     * @param port port to listen on
     * @return the completed bind future; its channel is the listening channel
     * @throws InterruptedException if the thread is interrupted while binding
     */
    public ChannelFuture doAccept(int port) throws InterruptedException {
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                // The delimiter must be supplied as a ByteBuf. It is encoded
                // with the same charset the StringDecoder below uses.
                ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes(Charset.forName("UTF-8")));
                ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
                // Splits the stream at the delimiter. This handler is not
                // @Sharable, so a new instance is created for every channel.
                // The first argument is the maximum frame length; Netty
                // recommends that frames have an upper bound.
                acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
                // Turns each ByteBuf frame into a String before it reaches channelRead.
                acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
                acceptorHandlers[2] = new Server4DelimiterHandler();
                ch.pipeline().addLast(acceptorHandlers);
            }
        });
        return bootstrap.bind(port).sync();
    }

    /** Shuts down both event loop groups. */
    public void release() {
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /**
     * Starts the server on port 9999 and blocks until the listening channel is
     * closed or the thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ChannelFuture future = null;
        Server4Delimiter server = null;
        try {
            server = new Server4Delimiter();
            future = server.doAccept(9999);
            System.out.println("server started.");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the flag instead of waiting on closeFuture() a second time.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            if (null != future) {
                // No-op when the channel is already closed.
                future.channel().close().syncUninterruptibly();
            }
            if (null != server) {
                server.release();
            }
        }
    }
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Business handler of the delimiter demo server: prints each received frame
 * and answers with a fixed text that contains the {@code $E$} delimiter three
 * times.
 */
public class Server4DelimiterHandler extends ChannelHandlerAdapter {

    /**
     * Prints the request and writes the fixed reply back.
     *
     * @param ctx context used to write the reply
     * @param msg decoded request; read through its {@code toString()}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("from client : " + msg.toString());
        byte[] reply = "server message $E$ test delimiter handler!! $E$ second message $E$"
                .getBytes("UTF-8");
        ctx.writeAndFlush(Unpooled.copiedBuffer(reply));
    }

    /**
     * Logs that an error occurred and closes the channel; the cause itself is
     * not printed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        ctx.close();
    }
}
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Console client for the delimiter framing demo.
 * <p>
 * Each console line is sent exactly as typed; no delimiter is appended, so a
 * message only ends once the typed data contains {@code $E$}. Replies are
 * split at {@code $E$}, decoded as UTF-8 and handed to
 * {@link Client4DelimiterHandler}.
 */
public class Client4Delimiter {
    // Event loop group that performs the connect and all channel I/O.
    private EventLoopGroup group = null;
    // Client bootstrap holding the connection configuration.
    private Bootstrap bootstrap = null;

    /** Creates the client and prepares its event loop group and bootstrap. */
    public Client4Delimiter() {
        init();
    }

    /** Builds the event loop group and configures the bootstrap for NIO sockets. */
    private void init() {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // Attach the event loop group.
        bootstrap.group(group);
        // Use the NIO socket transport.
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Installs the channel pipeline and connects to the server, blocking until
     * the connect attempt has finished.
     *
     * @param host server host name or address
     * @param port server port
     * @return the completed connect future; its channel is used for writing
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    public ChannelFuture doRequest(String host, int port) throws InterruptedException {
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Frame delimiter, encoded with the charset the StringDecoder uses.
                ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes(Charset.forName("UTF-8")));
                ChannelHandler[] handlers = new ChannelHandler[3];
                // Splits the stream at the delimiter; frames are limited to 1024 bytes.
                handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
                // Turns each ByteBuf frame into a String before it reaches channelRead.
                handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
                handlers[2] = new Client4DelimiterHandler();
                ch.pipeline().addLast(handlers);
            }
        });
        return this.bootstrap.connect(host, port).sync();
    }

    /** Shuts down the event loop group. */
    public void release() {
        this.group.shutdownGracefully();
    }

    /**
     * Connects to localhost:9999 and sends each console line until standard
     * input ends, the connection closes, or an error occurs.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Client4Delimiter client = null;
        ChannelFuture future = null;
        try {
            client = new Client4Delimiter();
            future = client.doRequest("localhost", 9999);
            // One Scanner for the whole session: a new Scanner per line can
            // discard input the previous one had already buffered.
            Scanner console = new Scanner(System.in);
            while (true) {
                System.out.print("enter message send to server > ");
                if (!console.hasNextLine()) {
                    break; // end of input
                }
                String line = console.nextLine();
                if (!future.channel().isActive()) {
                    System.out.println("connection is closed, exiting");
                    break;
                }
                future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != future) {
                // Close the channel ourselves; merely waiting on closeFuture()
                // would block forever while the connection is still open.
                future.channel().close().syncUninterruptibly();
            }
            if (null != client) {
                client.release();
            }
        }
    }
}
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/**
 * Inbound handler of the delimiter demo client: prints every frame read from
 * the server and closes the channel when an exception is reported.
 */
public class Client4DelimiterHandler extends ChannelHandlerAdapter {

    /**
     * Prints the received message.
     *
     * @param ctx context of this handler
     * @param msg decoded message; printed through its {@code toString()}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            System.out.println("from server : " + msg.toString());
        } finally {
            // Hand the message to ReferenceCountUtil so that any buffer it
            // holds is released and memory is not leaked.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Logs that an error occurred and closes the channel; the cause itself is
     * not printed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        ctx.close();
    }
}
协议
相对最成熟的数据传递方式。由服务器的开发者提供一个固定格式的协议标准,客户端和服务器在发送数据和接收数据的时候,都依据该协议来封装和解析消息。
自定义协议格式:
协议格式:
HEADcontent-length:xxxxHEADBODYxxxxxxBODY
/**
* 1. 双线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. 绑定服务监听端口并启动服务
*/
import java.nio.charset.Charset;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Server for the custom-protocol demo: inbound bytes are decoded as UTF-8
 * strings and passed to the handlers supplied by the caller.
 * <p>
 * NOTE(review): the pipeline contains no frame decoder, so each protocol
 * message is presumably expected to arrive in a single read - confirm.
 */
public class Server4Protocol {
    // Acceptor group: accepts incoming client connections.
    private EventLoopGroup acceptorGroup = null;
    // Client group: performs the I/O of the accepted connections.
    private EventLoopGroup clientGroup = null;
    // Server bootstrap holding the startup configuration.
    private ServerBootstrap bootstrap = null;

    /** Creates the server and prepares its event loop groups and bootstrap. */
    public Server4Protocol() {
        init();
    }

    /** Builds both event loop groups and configures the bootstrap and socket options. */
    private void init() {
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // Attach the event loop groups.
        bootstrap.group(acceptorGroup, clientGroup);
        // Use the NIO server socket transport.
        bootstrap.channel(NioServerSocketChannel.class);
        // Options of the listening socket: pending-connection queue length and
        // receive buffer size.
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_RCVBUF, 16 * 1024);
        // Send buffer size and keep-alive belong to the accepted connections,
        // so they must be child options; option() only configures the
        // listening channel, which does not support them.
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 16 * 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Installs the per-connection pipeline and binds the listening port,
     * blocking until the bind has finished.
     *
     * @param port             port to listen on
     * @param acceptorHandlers handlers appended after the string decoder; the
     *                         same instances are added to every accepted channel
     * @return the completed bind future; its channel is the listening channel
     * @throws InterruptedException if the thread is interrupted while binding
     */
    public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException {
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Turns inbound ByteBufs into Strings before they reach channelRead.
                ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(acceptorHandlers);
            }
        });
        return bootstrap.bind(port).sync();
    }

    /** Shuts down both event loop groups. */
    public void release() {
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /**
     * Starts the server on port 9999 and blocks until the listening channel is
     * closed or the thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ChannelFuture future = null;
        Server4Protocol server = null;
        try {
            server = new Server4Protocol();
            future = server.doAccept(9999, new Server4ProtocolHandler());
            System.out.println("server started.");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the flag instead of waiting on closeFuture() a second time.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            if (null != future) {
                // No-op when the channel is already closed.
                future.channel().close().syncUninterruptibly();
            }
            if (null != server) {
                server.release();
            }
        }
    }
}
/**
* @Sharable注解 -
* 代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
* 如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
*
*/
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Business handler of the custom-protocol demo server.
 * <p>
 * Marked {@code @Sharable} so that one instance can be added to the pipelines
 * of several channels. Each request is unwrapped with {@link ProtocolParser};
 * a valid request is answered with a fixed, protocol-wrapped reply.
 */
@Sharable
public class Server4ProtocolHandler extends ChannelHandlerAdapter {

    /**
     * Parses the request and, when it is valid, writes the reply back. An
     * invalid request is only reported on the console; nothing is sent.
     *
     * @param ctx context used to write the reply
     * @param msg decoded request; read through its {@code toString()}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = msg.toString();
        System.out.println("server receive protocol content : " + message);
        message = ProtocolParser.parse(message);
        if (null == message) {
            System.out.println("error request from client");
            return;
        }
        System.out.println("from client : " + message);
        String line = "server message";
        line = ProtocolParser.transferTo(line);
        System.out.println("server send protocol content : " + line);
        ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
    }

    /** Prints the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Encoder/decoder for the demo protocol
     * {@code HEADcontent-length:<n>HEADBODY<body>BODY}, where {@code n} is the
     * number of characters in the body.
     */
    static class ProtocolParser {
        private static final String HEADER_PREFIX = "HEADcontent-length:";
        private static final String SEPARATOR = "HEADBODY";
        private static final String TRAILER = "BODY";

        /**
         * Extracts the body from a protocol message.
         *
         * @param message complete protocol message
         * @return the body, or {@code null} when the message is {@code null},
         *         malformed, or its declared length differs from the body length
         */
        public static String parse(String message) {
            if (message == null
                    || !message.startsWith(HEADER_PREFIX)
                    || !message.endsWith(TRAILER)) {
                return null;
            }
            // Only the first separator ends the header, so a body that itself
            // contains "HEADBODY" is still read in full.
            int separatorAt = message.indexOf(SEPARATOR, HEADER_PREFIX.length());
            if (separatorAt < 0) {
                return null;
            }
            int bodyStart = separatorAt + SEPARATOR.length();
            int bodyEnd = message.length() - TRAILER.length();
            if (bodyEnd < bodyStart) {
                // The trailing "BODY" overlaps the separator: there is no real trailer.
                return null;
            }
            int declaredLength;
            try {
                declaredLength = Integer.parseInt(message.substring(HEADER_PREFIX.length(), separatorAt));
            } catch (NumberFormatException e) {
                return null;
            }
            String body = message.substring(bodyStart, bodyEnd);
            return declaredLength == body.length() ? body : null;
        }

        /**
         * Wraps a body into a protocol message.
         *
         * @param message body text
         * @return the body wrapped in the header (carrying its length) and trailer
         */
        public static String transferTo(String message) {
            return HEADER_PREFIX + message.length() + SEPARATOR + message + TRAILER;
        }
    }
}
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Console client for the custom-protocol demo: every console line is wrapped
 * with {@code Client4ProtocolHandler.ProtocolParser.transferTo} and sent;
 * replies are decoded as UTF-8 strings and passed to the supplied handlers.
 */
public class Client4Protocol {
    // Event loop group that performs the connect and all channel I/O.
    private EventLoopGroup group = null;
    // Client bootstrap holding the connection configuration.
    private Bootstrap bootstrap = null;

    /** Creates the client and prepares its event loop group and bootstrap. */
    public Client4Protocol() {
        init();
    }

    /** Builds the event loop group and configures the bootstrap for NIO sockets. */
    private void init() {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // Attach the event loop group.
        bootstrap.group(group);
        // Use the NIO socket transport.
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Installs the channel pipeline and connects to the server, blocking until
     * the connect attempt has finished.
     *
     * @param host     server host name or address
     * @param port     server port
     * @param handlers handlers appended after the string decoder
     * @return the completed connect future; its channel is used for writing
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException {
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Turns inbound ByteBufs into Strings before they reach channelRead.
                ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(handlers);
            }
        });
        return this.bootstrap.connect(host, port).sync();
    }

    /** Shuts down the event loop group. */
    public void release() {
        this.group.shutdownGracefully();
    }

    /**
     * Connects to localhost:9999 and sends each console line as one protocol
     * message until standard input ends, the connection closes, or an error
     * occurs.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Client4Protocol client = null;
        ChannelFuture future = null;
        try {
            client = new Client4Protocol();
            future = client.doRequest("localhost", 9999, new Client4ProtocolHandler());
            // One Scanner for the whole session: a new Scanner per line can
            // discard input the previous one had already buffered.
            Scanner console = new Scanner(System.in);
            while (true) {
                System.out.print("enter message send to server > ");
                if (!console.hasNextLine()) {
                    break; // end of input
                }
                String line = Client4ProtocolHandler.ProtocolParser.transferTo(console.nextLine());
                System.out.println("client send protocol content : " + line);
                if (!future.channel().isActive()) {
                    System.out.println("connection is closed, exiting");
                    break;
                }
                future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != future) {
                // Close the channel ourselves; merely waiting on closeFuture()
                // would block forever while the connection is still open.
                future.channel().close().syncUninterruptibly();
            }
            if (null != client) {
                client.release();
            }
        }
    }
}
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/**
 * Inbound handler of the custom-protocol demo client: unwraps each reply with
 * {@link ProtocolParser} and prints its body.
 */
public class Client4ProtocolHandler extends ChannelHandlerAdapter {

    /**
     * Prints the raw reply and, when it is a valid protocol message, its body.
     *
     * @param ctx context of this handler
     * @param msg decoded reply; read through its {@code toString()}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String message = msg.toString();
            System.out.println("client receive protocol content : " + message);
            message = ProtocolParser.parse(message);
            if (null == message) {
                System.out.println("error response from server");
                return;
            }
            System.out.println("from server : " + message);
        } finally {
            // Hand the message to ReferenceCountUtil so that any buffer it
            // holds is released and memory is not leaked.
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Logs that an error occurred and closes the channel; the cause itself is
     * not printed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        ctx.close();
    }

    /**
     * Encoder/decoder for the demo protocol
     * {@code HEADcontent-length:<n>HEADBODY<body>BODY}, where {@code n} is the
     * number of characters in the body.
     */
    static class ProtocolParser {
        private static final String HEADER_PREFIX = "HEADcontent-length:";
        private static final String SEPARATOR = "HEADBODY";
        private static final String TRAILER = "BODY";

        /**
         * Extracts the body from a protocol message.
         *
         * @param message complete protocol message
         * @return the body, or {@code null} when the message is {@code null},
         *         malformed, or its declared length differs from the body length
         */
        public static String parse(String message) {
            if (message == null
                    || !message.startsWith(HEADER_PREFIX)
                    || !message.endsWith(TRAILER)) {
                return null;
            }
            // Only the first separator ends the header, so a body that itself
            // contains "HEADBODY" is still read in full.
            int separatorAt = message.indexOf(SEPARATOR, HEADER_PREFIX.length());
            if (separatorAt < 0) {
                return null;
            }
            int bodyStart = separatorAt + SEPARATOR.length();
            int bodyEnd = message.length() - TRAILER.length();
            if (bodyEnd < bodyStart) {
                // The trailing "BODY" overlaps the separator: there is no real trailer.
                return null;
            }
            int declaredLength;
            try {
                declaredLength = Integer.parseInt(message.substring(HEADER_PREFIX.length(), separatorAt));
            } catch (NumberFormatException e) {
                return null;
            }
            String body = message.substring(bodyStart, bodyEnd);
            return declaredLength == body.length() ? body : null;
        }

        /**
         * Wraps a body into a protocol message.
         *
         * @param message body text
         * @return the body wrapped in the header (carrying its length) and trailer
         */
        public static String transferTo(String message) {
            return HEADER_PREFIX + message.length() + SEPARATOR + message + TRAILER;
        }
    }
}
序列化对象
JBoss Marshalling 序列化
Java 是面向对象的开发语言。传递的数据如果是 Java 对象,应该是最方便且可靠。
/**
* 1. 双线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. 绑定服务监听端口并启动服务
*/
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import utils.SerializableFactory4Marshalling;
/**
 * Server for the object-serialization demo: a JBoss Marshalling decoder and
 * encoder are placed in front of the handlers supplied by the caller, so the
 * handlers exchange Java objects instead of raw bytes.
 */
public class Server4Serializable {
    // Acceptor group: accepts incoming client connections.
    private EventLoopGroup acceptorGroup = null;
    // Client group: performs the I/O of the accepted connections.
    private EventLoopGroup clientGroup = null;
    // Server bootstrap holding the startup configuration.
    private ServerBootstrap bootstrap = null;

    /** Creates the server and prepares its event loop groups and bootstrap. */
    public Server4Serializable() {
        init();
    }

    /** Builds both event loop groups and configures the bootstrap and socket options. */
    private void init() {
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // Attach the event loop groups.
        bootstrap.group(acceptorGroup, clientGroup);
        // Use the NIO server socket transport.
        bootstrap.channel(NioServerSocketChannel.class);
        // Options of the listening socket: pending-connection queue length and
        // receive buffer size.
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_RCVBUF, 16 * 1024);
        // Send buffer size and keep-alive belong to the accepted connections,
        // so they must be child options; option() only configures the
        // listening channel, which does not support them.
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 16 * 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Installs the per-connection pipeline and binds the listening port,
     * blocking until the bind has finished.
     *
     * @param port             port to listen on
     * @param acceptorHandlers handlers appended after the marshalling codec; the
     *                         same instances are added to every accepted channel
     * @return the completed bind future; its channel is the listening channel
     * @throws InterruptedException if the thread is interrupted while binding
     */
    public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException {
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // A fresh decoder/encoder pair is built for every channel.
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
                ch.pipeline().addLast(acceptorHandlers);
            }
        });
        return bootstrap.bind(port).sync();
    }

    /** Shuts down both event loop groups. */
    public void release() {
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /**
     * Starts the server on port 9999 and blocks until the listening channel is
     * closed or the thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ChannelFuture future = null;
        Server4Serializable server = null;
        try {
            server = new Server4Serializable();
            future = server.doAccept(9999, new Server4SerializableHandler());
            System.out.println("server started.");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the flag instead of waiting on closeFuture() a second time.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            if (null != future) {
                // No-op when the channel is already closed.
                future.channel().close().syncUninterruptibly();
            }
            if (null != server) {
                server.release();
            }
        }
    }
}
/**
* @Sharable注解 -
* 代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
* 如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
*
*/
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import utils.GzipUtils;
import utils.RequestMessage;
import utils.ResponseMessage;
/**
 * Business handler of the object-serialization demo server.
 * <p>
 * Marked {@code @Sharable} so that one instance can be added to the pipelines
 * of several channels. Every received object is printed and answered with a
 * fixed {@link ResponseMessage}.
 */
@Sharable
public class Server4SerializableHandler extends ChannelHandlerAdapter {

    /**
     * Prints the class name and text of the received object, then replies.
     *
     * @param ctx context used to write the reply
     * @param msg object produced by the decoder in front of this handler
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String className = msg.getClass().getName();
        System.out.println("from client : ClassName - " + className
                + " ; message : " + msg.toString());
        // The attachment of a RequestMessage is not processed here.
        ctx.writeAndFlush(new ResponseMessage(0L, "test response"));
    }

    /** Prints the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        cause.printStackTrace();
        ctx.close();
    }
}
/**
* 1. 单线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. connect连接服务,并发起请求
*/
import java.util.Random;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import utils.GzipUtils;
import utils.RequestMessage;
import utils.SerializableFactory4Marshalling;
/**
 * Client for the object-serialization demo: connects, sends a single
 * {@link RequestMessage} through the JBoss Marshalling codec, waits one second
 * and closes the connection.
 */
public class Client4Serializable {
    // Event loop group that performs the connect and all channel I/O.
    private EventLoopGroup group = null;
    // Client bootstrap holding the connection configuration.
    private Bootstrap bootstrap = null;

    /** Creates the client and prepares its event loop group and bootstrap. */
    public Client4Serializable() {
        init();
    }

    /** Builds the event loop group and configures the bootstrap for NIO sockets. */
    private void init() {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Installs the channel pipeline and connects to the server, blocking until
     * the connect attempt has finished.
     *
     * @param host     server host name or address
     * @param port     server port
     * @param handlers handlers appended after the marshalling codec
     * @return the completed connect future; its channel is used for writing
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers)
            throws InterruptedException {
        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Decoder first, then encoder, then the caller's handlers.
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
                ch.pipeline().addLast(handlers);
            }
        };
        this.bootstrap.handler(initializer);
        return this.bootstrap.connect(host, port).sync();
    }

    /** Shuts down the event loop group. */
    public void release() {
        this.group.shutdownGracefully();
    }

    /**
     * Sends one request with a random id, the text "test" and an empty
     * attachment to localhost:9999, then closes the connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Client4Serializable client = null;
        ChannelFuture future = null;
        try {
            client = new Client4Serializable();
            future = client.doRequest("localhost", 9999, new Client4SerializableHandler());
            long requestId = new Random().nextLong();
            RequestMessage request = new RequestMessage(requestId, "test", new byte[0]);
            future.channel().writeAndFlush(request);
            // Leave the reply time to arrive before the connection is closed.
            TimeUnit.SECONDS.sleep(1);
            future.addListener(ChannelFutureListener.CLOSE);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (future != null) {
                try {
                    // Wait until the channel has actually been closed.
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (client != null) {
                client.release();
            }
        }
    }
}
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Inbound handler of the object-serialization demo client: prints every
 * object received from the server.
 */
public class Client4SerializableHandler extends ChannelHandlerAdapter {

    /**
     * Prints the class name and text of the received object.
     *
     * @param ctx context of this handler
     * @param msg object produced by the decoder in front of this handler
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String className = msg.getClass().getName();
        System.out.println("from server : ClassName - " + className
                + " ; message : " + msg.toString());
    }

    /** Prints the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        cause.printStackTrace();
        ctx.close();
    }
}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
/**
 * GZIP helpers that compress and decompress byte arrays in memory.
 */
public class GzipUtils {

    /**
     * Manual round-trip check: reads a file, compresses it, decompresses it
     * again and writes the result to a second file, printing the three sizes.
     *
     * @param args unused
     * @throws Exception if a file cannot be read or written, or (de)compression fails
     */
    public static void main(String[] args) throws Exception {
        byte[] temp = readFile("D:\\3\\1.jpg");
        System.out.println("长度 : " + temp.length);
        byte[] zipArray = GzipUtils.zip(temp);
        System.out.println("压缩后的长度 : " + zipArray.length);
        byte[] unzipArray = GzipUtils.unzip(zipArray);
        System.out.println("解压缩后的长度 : " + unzipArray.length);
        FileOutputStream fos = new FileOutputStream("D:\\3\\101.jpg");
        try {
            fos.write(unzipArray);
            fos.flush();
        } finally {
            fos.close();
        }
    }

    /**
     * Reads a whole file into memory. The file is read in a loop because a
     * single read() is not guaranteed to fill the buffer and available() is
     * only an estimate of the file size.
     */
    private static byte[] readFile(String path) throws Exception {
        FileInputStream fis = new FileInputStream(path);
        try {
            ByteArrayOutputStream content = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int read;
            while ((read = fis.read(buffer)) != -1) {
                content.write(buffer, 0, read);
            }
            return content.toByteArray();
        } finally {
            fis.close();
        }
    }

    /**
     * Decompresses GZIP data.
     *
     * @param source GZIP-compressed bytes
     * @return the decompressed bytes
     * @throws Exception if {@code source} is not valid GZIP data or reading fails
     */
    public static byte[] unzip(byte[] source) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // JDK stream that decompresses GZIP data read from the wrapped stream.
        GZIPInputStream zipIn = new GZIPInputStream(new ByteArrayInputStream(source));
        try {
            byte[] temp = new byte[256];
            int length;
            while ((length = zipIn.read(temp, 0, temp.length)) != -1) {
                out.write(temp, 0, length);
            }
        } finally {
            // Always close, so the stream's resources are freed even when the
            // data turns out to be corrupt.
            zipIn.close();
        }
        return out.toByteArray();
    }

    /**
     * Compresses data with GZIP.
     *
     * @param source bytes to compress
     * @return the GZIP-compressed bytes
     * @throws Exception if writing to the compressor fails
     */
    public static byte[] zip(byte[] source) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // JDK stream that compresses everything written to it into 'out'.
        GZIPOutputStream zipOut = new GZIPOutputStream(out);
        try {
            zipOut.write(source);
        } finally {
            // close() finishes the GZIP stream and frees the compressor, also
            // when write() throws.
            zipOut.close();
        }
        return out.toByteArray();
    }
}
import java.io.Serializable;
/**
 * Serializable request object of the serialization demo: an id, a text
 * message and a binary attachment.
 */
public class RequestMessage implements Serializable {
    private static final long serialVersionUID = 7084843947860990140L;

    private Long id;
    private String message;
    private byte[] attachment;

    /** Creates a request whose fields are all {@code null}. */
    public RequestMessage() {
    }

    /**
     * Creates a request from the given values.
     *
     * @param id         request id
     * @param message    text message
     * @param attachment binary attachment; the array is stored as given, not copied
     */
    public RequestMessage(Long id, String message, byte[] attachment) {
        this.id = id;
        this.message = message;
        this.attachment = attachment;
    }

    /** @return the request id */
    public Long getId() {
        return this.id;
    }

    /** @param id the request id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the text message */
    public String getMessage() {
        return this.message;
    }

    /** @param message the text message */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @return the attachment array held by this object (not a copy) */
    public byte[] getAttachment() {
        return this.attachment;
    }

    /** @param attachment the binary attachment; stored as given, not copied */
    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }

    /** Returns the id and message; the attachment is not included. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("RequestMessage [id=");
        text.append(id).append(", message=").append(message).append("]");
        return text.toString();
    }
}
import java.io.Serializable;
/**
 * Serializable response object of the serialization demo: an id and a text
 * message.
 */
public class ResponseMessage implements Serializable {
    private static final long serialVersionUID = -8134313953478922076L;

    private Long id;
    private String message;

    /** Creates a response whose fields are both {@code null}. */
    public ResponseMessage() {
    }

    /**
     * Creates a response from the given values.
     *
     * @param id      response id
     * @param message text message
     */
    public ResponseMessage(Long id, String message) {
        this.id = id;
        this.message = message;
    }

    /** @return the response id */
    public Long getId() {
        return this.id;
    }

    /** @param id the response id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the text message */
    public String getMessage() {
        return this.message;
    }

    /** @param message the text message */
    public void setMessage(String message) {
        this.message = message;
    }

    /** Returns the id and message in a bracketed, human-readable form. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ResponseMessage [id=");
        text.append(id).append(", message=").append(message).append("]");
        return text.toString();
    }
}
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
/**
 * Factory for the Netty codec handlers that (de)serialize Java objects with
 * JBoss Marshalling.
 */
public class SerializableFactory4Marshalling {

    /**
     * Creates a JBoss Marshalling decoder.
     *
     * @return a new MarshallingDecoder limited to objects of at most 1 MiB
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        UnmarshallerProvider provider =
                new DefaultUnmarshallerProvider(serialFactory(), versionFiveConfiguration());
        // Second argument: maximum size of a single serialized message.
        return new MarshallingDecoder(provider, 1024 * 1024 * 1);
    }

    /**
     * Creates a JBoss Marshalling encoder, which serializes objects
     * implementing Serializable into binary form.
     *
     * @return a new MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider provider =
                new DefaultMarshallerProvider(serialFactory(), versionFiveConfiguration());
        return new MarshallingEncoder(provider);
    }

    /**
     * Looks up the "serial" marshaller factory, i.e. the Java-serialization
     * implementation (per the original note, provided by the
     * jboss-marshalling-serial package).
     */
    private static MarshallerFactory serialFactory() {
        return Marshalling.getProvidedMarshallerFactory("serial");
    }

    /**
     * Builds a configuration with serialization version 5 (per the original
     * note, the only value to use on JDK 5 and later).
     */
    private static MarshallingConfiguration versionFiveConfiguration() {
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        return configuration;
    }
}
定时断线重连
客户端断线重连机制。
适用场景:客户端数量多,且需要传递的数据量较大,可以周期性地发送数据。
只有在对数据的即时性要求不高的时候,才可使用。
优点:可以使用数据缓存,不必每条数据都进行一次数据交互;可以定时回收资源,资源利用率高。相对来说,即时性可以通过其他方式保证。如:120 秒自动断线;数据变化 1000 次请求服务器一次;300 秒内变化不足 1000 次时,自动发送这些变化数据。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import utils.SerializableFactory4Marshalling;
/**
 * Netty server that drops idle connections: each accepted channel gets a
 * ReadTimeoutHandler, so a client that sends nothing for the configured
 * period triggers a read timeout on its connection.
 */
public class Server4Timer {
    // Acceptor group: listens for incoming client connections.
    private EventLoopGroup acceptorGroup = null;
    // Client group: handles the data exchange on accepted connections.
    private EventLoopGroup clientGroup = null;
    // Server start-up configuration.
    private ServerBootstrap bootstrap = null;

    /** Creates the server and prepares its event loops and bootstrap. */
    public Server4Timer(){
        init();
    }

    // Creates the event loop groups and applies the socket options.
    private void init(){
        acceptorGroup = new NioEventLoopGroup();
        clientGroup = new NioEventLoopGroup();
        bootstrap = new ServerBootstrap();
        // Bind the event loop groups.
        bootstrap.group(acceptorGroup, clientGroup);
        // Use NIO as the transport.
        bootstrap.channel(NioServerSocketChannel.class);
        // Options of the listening socket: SO_BACKLOG is the length of the
        // pending-connection queue, SO_RCVBUF the receive buffer size.
        bootstrap.option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_RCVBUF, 16*1024);
        // SO_SNDBUF (send buffer) and SO_KEEPALIVE apply to the accepted
        // connections, so they must be child options; the listening server
        // channel does not support them.
        bootstrap.childOption(ChannelOption.SO_SNDBUF, 16*1024)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    /**
     * Installs the per-connection handler pipeline and binds the server.
     *
     * @param port the local port to listen on
     * @return the future of the completed bind operation
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public ChannelFuture doAccept(int port) throws InterruptedException{
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
                // Timed disconnect: signals a read timeout when nothing was
                // read for the given period. The single-argument constructor
                // takes seconds; use ReadTimeoutHandler(long, TimeUnit) for
                // another unit.
                ch.pipeline().addLast(new ReadTimeoutHandler(3));
                ch.pipeline().addLast(new Server4TimerHandler());
            }
        });
        ChannelFuture future = bootstrap.bind(port).sync();
        return future;
    }

    /** Shuts down both event loop groups. */
    public void release(){
        this.acceptorGroup.shutdownGracefully();
        this.clientGroup.shutdownGracefully();
    }

    /**
     * Starts the server on port 9999 and blocks until the server channel is
     * closed or the main thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args){
        ChannelFuture future = null;
        Server4Timer server = null;
        try{
            server = new Server4Timer();
            future = server.doAccept(9999);
            System.out.println("server started.");
            future.channel().closeFuture().sync();
        }catch(InterruptedException e){
            e.printStackTrace();
            // Keep the interrupt visible to whoever runs this thread.
            Thread.currentThread().interrupt();
        }finally{
            if(null != future){
                // Close instead of waiting a second time: after an interrupt,
                // waiting again would block shutdown indefinitely. Closing an
                // already closed channel has no further effect.
                future.channel().close();
            }
            if(null != server){
                server.release();
            }
        }
    }
}
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import utils.ResponseMessage;
/**
 * Server-side business handler: logs every received message and answers
 * it with a fixed ResponseMessage.
 */
public class Server4TimerHandler extends ChannelHandlerAdapter {
    /**
     * Business logic: prints the class name and text of the incoming
     * message, then writes a test response back to the client.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the received message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String typeName = msg.getClass().getName();
        final String text = msg.toString();
        System.out.println("from client : ClassName - " + typeName + " ; message : " + text);
        ctx.writeAndFlush(new ResponseMessage(0L, "test response"));
    }

    /**
     * Error handling: reports that an error reached this handler and
     * closes the channel. The cause itself is not printed.
     *
     * @param ctx   context of the failing channel
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("server exceptionCaught method run...");
        ctx.close();
    }
}
import java.util.Random;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.WriteTimeoutHandler;
import utils.RequestMessage;
import utils.SerializableFactory4Marshalling;
/**
 * Netty client that reconnects on demand: the connection is cached and a
 * new one is opened whenever the cached channel is no longer active.
 */
public class Client4Timer {
    // Event loop group that handles requests and server responses.
    private EventLoopGroup group = null;
    // Client start-up configuration.
    private Bootstrap bootstrap = null;
    // Future of the most recent connect; null until the first connection.
    private ChannelFuture future = null;

    /** Creates the client and prepares its event loop and bootstrap. */
    public Client4Timer(){
        init();
    }

    // Creates the event loop group and configures the NIO transport.
    private void init(){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        // Bind the event loop group.
        bootstrap.group(group);
        // Use NIO as the transport.
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Installs the handler pipeline used for every connection of this client.
     *
     * @throws InterruptedException declared for callers; not thrown by the current body
     */
    public void setHandlers() throws InterruptedException{
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
                ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
                // Write timeout: raises an error when a write operation cannot
                // finish within 3 seconds. It does not disconnect a channel
                // merely because nothing is being written.
                ch.pipeline().addLast(new WriteTimeoutHandler(3));
                ch.pipeline().addLast(new Client4TimerHandler());
            }
        });
    }

    /**
     * Returns the cached connection, connecting (again) when there is no
     * connection yet or the cached channel is no longer active.
     *
     * @param host the server host
     * @param port the server port
     * @return the future of an established connection
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{
        if(future == null || !future.channel().isActive()){
            future = this.bootstrap.connect(host, port).sync();
        }
        return future;
    }

    /** Shuts down the event loop group. */
    public void release(){
        this.group.shutdownGracefully();
    }

    /**
     * Demo run: sends three messages two seconds apart, stays silent for
     * five seconds, then fetches the connection again (reconnecting if it
     * was dropped meanwhile) and sends one more message.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Client4Timer client = null;
        ChannelFuture future = null;
        try{
            client = new Client4Timer();
            client.setHandlers();
            future = client.getChannelFuture("localhost", 9999);
            // One generator for all message ids.
            Random random = new Random();
            for(int i = 0; i < 3; i++){
                RequestMessage msg = new RequestMessage(random.nextLong(),
                        "test"+i, new byte[0]);
                future.channel().writeAndFlush(msg);
                TimeUnit.SECONDS.sleep(2);
            }
            // Stay idle long enough for the connection to be dropped.
            TimeUnit.SECONDS.sleep(5);
            future = client.getChannelFuture("localhost", 9999);
            RequestMessage msg = new RequestMessage(random.nextLong(),
                    "test", new byte[0]);
            future.channel().writeAndFlush(msg);
        }catch(InterruptedException e){
            e.printStackTrace();
            // Keep the interrupt visible instead of swallowing it.
            Thread.currentThread().interrupt();
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(null != future){
                try {
                    // Wait until the current connection has been closed.
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            }
            if(null != client){
                client.release();
            }
        }
    }
}
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Client-side handler: reports connection activation, prints every
 * message received from the server, and closes the channel on errors.
 */
public class Client4TimerHandler extends ChannelHandlerAdapter {
    /**
     * Runs when the connection has been established. Typically the place
     * for connection confirmation and resource initialization; here it
     * only prints a notice.
     *
     * @param ctx context of the activated channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active");
    }

    /**
     * Prints the class name and text of a message received from the server.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the received message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String typeName = msg.getClass().getName();
        final String text = msg.toString();
        System.out.println("from server : ClassName - " + typeName + " ; message : " + text);
    }

    /**
     * Reports the error, prints its stack trace, and closes the channel.
     *
     * @param ctx   context of the failing channel
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught method run...");
        cause.printStackTrace();
        ctx.close();
    }
}