NIO
IO 模型
BIO
阻塞IO
,即进行I/O
操作时,线程被阻塞
Sokcet
是阻塞调用
SocketServer
典型代码如下:
public class Server {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(Constants.port);
//接收新连接线程
new Thread(() -> {
try {
//(1)阻塞方法获取新的连接
Socket socket = serverSocket.accept();
//(2)每一个新的连接都创建一个线程,负责读取数据
new Thread(() -> {
try {
byte[] data = new byte[1024];
InputStream inputStream = socket.getInputStream();
while (true) {
int len;
//(3)按照字节流方式读取数据
while ((len = inputStream.read(data)) != -1)
System.out.println(new String(data, 0, len));
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
serverSocket.accept();
是阻塞的,只有接收到客户端连接才会向下执行,并为每一个Socket
创建一个独立的线程。可以通过线程池优化,但高并发下,线程会创建过多,导致效率低下。
NIO
NIO 是同步非阻塞的I/O
模型,提供了Channel
,Selector
,Buffer
核心组件。
Channel
向Selector
注册事件,Selector
通过阻塞的select
方法来获取发生的事件。
典型代码:
public class NIOServer {
Selector serverSelector;
Selector clientSelector;
ServerSocketChannel serverSocketChannel;
final AtomicInteger atomicInteger = new AtomicInteger();
public NIOServer() throws IOException {
init();
}
public void init() throws IOException {
serverSelector = Selector.open();
clientSelector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(Constants.port));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
System.out.println("Server Start");
}
public void start() {
serverThread().start();
clientThread().start();
}
public Thread serverThread() {
return new Thread(() -> {
try {
serverCycle();
} catch (Exception e) {
e.printStackTrace();
}
});
}
public void serverCycle() throws Exception {
for (; ; ) {
//阻塞事件
int accepts = serverSelector.select();
if (accepts <= 0) {
continue;
}
Iterator<SelectionKey> iterator = serverSelector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
accept(selectionKey);
iterator.remove();
}
}
}
public void accept(SelectionKey selectionKey) throws Exception {
SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
socketChannel.configureBlocking(false);
socketChannel.register(clientSelector, (SelectionKey.OP_READ));
System.out.println(atomicInteger.incrementAndGet());
}
public Thread clientThread() {
return new Thread(() -> {
try {
clientCycle();
} catch (Exception e) {
e.printStackTrace();
}
});
}
public void clientCycle() throws Exception {
System.out.println("clientCycle");
for (; ; ) {
int readyCount = clientSelector.select(1);
if (readyCount <= 0)
continue;
try {
Iterator<SelectionKey> iterator = clientSelector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
write(selectionKey);
}
iterator.remove();
}
} catch (Exception e) {
}
}
}
public void write(SelectionKey selectionKey) throws Exception {
SocketChannel socketChannel = ((SocketChannel) selectionKey.channel());
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//(3)读取数据以块为单位批量读取
socketChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
.toString());
selectionKey.interestOps(SelectionKey.OP_READ);
}
public static void main(String[] args) throws IOException {
new NIOServer().start();
}
}
serverSelector
开启后向serverSelector
注册连接事件,我们开启一个线程来循环检测连接事件是否发生
, serverSelector.select();
这一步是阻塞的,实际的事件发生后才会解除阻塞。
当我们获取到了连接的SocketChannel
,将这个SocketChannel
可读事件注册到clientSelector
,clientSelector
也是在一个线程中进行循环,并通过clientSelector.select();
获取可读的客户端,然后便可以进行操作。
这样做到了多个连接使用一个线程,提高了高并发环境下的性能。