NIO

IO 模型

BIO

阻塞IO,即进行I/O操作时,线程被阻塞

Sokcet是阻塞调用

SocketServer

典型代码如下:

public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(Constants.port);
//接收新连接线程
        new Thread(() -> {
            try {
//(1)阻塞方法获取新的连接
                Socket socket = serverSocket.accept();
//(2)每一个新的连接都创建一个线程,负责读取数据
                new Thread(() -> {
                    try {
                        byte[] data = new byte[1024];
                        InputStream inputStream = socket.getInputStream();
                        while (true) {
                            int len;
//(3)按照字节流方式读取数据
                            while ((len = inputStream.read(data)) != -1)
                                System.out.println(new String(data, 0, len));
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }).start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

serverSocket.accept();是阻塞的,只有接收到客户端连接才会向下执行,并为每一个Socket创建一个独立的线程。可以通过线程池优化,但高并发下,线程会创建过多,导致效率低下。

NIO

NIO 是同步非阻塞的I/O模型,提供了Channel,SelectorBuffer核心组件。

ChannelSelector注册事件,Selector通过阻塞的select方法来获取发生的事件。

典型代码:


public class NIOServer {
    Selector serverSelector;
    Selector clientSelector;
    ServerSocketChannel serverSocketChannel;
    final AtomicInteger atomicInteger = new AtomicInteger();

    public NIOServer() throws IOException {
        init();
    }

    public void init() throws IOException {
        serverSelector = Selector.open();
        clientSelector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(Constants.port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
        System.out.println("Server Start");
    }

    public void start() {
        serverThread().start();
        clientThread().start();
    }

    public Thread serverThread() {
        return new Thread(() -> {
            try {
                serverCycle();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    public void serverCycle() throws Exception {
        for (; ; ) {
            //阻塞事件
            int accepts = serverSelector.select();
            if (accepts <= 0) {
                continue;
            }
            Iterator<SelectionKey> iterator = serverSelector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                accept(selectionKey);
                iterator.remove();
            }
        }
    }

    public void accept(SelectionKey selectionKey) throws Exception {
        SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(clientSelector, (SelectionKey.OP_READ));
        System.out.println(atomicInteger.incrementAndGet());
    }

    public Thread clientThread() {
        return new Thread(() -> {
            try {
                clientCycle();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    public void clientCycle() throws Exception {
        System.out.println("clientCycle");
        for (; ; ) {
            int readyCount = clientSelector.select(1);
            if (readyCount <= 0)
                continue;
            try {
                Iterator<SelectionKey> iterator = clientSelector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if (selectionKey.isReadable()) {
                        write(selectionKey);
                    }
                    iterator.remove();
                }
            } catch (Exception e) {

            }
        }
    }

    public void write(SelectionKey selectionKey) throws Exception {
        SocketChannel socketChannel = ((SocketChannel) selectionKey.channel());
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//(3)读取数据以块为单位批量读取
        socketChannel.read(byteBuffer);
        byteBuffer.flip();
        System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
                .toString());
        selectionKey.interestOps(SelectionKey.OP_READ);
    }

    public static void main(String[] args) throws IOException {
        new NIOServer().start();
    }
}

serverSelector开启后向serverSelector注册连接事件,我们开启一个线程来循环检测连接事件是否发生

serverSelector.select();这一步是阻塞的,实际的事件发生后才会解除阻塞。

当我们获取到了连接的SocketChannel,将这个SocketChannel可读事件注册到clientSelector,clientSelector也是在一个线程中进行循环,并通过clientSelector.select();获取可读的客户端,然后便可以进行操作。

这样做到了多个连接使用一个线程,提高了高并发环境下的性能。

Last Updated:
Contributors: himcs, himcs