0
点赞
收藏
分享

微信扫一扫

NIO 非阻塞IO

笙烛 2022-04-17 阅读 25
java
/**
 * JAVA NIO
 * NIO称之为非阻塞IO,读写的过程中不会发生阻塞现象
 * 我们之前所学的流,称为BIO:阻塞式IO,就是在读写的过程中可能会发生阻塞现象
 */
public class NIODemo1 {
    public static void main(String[] args) throws IOException {
        /*
            非阻塞IO面向Channel(“通道”)的,不是面向Stream(流)的.

            流的特点:方向单一,顺序读写。流要么是输入流用于顺序读写数据,要么输出流用于顺序写出数据

            Channel的特点:双向的,即可以读也可以写

            JAVA NIO核心API:
            Channel 通道,常见的实现:
            FileChannel:文件通道,可以对文件进行读写操作
            SocketChannel:套接字通道,可以与远端计算机进行TCP读写操作
            ServerSocketChannel:服务端的套接字通道,用于监听客户端的连接

            Buffer缓冲区,通道是对缓冲区中的数据进行读写操作
            常见的缓冲区实现
            ByteBuffer:字节缓冲区:缓冲区内部内容都是字节
         */
        //BIO的文件复制操作,使用流的方式进行复制
        /*
        FileInputStream fis = new FileInputStream("movie.wmv");
        FileOutputStream fos = new FileOutputStream("movie_cp.wmv");
        byte[] buffer = new byte[1024*10];//创建一个字节数组作为缓冲区
        int len;//记录每次实际读取的字节数
        while((len = fis.read(buffer))!=-1){
            fos.write(buffer,0,len);
        }
        System.out.println("复制完毕");
        fis.close();
        fos.close();

         */
        //NIO的文件复制操作
        FileInputStream fis = new FileInputStream("movie.wmv");
        //基于文件输入流获取一个用于读取该文件的文件通道
        FileChannel inChannel = fis.getChannel();


        FileOutputStream fos = new FileOutputStream("movie_cp2.wmv");
        FileChannel outChannel = fos.getChannel();

        //创建一个字节缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024*10);//创建一个10k大小缓冲区
        int len;//记录每次实际读取的数据量

        /*
            缓冲区中重要的几个属性:
            position:当前位置,用来表示当前缓冲区已经有多少数据被操作了
            limit:缓冲区最大可以操作的位置
            capacity:容量,缓冲区的大小

            默认创建一个缓冲区时:
            position=0
            limit=capacity
            capacity=创建缓冲区时指定的大小
         */
        /*
            position=0
            limit=10240
            一次可以读取最多读取数据为:position到limit之间的数据量
            limit-position = 10240
         */
//        System.out.println("读取前buffer状态=========================");
//        System.out.println("position:"+buffer.position());
//        System.out.println("limit:"+buffer.limit());
//        len = inChannel.read(buffer);//从通道中读取数据到缓冲区中
//        System.out.println("本次读取了:"+len+"个字节");
//        System.out.println("读取后buffer状态=========================");
//        System.out.println("position:"+buffer.position());
//        System.out.println("limit:"+buffer.limit());
//
//        System.out.println("==============第二次读取");
//        System.out.println("读取前buffer状态=========================");
//        System.out.println("position:"+buffer.position());
//        System.out.println("limit:"+buffer.limit());
//        len = inChannel.read(buffer);//从通道中读取数据到缓冲区中
//        System.out.println("本次读取了:"+len+"个字节");
//        System.out.println("读取后buffer状态=========================");
//        System.out.println("position:"+buffer.position());
//        System.out.println("limit:"+buffer.limit());

        //完成一轮复制
        /*
            position:0
            limit:1024
         */
//        len = inChannel.read(buffer);
//        buffer.flip();
//        outChannel.write(buffer);
        //完整的复制动作
        while((len = inChannel.read(buffer))!=-1){
            buffer.flip();
            outChannel.write(buffer);
            buffer.clear();
        }

 while((len = inChannel.read(buffer))!=-1){
            buffer.flip();
            outChannel.write(buffer);
            buffer.clear();
        }

 

 对聊天室服务端的改造:

/**
 * NIO实现聊天室服务端
 */
public class NIOServer {
    public static void main(String[] args) {
        try {
            //存放所有客户端的SocketChannel,用于广播消息
            List<SocketChannel> allChannel = new ArrayList<>();



            //创建总机,ServerSocketChannel
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //ServerSocketChannel 模式为阻塞模式,可以将其设置为非阻塞模式
            serverSocketChannel.configureBlocking(false); //传入false设置为非阻塞模式
            //为ServerSocketChannel绑定服务端口,客户端可以通过该端口与我们建立连接
            serverSocketChannel.bind(new InetSocketAddress(8088));
            //以上创建为固定模式,以后用这样的形式创建ServerSocketChannel的非阻塞模式

            /*
                多路选择器的应用
                这个是NIO解决非阻塞的关键API,用于监听所有通道对应的事件,并做出对应的操作。
                我们的线程只要轮询处理多路选择器中待处理的通道事件即可完成所有通道的工作,避免使用大量线程
                处于阻塞来减少不必要的系统开销。
             */
            Selector selector = Selector.open();//使用其静态方法open获取一个多路选择器实例

            /**
             * 让"总机"ServerSocketChannel向多路选择器上注册一个事件,即:accept事件。
             * 原因:原来使用ServerSocket时,一旦主线程调用accept方法就会进入阻塞状态,直到一个客户端连接
             * 否则将无法继续执行其他工作。而现在的操作是让多路选择器关心该事件,避免让线程处于阻塞。
             */
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
            /*
                多路选择器的select方法
                当注册在该选择器上的channel有事件待处理时,此方法会立即返回一个int值,表示有多少个事件待处理
                若没有任何事件待处理时,此方法会形成阻塞。
             */
                System.out.println("等待选择器告知是否有事件等待处理...");
                selector.select();
                System.out.println("选择器有事件待处理!!!");
                //通过选择器获取所有待处理的事件
                Set<SelectionKey> keySet = selector.selectedKeys();
                //遍历集合,将所有待处理的事件处理完毕
                for (SelectionKey key : keySet) {
                    //判断该事件是否为可以接受一个客户端连接了(对应的是向多路选择器注册的事件SelectionKey.OP_ACCEPT)
                    if (key.isAcceptable()) { //是不是接电话动作
                        //处理接收客户端连接的操作
                    /*
                        通过SelectionKey的方法channel()获取该事件触发的通道

                        因为只有ServerSocketChannel向多路选择器注册了OP_ACCEPT事件,因此当该事件
                        isAcceptable()返回值为true,则说明该事件一定是由ServerSocketChannel触发的
                        所以我们通过该事件获取触发该事件的通道时,一定获取的是ServerSocketChannel
                     */
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    /*
                        获取的SocketChannel与原来ServerSocket.accept返回的Socket道理一致
                        通过该SocketChannel就可以与连接的客户端进行双向交互数据了
                     */
                        SocketChannel socket = channel.accept();//接受客户端的连接
                    /*
                        非阻塞的ServerSocketChannel就算多路选择器提示有客户端请求可接受了,accept返回时
                        得到的SocketChanel有可能为null
                     */
                        if (socket == null) {
                            continue;//忽略本次事件的后续处理
                        }
                    /*
                        当我们接受了客户端连接后,原来的聊天室服务端我们通过Socket获取输入流读取客户端
                        发送过来消息的操作时如果客户端不发送内容,那么读取操作就会阻塞!
                        对此,我们将当前SocketChannel它的读取消息操作也注册到多路选择器中,这样一来只有
                        当客户端发送过来消息时,多路选择器才会通知我们进行处理。
                     */
                        //将当前SocketChannel设置为非阻塞模式
                        socket.configureBlocking(false);
                        //向多路选择器中注册读取客户端消息的事件
                        socket.register(selector, SelectionKey.OP_READ);
                        //将该SocketChannel存入共享集合,用于将消息广播
                        allChannel.add(socket);
                        System.out.println("一个客户端连接了!当前在线人数" + allChannel.size());

                        //该事件是否为某个SocketChannel有消息可以读取了(某个客户端发送过来了消息)
                    }else if(key.isReadable()){
                        try {
                            //通过事件获取触发了该事件的channel
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            //通过SocketChannel读取该客户端发送过来的消息
                            ByteBuffer buffer = ByteBuffer.allocate(1024);

                            //非阻塞状态下,有可能读取数据时未读取到任何字节
                            int len = socketChannel.read(buffer);
                            if (len == 0) {
                                continue;
                            } else if (len == -1) { //若本次读取的长度返回值为-1,则表示客户端断开连接了
                                socketChannel.close(); //关闭socketChannel与客户端也断开
                                allChannel.remove(socketChannel);
                                continue;
                            }

                            buffer.flip();//反转后position到limit之间就是本次读取到的数据了

                            byte[] data = new byte[buffer.limit()];
                       /*
                            Buffer的get方法要求我们传入一个字节数组,作用是将当前Buffer中从下标
                            position开始的连续data数组长度的字节量装入该data数组。
                        */
                            buffer.get(data);//调用完毕后,data中保存的就是buffer中本次读取到的所有字节了
                            //将读取的消息转换为字符串(客户端发送过来的消息)
                            String line = new String(data, StandardCharsets.UTF_8);
                            //通过SocketChannel获取客户端的地址信息
                            String host = socketChannel.socket().getInetAddress().getHostAddress();
                            System.out.println(host+"说:" + line);
                            //遍历所有的SocketChannel,将该消息发送给所有的客户端
                            for(SocketChannel sc : allChannel){
                                buffer.flip(); //position:0  limit:buffer中所有之前读到的字节
                                sc.write(buffer); //position=limit=buffer中所有之前读到的字节
                            }

                        }catch (IOException e){
                            //读取客户端消息这里若抛出异常,则通常是客户端强行断开连接造成的。
                            key.channel().close(); //不能放在finally里面
                            allChannel.remove(key.channel());
                        }

                    }

                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 

举报

相关推荐

0 条评论