SpringBoot 集成 Netty
文章目录
背景描述
- 如果需要在
SpringBoot开发的app中,提供Socket服务,那么Netty是不错的选择。
Netty与SpringBoot整合关注点
Netty跟Springboot生命周期保持一致,同生共死Netty能用上ioc中的BeanNetty能读取到全局的配置
Netty组件
Bootstrap、ServerBootstrap
- 帮助
Netty 使用者更加方便地组装和配置 Netty ,也可以更方便地启动 Netty 应用程序 Bootstrap 用于启动一个 Netty TCP 客户端,或者 UDP 的一端。ServerBootstrap 往往是用于启动一个 Netty 服务端。
Channel
Channel 是 Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能,如获取该 Channel 的 EventLoop 。- 其实就是我们平常网络编程中经常使用的
socket套接字对象
EventLoop、EventLoopGroup
EventLoop定义了Netty的核心对象,用于处理IO事件,多线程模型、并发- 一个
EventLoopGroup包含一个或者多个EventLoop - 一个
EventLoop在它的生命周期内只和一个Thread绑定 - 所有有
EventLoop处理的I/O事件都将在它专有的Thread上被处理 - 一个
Channel在它的生命周期内只注册于一个EventLoop - 一个
EventLoop可能会被分配给一个或者多个Channel
ChannelHandler
ChannelHandler其实就是用于负责处理接收和发送数据的的业务逻辑,Netty中可以注册多个handler,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。ChannelHandler主要用于对出站和入站数据进行处理,它有两个重要的子接口:
ChannelInboundHandler——处理入站数据ChannelOutboundHandler——处理出站数据
ChannelPipeline
ChannelPipeline是ChannelHandler的容器,通过ChannelPipeline可以将ChannelHandler组织成一个逻辑链,该逻辑链可以用来拦截流经Channel的入站和出站事件,当 Channel被创建时,它会被自动地分配到它的专属的 ChannelPipeline。
ByteBuf
ByteBuf就是字节缓冲区,用于高效处理输入输出。
Pom依赖
- 引入
springboot starter web 和 netty
<!-- SpringBoot 初始化依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.5.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.85.Final</version>
</dependency>
Yml 配置
server:
port: 2345
netty:
websocket:
port: 1024
ip: 0.0.0.0
max-frame-size: 10240
path: /channel
整合Netty步骤
服务端
- 使用
SpringBoot Runner 机制启动 Netty 服务。
@Component
@Order
public class NettyStartListener implements ApplicationRunner {
@Resource
private SocketServer socketServer;
@Override
public void run(ApplicationArguments args) {
this.socketServer.start();
}
}
@Component
public class SocketServer {
private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);
private ServerBootstrap serverBootstrap;
@Autowired
private SocketInitializer socketInitializer;
@Value("${netty.websocket.port}")
private int port;
public void start() {
this.init();
this.serverBootstrap.bind(this.port);
logger.info("Netty started on port: {} (TCP) with boss thread {}", this.port, 2);
}
private void init() {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
this.serverBootstrap = new ServerBootstrap();
this.serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(this.socketInitializer);
}
}
public class SocketHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(SocketHandler.class);
public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
byte[] data = (byte[]) msg;
log.info("收到消息: " + new String(data));
for (Channel client : clients) {
if (!client.equals(ctx.channel())) {
client.writeAndFlush(data);
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
log.info("新的客户端链接:" + ctx.channel().id().asShortText());
clients.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
clients.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.channel().close();
clients.remove(ctx.channel());
}
}
@Component
public class SocketInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ByteArrayDecoder());
pipeline.addLast(new ByteArrayEncoder());
pipeline.addLast(new SocketHandler());
}
}
客户端
public class ChatClient {
public void start(String name) throws IOException {
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1024));
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ);
new Thread(new ClientThread(selector)).start();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String message = scanner.next();
if (StringUtils.hasText(message)) {
socketChannel.write(StandardCharsets.UTF_8.encode(name + ": " + message));
}
}
}
private class ClientThread implements Runnable {
private final Logger logger = LoggerFactory.getLogger(ClientThread.class);
private final Selector selector;
public ClientThread(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (true) {
int channels = selector.select();
if (channels == 0) {
continue;
}
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeySet.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
keyIterator.remove();
if (selectionKey.isReadable()) {
handleRead(selector, selectionKey);
}
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
private void handleRead(Selector selector, SelectionKey selectionKey) throws IOException {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
StringBuilder message = new StringBuilder();
if (channel.read(byteBuffer) > 0) {
byteBuffer.flip();
message.append(StandardCharsets.UTF_8.decode(byteBuffer));
}
channel.register(selector, SelectionKey.OP_READ);
System.out.println(message);
}
}
public static void main(String[] args) throws IOException {
new ChatClient().start("张三");
}