目录
任务拆解:
- 首先需要建立多客户端,每个客户端有一个独立的clientId和对应的tcp通道对应
- 能动态的根据clientId关闭对应的转发任务
- 停止服务的时候,需要断开所有的客户端连接,减少开销
- 客户端需要实现断线重连(考虑到断网的清空)
注意:本篇文章是只是实现转发操作,不支持转发的服务器,反向控制设备,需要做特殊处理,如果大家感兴趣,给我留言
下面我们根据我们头脑风暴的结果,来想办法实现上面的过程
创建数据库表
CREATE TABLE `station_message_transmit` (
`id` bigint(32) NOT NULL COMMENT '主键',
`station_id` int(11) NOT NULL COMMENT '站点id',
`host` varchar(50) DEFAULT NULL COMMENT '主机ip',
`port` int(11) DEFAULT NULL COMMENT '端口',
`create_by` varchar(64) DEFAULT NULL COMMENT '创建人',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_by` varchar(64) DEFAULT NULL COMMENT '修改人',
`update_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 所有的转发数据,都是基于单个站点(单个设备)
- id是唯一的,后续会通过该id绑定tcp通道,来实现发数据,关闭连接等操作
xml
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath />
</parent>
<groupId>boot.base.tcp.client</groupId>
<artifactId>boot-example-base-tcp-client-2.0.5</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-example-base-tcp-client-2.0.5</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打包成一个可执行jar -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- 所需要的依赖,这里只是实现一个简单的demo,来实践一下,我的设想是否能实现。
实体类
- yml配置文件就不需要配置,一切从简,默认的端口是8080
package com.test;
import lombok.Data;
/**
* @author wu
* @version 1.0
* @date 2023/10/18 16:39
*/
@Data
public class StationMessageTransmit {
/** 唯一编号 */
private Long id;
/** 站点id */
private Integer stationId;
/** 主机ip */
private String host;
/** 端口 */
private Integer port;
}
启动类
package com.test;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* wu
*/
@SpringBootApplication
@EnableAsync
@EnableScheduling
public class BootNettyClientApplication implements CommandLineRunner, ApplicationListener {
public static void main( String[] args ) {
SpringApplication app = new SpringApplication(BootNettyClientApplication.class);
app.run(args);
System.out.println( "Hello World!" );
}
@Async
@Override
public void run(String... args) throws Exception {
StationMessageTransmit tran = new StationMessageTransmit();
tran.setId(1L);
tran.setHost("192.168.10.128");
tran.setPort(5000);
tran.setStationId(13);
StationMessageTransmit tran1 = new StationMessageTransmit();
tran1.setId(2L);
tran1.setHost("192.168.10.128");
tran1.setPort(5001);
tran1.setStationId(13);
List<StationMessageTransmit> traces = new ArrayList<StationMessageTransmit>();
traces.add(tran);
traces.add(tran1);
for (StationMessageTransmit trace : traces) {
BootNettyClientThread thread = new BootNettyClientThread(trace);
thread.start();
}
}
@Override
public void onApplicationEvent(ApplicationEvent applicationEvent) {
if(applicationEvent instanceof ContextClosedEvent){
System.out.println("应用关闭事件");
for (Map.Entry<String, ChannelHandlerContext> entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {
ChannelHandlerContext channelHandlerContext = entry.getValue();
if(channelHandlerContext != null){
System.out.println("关闭链接:"+entry.getKey());
channelHandlerContext.close();
}
}
}else if(applicationEvent instanceof ContextRefreshedEvent){
System.out.println("应用刷新事件");
}else if(applicationEvent instanceof ContextStartedEvent){
System.out.println("应用开启事件");
}else if(applicationEvent instanceof ContextStoppedEvent){
System.out.println("应用停止事件");
}
}
}
-
run方法里面主要干的活,是一个伪代码,模拟从数据拿数据,再初始化创建多个客户端。
-
onApplicationEvent方法主要是监控服务停止的事件,这是考虑到,tcp是长链接,跟其他服务器链接是一直没有中断,会存在多次重建连接的问题,所以需要再关闭事件中,关闭所有的tcp客户端连接
线程类
package com.test;
/**
*
* netty 客户端
* wu
*/
public class BootNettyClientThread extends Thread {
private StationMessageTransmit trace;
public BootNettyClientThread(StationMessageTransmit trace){
this.trace = trace;
}
@Override
public void run() {
try {
new BootNettyClient().connect(trace);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
- 传实体类,主要是为了保证clientId和通道保证对应
客户端代码
package com.test;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
*
* netty 客户端
* wu
*/
public class BootNettyClient {
private EventLoopGroup group;
public void connect(StationMessageTransmit trace) throws Exception{
/**
* 客户端的NIO线程组
*
*/
group = new NioEventLoopGroup();
try {
/**
* Bootstrap 是一个启动NIO服务的辅助启动类 客户端的
*/
Bootstrap bootstrap = new Bootstrap();
bootstrap = bootstrap.group(group);
bootstrap = bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
/**
* 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
*/
bootstrap = bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024 * 1024));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TcpHandler(trace));
}
});
/**
* 连接服务端
*/
ChannelFuture future = bootstrap.connect(trace.getHost(), trace.getPort()).sync();
if(future.isSuccess()) {
System.out.println("netty client start success="+trace.toString());
/**
* 等待连接端口关闭
*/
future.channel().closeFuture().sync();
}
} finally {
/**
* 退出,释放资源
*/
group.shutdownGracefully().sync();
}
}
}
handle
package com.test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
/**
*
* I/O数据读写处理类
* wu
*/
@ChannelHandler.Sharable
public class TcpHandler extends ChannelInboundHandlerAdapter{
private static ScheduledExecutorService SCHEDULED_EXECUTOR = Executors.newScheduledThreadPool(5);
private StationMessageTransmit trace;
public TcpHandler(StationMessageTransmit trace){
this.trace = trace;
}
/**
* 从服务端收到新的数据时,这个方法会在收到消息时被调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
if(msg == null){
return;
}
System.out.println("channelRead:read msg:"+msg.toString());
//回应服务端
//ctx.write("I got server message thanks server!");
}
/**
* 从服务端收到新的数据、读取完成时调用
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
System.out.println("channelReadComplete");
ctx.flush();
}
/**
* 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
System.out.println("exceptionCaught");
cause.printStackTrace();
ctx.close();//抛出异常,断开与客户端的连接
}
/**
* 客户端与服务端第一次建立连接时 执行
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
super.channelActive(ctx);
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
System.out.println("服务器ip:"+clientIp+",clientId:"+trace.getId());
BootNettyClientGroupCache.save(trace.getId().toString(), ctx);
}
/**
* 客户端与服务端 断连时 执行
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
super.channelInactive(ctx);
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
ctx.close(); //断开连接时,必须关闭,否则造成资源浪费
System.out.println("channelInactive:"+clientIp);
//客户端重连
//reset();
}
/**
* 客户端重连
*/
public void reset(){
//增加一个伪代码,从服务器查询id对应的转发数据是否存在,不存在,则不继续运行转发任务
SCHEDULED_EXECUTOR.schedule(() -> {
try {
System.err.println("服务端链接不上,开始重连操作...");
new BootNettyClient().connect(trace);
} catch (Exception e) {
e.printStackTrace();
}
}, 3, TimeUnit.SECONDS);
}
}
- reset方法是为了实现客户端重连,3秒钟调用一次
- channelInactive方法,客户端和服务器断开连接时会触发
- channelActive方法,客户端和服务器建立连接时,需要实现client和通道的绑定关系,方便后续回写数据
controller类
package com.test;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* wu
*/
@RestController
public class BootNettyClientController {
/**
* 给所有客户端发送消息
* @param content
* @return
*/
@PostMapping("/reportAllClientDataToServer")
public String reportAllClientDataToServer(@RequestParam(name="content", required = true) String content) {
for (Map.Entry<String, ChannelHandlerContext> entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {
ChannelHandlerContext ctx = entry.getValue();
ctx.writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes()));
}
return "ok";
}
/**
* 停止指定的客户端
* @param code
* @return
* @throws InterruptedException
*/
@PostMapping("/stopStationByCode")
public String downDataToClient(@RequestParam(name="code", required = true) String code) throws InterruptedException {
ChannelHandlerContext ctx = BootNettyClientGroupCache.get(code);
ctx.close();
BootNettyClientGroupCache.remove(code);
return "success";
}
}
- 主要是提供两个测试方法,可以通过apifox调试工具进行模拟请求
缓存tcp链接
package com.test;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* wu
*/
public class BootNettyClientGroupCache {
/**
* 存放所有的连接,key是转发id,value是对应的数据
*/
public static volatile Map<String, ChannelHandlerContext> groupMapCache = new ConcurrentHashMap<String, ChannelHandlerContext>();
public static void add(String code, ChannelHandlerContext group){
groupMapCache.put(code,group);
}
public static ChannelHandlerContext get(String code){
return groupMapCache.get(code);
}
public static void remove(String code){
groupMapCache.remove(code);
}
public static void save(String code, ChannelHandlerContext channel) {
if(groupMapCache.get(code) == null) {
add(code,channel);
}
}
}
- 存放所有的通道