Java BIO、NIO(通信/群聊系统、零拷贝)、AIO
BIO、NIO、AIO特点和场景
BIO(Blocking I/O)、NIO(Non-blocking I/O)、AIO(Asynchronous I/O)是Java中用于处理I/O操作的三种不同的I/O模型,它们具有不同的特点和适用场景。
-  BIO(Blocking I/O): - BIO是传统的I/O模型,它是同步阻塞的,也就是说当一个I/O操作发生时,应用程序会被阻塞,直到操作完成。
- 在BIO中,每个I/O操作通常需要一个单独的线程来处理,这意味着如果有大量并发的I/O操作,就需要创建大量线程,这可能会导致资源消耗和性能问题。
- BIO适用于简单的网络通信场景,但在高并发情况下,性能较差。
  
 
-  NIO(Non-blocking I/O): -  NIO是一种更现代的I/O模型,它引入了非阻塞的概念,允许一个线程处理多个I/O操作。NIO使用了选择器(Selector)来监听多个通道的事件,从而减少了线程的数量。 
-  在NIO中,应用程序可以异步地发起I/O操作,然后继续执行其他任务,而不需要等待I/O操作完成。这提高了并发性能,特别适用于需要处理大量并发连接的网络应用程序。 
-  NIO适用于构建高性能的网络服务器和客户端,如Netty等框架。  
 
-  
-  AIO(Asynchronous I/O): - AIO是Java 7引入的一种异步I/O模型,它进一步提高了I/O操作的效率。与NIO不同,AIO的异步I/O操作可以在后台完成,应用程序不需要等待。
- AIO适用于那些需要大量I/O操作并且不希望阻塞主线程的应用程序,比如高性能的文件和网络服务器。
- 在AIO中,操作系统负责通知应用程序I/O操作的完成,而不需要应用程序不断地轮询或等待。
 
总结:
- BIO适用于简单的同步I/O操作,但在高并发场景下性能较差。
- NIO适用于需要处理大量并发连接的网络应用程序,提供了非阻塞I/O和选择器的支持。
- AIO适用于需要高性能异步I/O操作的应用程序,它可以在后台进行I/O操作而不阻塞主线程。
对比表
| 特点 | BIO (Blocking I/O) | NIO (Non-blocking I/O) | AIO (Asynchronous I/O) | 
|---|---|---|---|
| 阻塞 | 阻塞式 I/O | 非阻塞式 I/O | 异步 I/O | 
| 并发处理 | 低并发处理 | 中等并发处理 | 高并发处理 | 
| 编程模型 | 同步编程模型 | 同步和部分异步编程模型 | 异步编程模型 | 
| 缓冲区 | 需要自己手动管理缓冲区 | 使用缓冲区进行数据传输 | 使用缓冲区进行数据传输 | 
| 文件操作支持 | 支持 | 支持 | 支持 | 
| 网络操作支持 | 支持 | 支持 | 支持 | 
| 效率 | 低效率,线程池开销大 | 相对高效,线程开销小 | 高效率,异步处理 | 
| 适用场景 | 低并发、少连接数 | 中等并发、中等连接数 | 高并发、大连接数 | 
| 复杂性 | 相对简单 | 相对复杂 | 较复杂 | 
| 主要类和接口 | InputStream/OutputStream | Selector/Channel | AsynchronousChannel | 
| 典型应用 | 单线程或多线程处理少量连接 | 网络服务器、文件操作等 | 高性能服务器、文件操作等 | 
总的来说,BIO适用于连接数较少的场景,NIO适用于中等并发场景,而AIO适用于高并发的异步操作场景,如网络服务器、文件传输等。选择适当的I/O模型取决于应用程序的性能和并发需求。
BIO 案例
telnet客户端连接测试
cmd telnet [ip] [端口]
可以输入字符发送,或者ctrl+],然后使用send命令,send hello

启动Telnet客户端

服务端逻辑
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BIOServer {
    public static void main(String[] args) throws Exception {
        //线程池机制思路
        //1. 创建一个线程池
        //2. 如果有客户端连接,就创建一个线程,与之通讯
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        //创建ServerSocket
        ServerSocket serverSocket = new ServerSocket(8989);
        System.out.println("服务器启动了");
        while (true) {
            System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
            //监听,等待客户端连接
            System.out.println("等待连接....");
            //会阻塞在accept()
            final Socket socket = serverSocket.accept();
            System.out.println("连接到一个客户端");
            //就创建一个线程,与之通讯(单独写一个方法)
            newCachedThreadPool.execute(() -> {
                //可以和客户端通讯
                handler(socket);
            });
        }
    }
    /**
     * 与客户端通讯
     *
     * @param socket
     */
    public static void handler(Socket socket) {
        try {
            System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
            byte[] bytes = new byte[1024];
            //通过socket获取输入流
            InputStream inputStream = socket.getInputStream();
            //循环的读取客户端发送的数据
            while (true) {
                System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
                System.out.println("read....");
                //没有可读数据时会阻塞
                int read = inputStream.read(bytes);
                if (read != -1) {
                    System.out.println(new String(bytes, 0, read));//输出客户端发送的数据
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("关闭和client的连接");
            try {
                socket.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
总结
- 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据Write。
- 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
- 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read操作上,造成线程资源浪费。
NIO三大核心组件关系

Channel
Channel是用于进行非阻塞I/O操作的抽象,它代表了一个可以进行读写操作的数据源或数据目的地。Channel提供了一种通用的、统一的接口,用于与不同类型的I/O设备(如文件、套接字、管道等)进行交互。
以下是关于Channel的一些主要特点和常见的类型:
-  不同类型的 Channel:- FileChannel:用于文件的读写操作,支持文件的随机读写。
- SocketChannel:用于TCP套接字通信,可以连接到远程服务器或接受客户端连接。
- ServerSocketChannel:用于监听传入的TCP连接请求,允许服务器接受客户端连接。
- DatagramChannel:用于UDP通信,支持无连接的数据包传输。
- Pipe.SinkChannel和- Pipe.SourceChannel:用于在同一虚拟机内的两个线程之间进行通信,支持管道传输。
 
-  通用操作: - 所有类型的Channel都支持通用的操作,如读取数据到Buffer、从Buffer写入数据、关闭通道等。
- Channel的读写操作通常是非阻塞的,因此可以在一个线程中同时处理多个- Channel。
 
- 所有类型的
-  FileChannel: - FileChannel是用于文件的I/O操作的主要通道类型。
- 可以使用FileChannel来读取、写入、映射文件以及文件锁定等。
- FileChannel支持文件的随机访问,可以通过- position()方法设置读写位置。
 
-  SocketChannel 和 ServerSocketChannel: - SocketChannel用于客户端套接字通信,可以连接到远程服务器。
- ServerSocketChannel用于服务器套接字通信,用于监听传入的连接请求并创建对应的- SocketChannel来处理客户端连接。
 
-  DatagramChannel: - DatagramChannel用于UDP通信,支持无连接的数据包传输。
- 通过DatagramChannel可以发送和接收UDP数据报。
 
-  Pipe: - Pipe.SinkChannel和- Pipe.SourceChannel通常用于在同一虚拟机内的两个线程之间进行通信。
- 一个线程将数据写入SinkChannel,另一个线程从SourceChannel读取数据。
 
总之,Channel是Java NIO中用于进行非阻塞I/O操作的关键组件之一,它提供了统一的接口,可以用于处理不同类型的I/O设备。开发者可以根据应用程序的需求选择合适类型的Channel,并使用它们来进行数据的读写操作,从而实现高效的I/O通信。
Buffer
Buffer是用于进行数据读取和写入的重要抽象。它是Java NIO(New I/O)库的一部分,用于处理非阻塞I/O操作。Buffer提供了一种方便的方式来操作底层数据源(通常是字节数组)的数据,同时可以在不同的I/O通道之间传递数据。
以下是关于NIO中的Buffer的主要特点和用法:
-  缓冲区类型:Java NIO提供了多种类型的缓冲区,主要有以下几种: - ByteBuffer:用于处理字节数据。
- CharBuffer:用于处理字符数据。
- ShortBuffer、- IntBuffer、- LongBuffer:分别用于处理短整数、整数和长整数数据。
- FloatBuffer、- DoubleBuffer:用于处理浮点数和双精度浮点数数据。
- 这些不同类型的缓冲区都是Buffer的子类,提供了不同数据类型的读写方法。
 
-  容量(Capacity):每个缓冲区都有一个固定的容量,表示它可以存储的数据元素数量。容量在缓冲区创建时被指定,并且不能更改。 
-  位置(Position):位置表示下一个读取或写入操作将从缓冲区的哪个位置开始。初始位置通常为0,并且通过读取或写入数据而逐渐增加。位置不能超过缓冲区的容量。 
-  限制(Limit):限制是一个不可读写的索引,表示在读取或写入操作时不能超过的位置。初始限制通常等于容量,但可以通过设置限制来限制读取或写入的数据范围。 
-  标记(Mark):标记是一个临时的索引,可以通过 mark()方法设置,并在需要时通过reset()方法回到标记的位置。标记的主要用途是在读取或写入一些数据后能够返回到之前的位置。
-  读写操作: Buffer提供了一系列读取和写入数据的方法,如get()、put()、read()、write()等。这些方法根据当前位置(Position)和限制(Limit)来确定读写的数据范围。
-  翻转(Flip):通过 flip()方法,可以将缓冲区从写模式切换到读模式。这个操作会将限制设置为当前位置,并将位置重置为0,准备开始读取数据。
-  清空(Clear):通过 clear()方法,可以将缓冲区从读模式切换到写模式。这个操作会将限制设置为容量,位置重置为0,准备开始写入数据。
-  压缩(Compact):通过 compact()方法,可以将未读取的数据复制到缓冲区的开头,然后将位置设置为复制后的数据后面。这个操作用于在写模式下向缓冲区写入数据之前,压缩未读取的数据。
-  重绕(Rewind):通过 rewind()方法,可以将位置重置为0,保持限制不变,进入写模式。这个操作用于重新读取已经读取过的数据。
-  标记和重置: mark()方法用于设置标记位置,reset()方法用于将位置重置为标记位置。
Buffer在Java NIO中是一个重要的基础构建块,它允许开发者高效地进行数据的读取和写入操作,特别适用于非阻塞I/O编程。在使用Buffer时,需要注意正确管理位置、限制、标记和容量,以确保数据的正确读写。
Selector
Selector是一个多路复用器,用于管理多个Channel的事件,以便一个线程可以同时处理多个通道上的I/O操作。Selector是非常重要的,因为它可以帮助实现非阻塞I/O,提高网络和文件I/O的效率。
以下是关于Selector的一些主要特点和用法:
-  多路复用: Selector允许一个线程同时监视多个Channel上的I/O事件,如读就绪、写就绪、连接就绪、接收就绪等。通过使用Selector,一个线程可以有效地管理多个I/O通道,而不需要为每个通道创建一个单独的线程。
-  SelectableChannel:SelectableChannel是可以在Selector上注册的Channel的子类,例如SocketChannel、ServerSocketChannel、DatagramChannel等。通过将Channel注册到Selector上,可以监视它们的I/O事件。
-  SelectionKey:当一个Channel被注册到Selector上时,会返回一个SelectionKey对象,它表示了该Channel在Selector上的注册状态和感兴趣的事件。SelectionKey包括事件类型(如读、写、连接、接收)、关联的Channel等信息。
-  select()方法:Selector提供了select()方法,用于阻塞等待至少一个通道在感兴趣的事件上就绪。一旦有一个或多个通道就绪,select()方法会返回,并返回就绪通道的数量。这个方法可以帮助减少CPU的轮询开销。
-  selectedKeys()方法:Selector还提供了selectedKeys()方法,用于获取就绪的SelectionKey集合,然后可以遍历这些SelectionKey来处理相应的事件。
-  register()方法:SelectableChannel可以通过register()方法将自己注册到Selector上,并指定感兴趣的事件类型。通常在创建Channel后,需要将其注册到一个Selector上才能开始监视事件。
-  cancel()方法:通过SelectionKey的cancel()方法可以取消注册的通道,停止监视相应的事件。
-  非阻塞操作:使用 Selector可以实现非阻塞I/O操作,当一个通道就绪时,可以处理它的I/O事件,而不需要等待。这允许一个线程有效地管理多个通道的状态。
Selector在高性能的网络服务器、文件I/O操作以及需要管理多个通道的应用程序中非常有用。通过结合Selector、Channel和Buffer等NIO组件,可以实现高效的非阻塞I/O编程,提高应用程序的性能和响应性。但需要注意,正确使用Selector需要处理一些细节,如注册和取消注册通道、处理事件等,以确保程序的正确性和可维护性。
Buffer 缓冲区
这四个字段 mark、position、limit 和 capacity 是 Buffer 类中非常重要的属性,它们用于管理缓冲区的状态和限制。以下是对每个字段的详细解释:
-  mark:mark是一个标记位置,用于记录缓冲区中的某个特定位置。初始值为-1,表示没有设置标记。你可以通过mark()方法来设置标记位置,然后通过reset()方法来恢复到标记位置。这对于在读写模式之间切换时定位到特定位置非常有用。
-  position:position表示当前读取或写入的位置。初始值为0,即缓冲区的起始位置。position在读写操作中会随着操作的进行而递增,标记了下一个要读取或写入的位置。
-  limit:limit表示读取或写入操作的限制位置。它限制了可以读取或写入的最大位置。初始时,limit通常等于缓冲区的容量,表示可以读取或写入整个缓冲区。当使用flip()方法切换到读模式时,limit会被设置为当前的position,这样就限制了读取操作只能在已写入数据的范围内进行。limit的值可以随时进行修改以改变读写操作的限制。
-  capacity:capacity表示缓冲区的容量,即它可以容纳的数据元素的数量。一旦分配,缓冲区的容量通常不会更改。
这些字段共同管理了缓冲区的状态,允许你在读取和写入操作之间进行切换,并确保操作在正确的范围内进行。例如,position 指示了下一个要读取或写入的位置,而 limit 限制了操作的范围,mark 允许你在需要时回到特定位置。
在使用 Buffer 类时,了解和管理这些字段的值对于正确操作缓冲区非常重要。通常,你会使用 flip()、clear()、rewind()、mark()、reset() 等方法来更改和管理这些字段的值,以满足你的读写需求。
Channel 通道
FileChannel 类
FileChannel 是 Java NIO 中用于文件 IO 操作的一个重要类,它提供了对文件的读取和写入操作。FileChannel 通常与 ByteBuffer 一起使用,以高效地处理文件的读写。
-  创建 FileChannel: 若要创建一个 FileChannel对象,通常需要通过FileInputStream或FileOutputStream来获取。例如,要创建一个用于读取文件的FileChannel,可以使用以下代码:javaCopy codeFileInputStream fileInputStream = new FileInputStream("example.txt"); FileChannel fileChannel = fileInputStream.getChannel();同样,要创建一个用于写入文件的 FileChannel,可以使用FileOutputStream。
-  FileChannel 的读取和写入: FileChannel提供了一系列方法来执行读取和写入操作,其中最常见的包括:- int read(ByteBuffer dst):从文件中读取数据到给定的- ByteBuffer。
- int write(ByteBuffer src):将- ByteBuffer中的数据写入到文件。
- long position()和- FileChannel position(long newPosition):获取或设置当前的文件位置。
- long size():获取文件的大小。
- int read(ByteBuffer dst, long position)和- int write(ByteBuffer src, long position):在指定位置读取或写入数据。
 
-  FileChannel 的文件锁定: FileChannel还支持文件锁定机制,允许你在多个线程或进程之间控制文件的访问。通过FileChannel的lock()和tryLock()方法,你可以请求文件的共享锁或排它锁。这对于确保同时只有一个进程能够对文件进行写入操作非常有用。
-  FileChannel 的关闭: 当你完成文件操作后,应该及时关闭 FileChannel以释放资源。使用close()方法可以关闭FileChannel。
-  适用场景: FileChannel主要用于文件 IO 操作,特别适用于需要高性能和大文件处理的场景。与传统的InputStream和OutputStream相比,FileChannel更为灵活,对于高并发的文件读写操作也更高效。
应用实例
本地文件写入、读取、单缓冲区读写、拷贝案例
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.io.File;
import java.io.FileInputStream;
public class NIOFileChannel {
    public static void main(String[] args) throws Exception {
        NIOFileChannel nio = new NIOFileChannel();
//        nio.writeFile("c:\\file01.txt");
//        nio.readFile("c:\\file01.txt");
//        nio.readAndWriteFile("C:\\tmp\\nio\\1.txt", "C:\\tmp\\nio\\2.txt");
        nio.copyFile("C:\\tmp\\nio\\1.txt", "C:\\tmp\\nio\\22.txt");
    }
    /**
     * 写入文件数据
     *
     * @param filePath 文件路径
     */
    public void writeFile(String filePath) throws Exception {
        String str = "hello, world";
        //创建一个输出流 -> channel
        FileOutputStream fileOutputStream = new FileOutputStream(filePath);
        //通过 fileOutputStream 获取对应的 FileChannel
        //这个 fileChannel 真实类型是 FileChannelImpl
        FileChannel fileChannel = fileOutputStream.getChannel();
        //创建一个缓冲区 ByteBuffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        //将 str 放入 byteBuffer
        byteBuffer.put(str.getBytes());
        //对 byteBuffer 进行 flip
        byteBuffer.flip();
        //将 byteBuffer 数据写入到 fileChannel
        fileChannel.write(byteBuffer);
        fileOutputStream.close();
    }
    /**
     * 读取文件数据
     *
     * @param filePath 文件路径
     */
    public void readFile(String filePath) throws Exception {
        //创建文件的输入流
        File file = new File(filePath);
        FileInputStream fileInputStream = new FileInputStream(file);
        //通过 fileInputStream 获取对应的 FileChannel -> 实际类型 FileChannelImpl
        FileChannel fileChannel = fileInputStream.getChannel();
        //创建缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
        //将通道的数据读入到 Buffer
        fileChannel.read(byteBuffer);
        //将 byteBuffer 的字节数据转成 String
        System.out.println(new String(byteBuffer.array()));
        fileInputStream.close();
    }
    /**
     * 使用一个 Buffer 从通道中读取数据,并将数据写入到另一个通道中
     *
     * @param filePath1 读取文件路径
     * @param filePath2 待写入文件路径
     */
    public void readAndWriteFile(String filePath1, String filePath2) throws Exception {
        FileInputStream fileInputStream = new FileInputStream(filePath1);
        FileChannel fileChannel01 = fileInputStream.getChannel();
        FileOutputStream fileOutputStream = new FileOutputStream(filePath2);
        FileChannel fileChannel02 = fileOutputStream.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        //循环读取
        while (true) {
            //清空 buffer
            byteBuffer.clear();
            int read = fileChannel01.read(byteBuffer);
            System.out.println("read = " + read);
            //读完
            if (read == -1) {
                break;
            }
            //将 buffer 中的数据写入到 fileChannel02--2.txt
            byteBuffer.flip();
            fileChannel02.write(byteBuffer);
        }
        //关闭相关的流
        fileInputStream.close();
        fileOutputStream.close();
    }
    /**
     * 拷贝文件
     *
     * @param filePath1 源文件路径
     * @param filePath2 待拷贝文件路径
     */
    public static void copyFile(String filePath1, String filePath2) throws Exception {
        //创建相关流
        FileInputStream fileInputStream = new FileInputStream(filePath1);
        FileOutputStream fileOutputStream = new FileOutputStream(filePath2);
        //获取各个流对应的 FileChannel
        FileChannel sourceCh = fileInputStream.getChannel();
        FileChannel destCh = fileOutputStream.getChannel();
        //使用 transferForm 完成拷贝
        destCh.transferFrom(sourceCh, 0, sourceCh.size());
        //关闭相关通道和流
        sourceCh.close();
        destCh.close();
        fileInputStream.close();
        fileOutputStream.close();
    }
}
关于 Buffer 和 Channel 的注意事项和细节
ByteBuffer 支持类型化的 put 和 get,put 放入的是什么数据类型,get 就应该使用相应的数据类型来取出,否则可能有 BufferUnderflowException 异常。
可以将一个普通 Buffer 转成只读 Buffer
NIO 还提供了 MappedByteBuffer,可以让文件直接在内存(堆外的内存)中进行修改,而如何同步到文件由 NIO 来完成。
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
/**
 * MappedByteBuffer 可让文件直接在内存(堆外内存)修改,操作系统不需要拷贝一次
 */
public class MappedByteBufferTest {
    public static void main(String[] args) throws IOException {
        modifyFile("C:\\tmp\\nio\\1.txt");
    }
    // 直接在内存中修改文件
    public static void modifyFile(String filePath) throws IOException {
        RandomAccessFile randomAccessFile = new RandomAccessFile(filePath, "rw");
        //获取对应的通道
        FileChannel channel = randomAccessFile.getChannel();
        /**
         * FileChannel.MapMode.READ_WRITE表示映射后的内存可以读取和写入。
         * 0表示从文件的起始位置开始映射。
         * 5表示映射到内存的大小,即将文件的前5个字节映射到内存中。这意味着我们可以在内存中修改这5个字节的内容。
         */
        MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
        mappedByteBuffer.put(0, (byte) 'H');
        mappedByteBuffer.put(3, (byte) '9');
        // 这段代码中,尝试将第6个字节的内容修改为字符 'Y',但是我们之前只映射了5个字节的数据,所以会抛出越界异常
//        mappedByteBuffer.put(5, (byte) 'Y');
        randomAccessFile.close();
        System.out.println("修改成功");
    }
}
前面的读写操作案例,都是通过一个 Buffer 完成的,NIO 还支持通过多个 Buffer(即 Buffer数组)完成读写操作,即 Scattering 和 Gathering
Selector 选择器
Selector(选择器)是Java NIO中的一个关键类,用于实现非阻塞I/O操作。它允许一个线程同时管理多个通道(Channel),从而能够高效地处理多个客户端连接和请求。以下是关于Selector的总结和要点:
基本介绍:
- Selector是Java NIO中的核心组件,用于实现非阻塞I/O。
- 它能够监测多个注册的通道上是否有事件发生,并响应这些事件。
- 多个通道可以注册到同一个Selector中,实现多路复用,从而提高系统的并发性能。
- 使用Selector可以避免为每个连接创建一个线程,减少系统开销和线程切换的开销。
Selector的特点和用途:
- Selector允许单线程管理多个通道,即管理多个连接和请求。
- 通过Selector,线程可以在某个通道上进行读写操作,如果没有数据可用,则线程可以在其他通道上执行IO操作,从而充分利用非阻塞I/O的特性。
- 非阻塞I/O避免了频繁的I/O阻塞,提高了线程的运行效率。
- 一个I/O线程可以并发处理多个客户端连接和读写操作,解决了传统同步阻塞I/O模型中一连接一线程的性能问题。
Selector的相关方法:
- selector.select():阻塞方法,等待至少一个通道准备就绪,然后返回就绪通道的数量。
- selector.select(timeout):阻塞方法,等待一段时间(timeout毫秒)或至少一个通道准备就绪,然后返回就绪通道的数量。
- selector.wakeup():唤醒阻塞在- select()方法上的线程,用于强制使- select()方法立即返回。
- selector.selectNow():非阻塞方法,立即返回,不等待任何通道就绪。
总的来说,Selector是Java NIO中非常重要的组件,用于实现高性能的非阻塞I/O操作。它可以让一个线程管理多个通道,处理多个客户端连接,提高系统的并发性能和可扩展性。Selector的使用有助于避免多线程之间的上下文切换开销,提高系统的响应速度。
NIO 网络客户端/服务端案例
服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
    public static void main(String[] args) throws IOException {
        // 创建ServerSocketChannel 用于监听客户端连接的通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(7777));
        serverSocketChannel.configureBlocking(false);
        // 创建Selector
        Selector selector = Selector.open();
        // 将ServerSocketChannel注册到Selector,关注OP_ACCEPT事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // 监听事件
            int selectCount = selector.select();
            if (selectCount == 0) {
                continue;
            }
            // 获取触发事件的SelectionKey
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                if (key.isAcceptable()) {
                    // 处理连接请求
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // 处理读事件
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = socketChannel.read(buffer);
                    if (bytesRead == -1) {
                        socketChannel.close();
                        key.cancel();
                    } else if (bytesRead > 0) {
                        buffer.flip();
                        byte[] data = new byte[buffer.remaining()];
                        buffer.get(data);
                        System.out.println("Received: " + new String(data));
                    }
                }
                keyIterator.remove();
            }
        }
    }
}
客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class NIOClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        for (int i = 0; i < 5; i++) {
            int tmpI = i;
            new Thread(() -> {
                try {
                    sendMsg("hello" + tmpI);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }
    public static void sendMsg(String msg) throws IOException {
        // 创建SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        // 连接服务器
        if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 7777))) {
            while (!socketChannel.finishConnect()) {
                System.out.println("连接服务器需要时间,可以做其他工作...");
            }
        }
        // 发送数据给服务器
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
        socketChannel.write(buffer);
        // 接收服务器的响应
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
        int bytesRead = socketChannel.read(responseBuffer);
        if (bytesRead > 0) {
            responseBuffer.flip();
            byte[] responseData = new byte[responseBuffer.remaining()];
            responseBuffer.get(responseData);
            System.out.println("Server Response: " + new String(responseData));
        }
        // 关闭SocketChannel
        socketChannel.close();
    }
}
NIO 群聊系统
服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class GroupChatServer {
    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int PORT = 6667;
    public GroupChatServer() {
        try {
            // 创建Selector
            selector = Selector.open();
            // 创建ServerSocketChannel
            listenChannel = ServerSocketChannel.open();
            // 绑定服务器端口
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            // 设置为非阻塞模式
            listenChannel.configureBlocking(false);
            // 注册ServerSocketChannel到Selector,关注连接事件
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void listen() {
        try {
            while (true) {
                // 监听事件
                int count = selector.select(2000);
                if (count > 0) {
                    // 获取触发事件的SelectionKey集合
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        if (key.isAcceptable()) {
                            // 处理连接请求
                            SocketChannel clientChannel = listenChannel.accept();
                            clientChannel.configureBlocking(false);
                            clientChannel.register(selector, SelectionKey.OP_READ);
                            System.out.println(clientChannel.getRemoteAddress() + " 上线");
                        }
                        if (key.isReadable()) {
                            // 处理读事件
                            readData(key);
                        }
                        iterator.remove();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // 处理异常后的操作
        }
    }
    private void readData(SelectionKey key) {
        SocketChannel channel = null;
        try {
            channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = channel.read(buffer);
            if (count > 0) {
                String message = new String(buffer.array());
                System.out.println("收到来自 " + channel.getRemoteAddress() + " 的消息: " + message);
                sendToOtherClients(message, channel);
            }
        } catch (IOException e) {
            try {
                System.out.println(channel.getRemoteAddress() + " 离线");
                key.cancel();
                channel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
    private void sendToOtherClients(String message, SocketChannel selfChannel) throws IOException {
        System.out.println("服务器转发消息中...");
        for (SelectionKey key : selector.keys()) {
            Channel targetChannel = key.channel();
            if (targetChannel instanceof SocketChannel && targetChannel != selfChannel) {
                SocketChannel dest = (SocketChannel) targetChannel;
                ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
                dest.write(buffer);
            }
        }
    }
    public static void main(String[] args) {
        GroupChatServer server = new GroupChatServer();
        server.listen();
    }
}
客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class GroupChatClient {
    private final String HOST = "127.0.0.1";
    private final int PORT = 6667;
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;
    public GroupChatClient() {
        try {
            // 创建Selector
            selector = Selector.open();
            // 创建SocketChannel并连接服务器
            socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
            // 设置为非阻塞模式
            socketChannel.configureBlocking(false);
            // 注册SocketChannel到Selector,关注读事件
            socketChannel.register(selector, SelectionKey.OP_READ);
            // 获取本地Socket地址作为用户名
            username = socketChannel.getLocalAddress().toString().substring(1);
            System.out.println(username + " is online.");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void sendInfo(String info) {
        info = username + " 说: " + info;
        try {
            // 发送消息给服务器
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void readInfo() {
        try {
            int count = selector.select();
            if (count > 0) {
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int len = channel.read(buffer);
                        if (len > 0) {
                            System.out.println(new String(buffer.array(), 0, len));
                        }
                    }
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        GroupChatClient chatClient = new GroupChatClient();
        // 启动一个线程来接收服务器和其他客户端的消息
        new Thread(() -> {
            while (true) {
                chatClient.readInfo();
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        // 主线程用于发送消息到服务器
        while (true) {
            // 从控制台读取用户输入
            java.util.Scanner scanner = new java.util.Scanner(System.in);
            String message = scanner.nextLine();
            chatClient.sendInfo(message);
        }
    }
}
NIO 结合零拷贝
NIO(New I/O)和零拷贝(Zero-Copy)是两个与I/O操作相关的概念,它们可以在高性能的数据传输和处理中发挥重要作用。
零拷贝(Zero-Copy):
零拷贝是一种技术,旨在在数据传输和处理过程中尽量减少或消除数据的拷贝操作。它通过直接将数据从一个位置传输到另一个位置,而不需要在中间进行复制。零拷贝通常与操作系统和硬件的支持相关,以提供最佳性能。
主要特点和优点:
- 减少数据拷贝:零拷贝技术避免了数据在内存之间的复制,从而减少了CPU和内存带宽的开销。
- 提高性能:通过减少数据拷贝和内存复制,零拷贝可以提高数据传输和处理的性能。
- 节省资源:减少了不必要的CPU和内存资源消耗,特别适用于大规模的数据传输和处理任务。
零拷贝通常与操作系统的底层API和硬件的支持密切相关,例如使用mmap映射文件、sendfile系统调用等。
在实际应用中,NIO和零拷贝可以结合使用,以提高网络应用程序的性能。例如,你可以使用NIO来管理多个网络连接,并且使用零拷贝来传输大文件或数据块,以避免不必要的数据复制和传输开销。这两个概念在高性能和高吞吐量的应用程序中都非常有用。
案例
服务端
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class NIOServer {
    public static void main(String[] args) throws Exception {
        InetSocketAddress address = new InetSocketAddress(7001);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(address);
        // 创建buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            int readcount = 0;
            while (-1 != readcount) {
                try {
                    readcount = socketChannel.read(byteBuffer);
                } catch (Exception ex) {
                    // ex.printStackTrace();
                    break;
                }
                //
                byteBuffer.rewind(); // 倒带 position = 0 mark 作废
            }
            if (readcount > 0) {
                System.out.println("收到数据长度:" + readcount);
            }
        }
    }
}
客户端
package com.zxbd.project.test;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
public class NIOClient {
    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 7001));
        String filename = "C:\\Users\\22720\\Downloads\\历史数据.xls";
        // 得到一个文件channel
        FileChannel fileChannel = new FileInputStream(filename).getChannel();
        // 准备发送
        long startTime = System.currentTimeMillis();
        // 在Linux下一个transferTo方法就可以完成传输
        // 在Windows下一次调用transferTo只能发送8m,就需要分段传输文件
        // transferTo底层使用到零拷贝
        long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
        System.out.println("发送的总的字节数 = " + transferCount + " 耗时: " + (System.currentTimeMillis() - startTime));
        // 关闭
        fileChannel.close();
    }
}
AIO
Java AIO(Asynchronous I/O,异步 I/O)是一种在 Java 中进行非阻塞 I/O 操作的方式,用于处理大量的并发连接或文件操作。与传统的同步 I/O 不同,它允许应用程序在等待 I/O 操作完成时执行其他任务,而不会阻塞整个线程或进程。
以下是 Java AIO 的一些基本介绍:
-  异步操作:Java AIO 提供了异步操作的支持,允许应用程序发起 I/O 请求后继续执行其他任务,而不必等待 I/O 操作完成。当 I/O 操作完成时,系统会通知应用程序。 
-  NIO(New I/O)基础:Java AIO 构建在 Java NIO 的基础之上。它使用了通道(Channel)和缓冲区(Buffer)的概念,允许应用程序将数据从通道读取到缓冲区,或将数据从缓冲区写入通道。 
-  异步事件驱动:Java AIO 采用了事件驱动的方式来处理异步 I/O 操作。应用程序需要注册事件监听器,以便在 I/O 操作完成时接收通知。这种方式可以有效地管理大量的并发连接。 
-  高性能:由于异步操作的特性,Java AIO 可以实现高性能的 I/O 处理,特别适用于处理大量并发连接或需要高吞吐量的应用场景,如网络服务器、文件传输等。 
-  复杂性:相对于传统的同步 I/O,Java AIO 的编程模型可能更复杂一些,因为它涉及到事件监听、回调函数等概念。但一旦习惯了这种模型,它可以提供更好的性能和可伸缩性。 
-  主要类和接口:Java AIO 的主要类和接口包括 AsynchronousSocketChannel、AsynchronousServerSocketChannel、AsynchronousFileChannel等,以及与异步事件处理相关的回调接口。
示例代码如下所示:
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
public class AIOExample {
    public static void main(String[] args) throws Exception {
        // 打开异步文件通道
        AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(
                Paths.get("C:\\tmp\\nio\\1.txt"),
                StandardOpenOption.READ
        );
        // 创建读取操作的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // 发起异步读取操作
        fileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                System.out.println("读取完成,读取字节数:" + result);
                // 处理读取的数据
                attachment.flip();
                while (attachment.hasRemaining()) {
                    System.out.print((char) attachment.get());
                }
                System.out.println();
                // 关闭文件通道
                try {
                    fileChannel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.err.println("读取失败:" + exc.getMessage());
                // 关闭文件通道
                try {
                    fileChannel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("主线程111");
    }
}
上面的示例演示了如何使用 Java AIO 进行异步文件读取操作。当读取操作完成时,completed 方法将被调用,而当操作失败时,failed 方法将被调用。这种异步编程模型可以用于处理各种类型的异步 I/O 操作。










