基础 NIO:Non-Blocking IO
非阻塞 IO
,主要用于网络连接中非阻塞的读写,提供多路非阻塞式的高伸缩性网络 I/O
。异步 I/O
的一个优势在于,可以同时根据大量的输入和输出执行 I/O
。同步程序常常需要轮询,或者创建很多线程处理大量的连接。使用异步 I/O
,可以监听任何数量的通道上的事件,不用轮询也不用额外的线程。
Selector 选择器:是 Java NIO
中能够检测一到多个 NIO
通道,是多路复用器,能够监听通道是否为读写事件做好准备。因此一个单独的线程可以管理多个通道,从而管理多个网络连接。Selector
用来支持异步 I/O
操作(非阻塞I/O操作),Channel
必须处于非阻塞模式下(因此 FileChannel, Selector
不能一起使用)。Selector
是非阻塞 I/O
的核心,所有希望采用非阻塞 I/O
进行通信的通道,都应该注册到 Selector
对象。
SelectionKey
SelectionKey
包含监听的不同类型事件:
OP_READ
OP_WRITE
OP_CONNECT
OP_ACCEPT
常用 API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public abstract class SelectionKey { public abstract SelectableChannel channel () ; public abstract Selector selector () ; public abstract int interestOps () ; public abstract SelectionKey interestOps (int ops) ; public abstract int readyOps () ; public abstract void cancel () ; public final boolean isReadable () {...} public final boolean isWritable () {...} public final boolean isConnectable () {...} public final boolean isAcceptable () {...} public final Object attach (Object ob) {...} public final Object attachment () {...} }
常用 API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public abstract class Selector implements Closeable { public static Selector open () throws IOException {...} public abstract boolean isOpen () ; public abstract Set<SelectionKey> keys () ; public abstract Set<SelectionKey> selectedKeys () ; public abstract int selectNow () throws IOException ; public abstract int select (long timeout) throws IOException ; public abstract int select () throws IOException ; public abstract Selector wakeup () ; public abstract void close () throws IOException ; }
SelectableChannel
支持选择器的通道表示可以支持多路复用的通道,支持阻塞和非阻塞模式。(默认情况下,所有的 Channel
都是阻塞的),需要设置为非阻塞模式,才能使用 NIO
特性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel { public abstract SelectableChannel configureBlocking (boolean block) throws IOException ; public abstract boolean isBlocking () ; public abstract SelectionKey register (Selector sel, int ops, Object att) throws ClosedChannelException ; public final SelectionKey register (Selector sel, int ops) throws ClosedChannelException {...} public abstract boolean isRegistered () ; public abstract SelectionKey keyFor (Selector sel) ; }
注册通道 通道必须处于非阻塞模式,可以监听多个事件。
1 2 3 4 5 channel.configureBlocking(false ); SelectionKey key = channel.register(selector, Selectionkey.OP_READ); int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;SelectionKey key = channel.register(selector, interestSet);
参考示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Selector selector = Selector.open(); channel.configureBlocking(false ); SelectionKey key = channel.register(selector, SelectionKey.OP_READ); while (true ) { int readyChannels = selector.select(); if (readyChannels == 0 ) continue ; Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isAcceptable()) { } else if (key.isConnectable()) { } else if (key.isReadable()) { } else if (key.isWritable()) { } keyIterator.remove(); } }
SocketChannel
对应于 java.net.Socket
类。
创建 SocketChannel
默认创建一个阻塞 SocketChannel
,两种方法的差异:
open()
后续还需要手动配置为非阻塞后,监听连接事件。
open(SocketAddress remote)
创建并同步等待连接 remote
,直到连接成功后返回。不需要再手动连接。
支持的非阻塞事件
OP_CONNECT
OP_READ
OP_WRITE
不支持 Accept
事件
常见 API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel , ScatteringByteChannel , GatheringByteChannel , NetworkChannel { public static SocketChannel open () throws IOException {...} public static SocketChannel open (SocketAddress remote) throws IOException ; public abstract SocketChannel bind (SocketAddress local) throws IOException ; public abstract boolean connect (SocketAddress remote) throws IOException ; public abstract boolean finishConnect () throws IOException ; public abstract boolean isConnected () ; public abstract boolean isConnectionPending () ; public abstract <T> SocketChannel setOption (SocketOption<T> name, T value) throws IOException ; public abstract Socket socket () ; public abstract SocketAddress getRemoteAddress () throws IOException ; public abstract SocketAddress getLocalAddress () throws IOException ; public abstract int read (ByteBuffer dst) throws IOException ; public abstract long read (ByteBuffer[] dsts, int offset, int length) throws IOException ; public final long read (ByteBuffer[] dsts) throws IOException {...} public abstract int write (ByteBuffer src) throws IOException ; public abstract long write (ByteBuffer[] srcs, int offset, int length) throws IOException ; public final long write (ByteBuffer[] srcs) throws IOException {...} public final int validOps () {...} public abstract SocketChannel shutdownInput () throws IOException ; public abstract SocketChannel shutdownOutput () throws IOException ; }
ServerSocketChannel
对应于 java.net.ServerSocket
类。
创建 ServerSocketChannel
创建 ServerSocketChannel
也很简单:ServerSocketChannel server = ServerSocketChannel.open();
支持的非阻塞事件 仅支持 OP_ACCEPT
事件。
常见 API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel { public static ServerSocketChannel open () throws IOException {...} public final int validOps () {...} public final ServerSocketChannel bind (SocketAddress local) throws IOException {...} public abstract ServerSocketChannel bind (SocketAddress local, int backlog) throws IOException ; public abstract <T> ServerSocketChannel setOption (SocketOption<T> name, T value) throws IOException ; public abstract ServerSocket socket () ; public abstract SocketChannel accept () throws IOException ; public abstract SocketAddress getLocalAddress () throws IOException ; }
客户端和服务端 TCP
通信流程 整个通信流程参考 TCP
通信流程,只是实现方式不一样。本例实现一个聊天室功能。
服务端初始化
创建 Selector
public static Selector open() throws IOException;
,创建选择器。
创建 ServerSocketChannel
public static ServerSocketChannel open() throws IOException;
,创建服务端通道,并配置为非阻塞。
绑定 bind
public final ServerSocketChannel bind(SocketAddress local){...}
,绑定指定地址和端口。
注册 Accept
事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
,注册后服务端选择器开始监听客户端的连接。
服务端轮询监听并处理事件 selector.select()
,阻塞等待。监听客户端的连接及输入,从这里可以看出 nio
并不完全是异步,还是会有阻塞。
客户端初始化
创建 Selector
public static Selector open() throws IOException;
,创建选择器。
创建 SocketChannel
public static SocketChannel open() throws IOException;
,创建客户端通道,并配置为非阻塞。
注册 Connect
事件socketChannel.register(selector, SelectionKey.OP_CONNECT);
,注册事件后准备连接。
连接 connect
public abstract boolean connect(SocketAddress remote) throws IOException;
,连接服务器。
客户端轮询监听并处理事件 selector.select()
,阻塞等待。监听服务端反馈和输入。
数据通信
服务端接受连接后监听客户端写入 服务端接受客户端连接后,拿到客户端 SocketChannel
,并同步注册 Read
事件,监听客户端输入。通过该通道读写缓冲区实现数据通信。
客户端监听服务端发送的消息 客户端接受到服务端连接响应后,客户端 SocketChannel
注册 Read
事件,监听该通道来自服务端的输入。
服务端和客户端关闭 客户端和服务端分别关闭 Selector, ServerSocketChannel, SocketChannel
。
服务端存在的问题:客户端的 SocketChannel
关闭后,服务端此通道并没有断开连接,并且该通道注册到选择器的读事件,会被反复触发。也就是说服务端 select
一直都会返回 OP_READ
,但是通道中读入缓冲区的数据实际总是为 -1,即并没有数据。为什么会反复触发?
1 2 3 4 5 6 7 8 9 10 11 12 // 服务端打印信息,服务端 10000,客户端 62676 client: java.nio.channels.SocketChannel[connected local=/127.0.0.1:10000 remote=/127.0.0.1:62676] client.isOpen(): true client.isConnected(): true client.isConnectionPending(): false client.isRegistered(): true // 1 表示为 OP_READ 事件 client.keyFor(selector).interestOps(): 1 client.keyFor(selector).readyOps(): 1 // 通道读入缓冲区实际值为 -1,即没有数据 count: -1
TCP
通信示例本例实现一个聊天室功能。
服务端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 public class TestNIOTCPServer { private static final int PORT = 10000 ; private static final int BUFF_CAP = 1024 ; private Selector selector = null ; private ServerSocketChannel serverSocketChannel = null ; private Charset charset = Charset.forName("UTF-8" ); private ByteBuffer rBuffer = ByteBuffer.allocate(BUFF_CAP); private void init () throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); InetSocketAddress localAddress = new InetSocketAddress(PORT); serverSocketChannel.bind(localAddress); serverSocketChannel.configureBlocking(false ); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("server start on port: " + PORT); } private void listen () { System.out.println("listen..." ); try { while (selector.select() > 0 ){ for (SelectionKey selectionKey : selector.selectedKeys()){ handleSelectorKey(selectionKey); } selector.selectedKeys().clear(); } } catch (IOException e){ e.printStackTrace(); } finally { try { if (serverSocketChannel != null ) { serverSocketChannel.close(); } if (selector != null ){ selector.close(); } } catch (IOException e) { e.printStackTrace(); } } } private void handleSelectorKey (SelectionKey selectionKey) throws IOException { System.out.println("handleSelectorKey, key = " + selectionKey.readyOps()); if (selectionKey.isAcceptable()){ SocketChannel client = serverSocketChannel.accept(); client.configureBlocking(false ); client.register(selector, SelectionKey.OP_READ); System.out.println(client.getRemoteAddress() + " connected." ); } if (selectionKey.isReadable()){ SocketChannel client = (SocketChannel) selectionKey.channel(); StringBuilder builder = new StringBuilder(); rBuffer.clear(); int count = 0 ; try { while ((count = client.read(rBuffer)) > 0 ){ rBuffer.flip(); builder.append(charset.decode(rBuffer)); } System.out.println(client.toString() + ":" + builder); if (count < 0 ){ System.out.println(client.toString() + " has been closed." ); selectionKey.cancel(); if (client != null ){ client.close(); } } } catch (IOException e){ e.printStackTrace(); System.out.println(client.toString() + " disconnected." ); selectionKey.cancel(); if (client != null ){ client.close(); } } if (builder.length() > 0 ){ dispatchInfoToAllClient(client, builder.toString()); } } } private void dispatchInfoToAllClient (SocketChannel client, String info) throws IOException { String name = "[" + client.getRemoteAddress() + "-" + client.hashCode() + "]" ; for (SelectionKey key : selector.keys()){ Channel targetChannel = key.channel(); if (targetChannel instanceof SocketChannel){ SocketChannel dest = (SocketChannel) targetChannel; if (!client.equals(dest)) { dest.write(charset.encode(name + ":" + info)); } } } } public static void main (String[] args) throws IOException { TestNIOTCPServer server = new TestNIOTCPServer(); server.init(); server.listen(); } }
客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 public class TestNIOTCPClient { private static final String HOST_NAME = "127.0.0.1" ; private static final int PORT = 10000 ; private static final int BUFF_CAP = 1024 ; private Selector selector = null ; private SocketChannel client = null ; private Charset charset = Charset.forName("UTF-8" ); ByteBuffer rBuffer = ByteBuffer.allocate(BUFF_CAP); private volatile boolean isQuit = false ; private void init () throws IOException { selector = Selector.open(); client = SocketChannel.open(); client.configureBlocking(false ); client.register(selector, SelectionKey.OP_CONNECT); InetSocketAddress remote = new InetSocketAddress(HOST_NAME, PORT); client.connect(remote); ReadThread readThread = new ReadThread(); readThread.setDaemon(true ); readThread.start(); } private void listen () { try { while (selector.select() > 0 && !isQuit){ for (SelectionKey selectionKey : selector.selectedKeys()){ handleSelectionKey(selectionKey); } selector.selectedKeys().clear(); } System.out.println("listen: quit..." ); } catch (IOException e){ e.printStackTrace(); } finally { try { if (selector != null ){ System.out.println("close selector" ); selector.close(); } if (client != null ){ System.out.println("close client" ); client.close(); } } catch (IOException e){ e.printStackTrace(); } } } private void handleSelectionKey (SelectionKey selectionKey) throws IOException { if (selectionKey.isConnectable()){ System.out.println("connected: " + client.isConnected()); System.out.println("connectionPending: " + client.isConnectionPending()); if (client.isConnectionPending()){ client.finishConnect(); System.out.println("connected: " + client.getRemoteAddress()); client.register(selector, SelectionKey.OP_READ); } } if (selectionKey.isReadable()){ StringBuilder builder = new StringBuilder(); rBuffer.clear(); while (client.read(rBuffer) > 0 ){ rBuffer.flip(); builder.append(charset.decode(rBuffer)); } System.out.println(builder); } } private class ReadThread extends Thread { @Override public void run () { Scanner scanner = new Scanner(System.in); try { while (scanner.hasNextLine()) { String line = scanner.nextLine(); if (isQuited(line)) { scanner.close(); isQuit = true ; selector.wakeup(); break ; } else { client.write(charset.encode(line)); } } } catch (IOException e) { e.printStackTrace(); } } } private boolean isQuited (String value) { value = value.trim(); return value.equalsIgnoreCase("quit" ) || value.equalsIgnoreCase("exit" ); } public static void main (String[] args) throws IOException { TestNIOTCPClient client = new TestNIOTCPClient(); client.init(); client.listen(); } }
DatagramChannel
[todo]http://ifeve.com/datagram-channel/ http://blog.csdn.net/foart/article/details/47608475 http://www.365mini.com/page/java-nio-course-27.htm 网络编程第四版-425
参考文档
Java NIO系列教程
java nio SocketChannel 服务器端与多客户端信息交互(聊天功能)
疯狂 Java
讲义