柚子快報(bào)邀請(qǐng)碼778899分享:后端 Java NIO
柚子快報(bào)邀請(qǐng)碼778899分享:后端 Java NIO
一、NIO
NIO(New I/O)即新的輸入/輸出庫(kù)是在 JDK 1.4 中引入的,彌補(bǔ)了原來(lái)的 I/O 的不足,提供了高速的、面向塊的 I/O。
NIO 核心組件:
通道(Channels) 緩沖區(qū)(Buffers) 選擇器(Selectors)
流與塊
I/O 與 NIO 最重要的區(qū)別是數(shù)據(jù)打包和傳輸?shù)姆绞剑琁/O 以流的方式處理數(shù)據(jù),而 NIO 以塊的方式處理數(shù)據(jù)。
面向流的 I/O 一次處理一個(gè)字節(jié)數(shù)據(jù):一個(gè)輸入流產(chǎn)生一個(gè)字節(jié)數(shù)據(jù),一個(gè)輸出流消費(fèi)一個(gè)字節(jié)數(shù)據(jù)。 為流式數(shù)據(jù)創(chuàng)建過(guò)濾器非常容易,鏈接幾個(gè)過(guò)濾器,以便每個(gè)過(guò)濾器只負(fù)責(zé)復(fù)雜處理機(jī)制的一部分。不利的一面是,面向流的 I/O 通常相當(dāng)慢。
面向塊的 I/O 一次處理一個(gè)數(shù)據(jù)塊,按塊處理數(shù)據(jù)比按流處理數(shù)據(jù)要快得多。 但是面向塊的 I/O 缺少一些面向流的 I/O 所具有的優(yōu)雅性和簡(jiǎn)單性。
I/O 包和 NIO 已經(jīng)很好地集成了,java.io.* 已經(jīng)以 NIO 為基礎(chǔ)重新實(shí)現(xiàn)了,所以現(xiàn)在它可以利用 NIO 的一些特性。 例如,java.io.* 包中的一些類(lèi)包含以塊的形式讀寫(xiě)數(shù)據(jù)的方法,這使得即使在面向流的系統(tǒng)中,處理速度也會(huì)更快。
通道與緩沖區(qū)
1. 通道
通道 Channel 是對(duì)原 I/O 包中的流的模擬,可以通過(guò)它讀取和寫(xiě)入數(shù)據(jù)。
通道與流的不同之處在于,流只能在一個(gè)方向上移動(dòng)(一個(gè)流必須是 InputStream 或者 OutputStream 的子類(lèi)), 而通道是雙向的,可以用于讀、寫(xiě)或者同時(shí)用于讀寫(xiě)。
通道包括以下類(lèi)型:
FileChannel:從文件中讀寫(xiě)數(shù)據(jù);DatagramChannel:通過(guò) UDP 讀寫(xiě)網(wǎng)絡(luò)中數(shù)據(jù);SocketChannel:通過(guò) TCP 讀寫(xiě)網(wǎng)絡(luò)中數(shù)據(jù);ServerSocketChannel:可以監(jiān)聽(tīng)新進(jìn)來(lái)的 TCP 連接,對(duì)每一個(gè)新進(jìn)來(lái)的連接都會(huì)創(chuàng)建一個(gè) SocketChannel。
2. 緩沖區(qū)
發(fā)送給一個(gè)通道的所有數(shù)據(jù)都必須首先放到緩沖區(qū)中,同樣地,從通道中讀取的任何數(shù)據(jù)都要先讀到緩沖區(qū)中。也就是說(shuō),不會(huì)直接對(duì)通道進(jìn)行讀寫(xiě)數(shù)據(jù),而是要先經(jīng)過(guò)緩沖區(qū)。
緩沖區(qū)實(shí)質(zhì)上是一個(gè)數(shù)組,但它不僅僅是一個(gè)數(shù)組。緩沖區(qū)提供了對(duì)數(shù)據(jù)的結(jié)構(gòu)化訪問(wèn),而且還可以跟蹤系統(tǒng)的讀/寫(xiě)進(jìn)程。
緩沖區(qū)包括以下類(lèi)型:
ByteBufferCharBufferShortBufferIntBufferLongBufferFloatBufferDoubleBuffer
緩沖區(qū)狀態(tài)變量
capacity:最大容量;position:當(dāng)前已經(jīng)讀寫(xiě)的字節(jié)數(shù);limit:還可以讀寫(xiě)的字節(jié)數(shù)。
狀態(tài)變量的改變過(guò)程舉例:
① 新建一個(gè)大小為 8 個(gè)字節(jié)的緩沖區(qū),此時(shí) position 為 0,而 limit = capacity = 8。capacity 變量不會(huì)改變,下面的討論會(huì)忽略它。
② 從輸入通道中讀取 5 個(gè)字節(jié)數(shù)據(jù)寫(xiě)入緩沖區(qū)中,此時(shí) position 為 5,limit 保持不變。
③ 在將緩沖區(qū)的數(shù)據(jù)寫(xiě)到輸出通道之前,需要先調(diào)用 flip() 方法,這個(gè)方法將 limit 設(shè)置為當(dāng)前 position,并將 position 設(shè)置為 0。
④ 從緩沖區(qū)中取 4 個(gè)字節(jié)到輸出緩沖中,此時(shí) position 設(shè)為 4。
⑤ 最后需要調(diào)用 clear() 方法來(lái)清空緩沖區(qū),此時(shí) position 和 limit 都被設(shè)置為最初位置。
文件 NIO 實(shí)例
FileChannel的使用
開(kāi)啟FileChannel 從FileChannel讀取數(shù)據(jù)/寫(xiě)入數(shù)據(jù)
3.關(guān)閉FileChannel
public class FileChannelDemo {
public static void main(String[] args) throws IOException {
//1.創(chuàng)建一個(gè)RandomAccessFile(隨機(jī)訪問(wèn)文件)對(duì)象通過(guò)RandomAccessFile對(duì)象的getChannel()方法。
RandomAccessFile raf=new RandomAccessFile("demo6.txt","rw");
FileChannel fc=raf.getChannel();
//使用FileChannel的read()方法讀取數(shù)據(jù):
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
int bys=fc.read(byteBuffer);
//使用FileChannel的write()方法寫(xiě)入數(shù)據(jù):
ByteBuffer byteBuffer2=ByteBuffer.allocate(1024);
byteBuffer2.put("hello".getBytes());
fc.write(byteBuffer2);
//3.關(guān)閉FileChannel
fc.close();
}
}
以下展示了使用 NIO 快速?gòu)?fù)制文件的實(shí)例:
public class CopyFile {
public static void main(String[] args) throws IOException {
String srcFile="國(guó)旗歌.mp4";
String destFile="demo3.mp4";
long start = System.currentTimeMillis();
//copyFile(srcFile,destFile); //共耗時(shí):75309毫秒
//copyFile2(srcFile,destFile); //共耗時(shí):153毫秒
//copyFile3(srcFile,destFile);//共耗時(shí):282毫秒
//copyFile4(srcFile,destFile);//共耗時(shí):44毫秒
copyFile5(srcFile,destFile);//共耗時(shí):共耗時(shí):113毫秒
long end = System.currentTimeMillis();
System.out.println("共耗時(shí):" + (end - start) + "毫秒");
}
/**
* 基本字節(jié)流一次讀寫(xiě)一個(gè)字節(jié)
*/
public static void copyFile(String srcFile,String destFile) throws IOException {
FileInputStream fis=new FileInputStream(srcFile);
FileOutputStream fos=new FileOutputStream(destFile);
int by=0;
while((by=fis.read())!=-1){
fos.write(by);
}
fis.close();
fos.close();
}
/**
* 基本字節(jié)流一次讀寫(xiě)一個(gè)字節(jié)數(shù)組
*/
public static void copyFile2(String srcFile,String destFile) throws IOException{
FileInputStream fis=new FileInputStream(srcFile);
FileOutputStream fos=new FileOutputStream(destFile);
int len=0;
byte[] bys=new byte[1024];
while((len=fis.read(bys))!=-1){
fos.write(bys,0,len);
}
fis.close();
fos.close();
}
/**
* 高效字節(jié)流一次讀寫(xiě)一個(gè)字節(jié)
*/
public static void copyFile3(String srcFile,String destFile) throws IOException{
BufferedInputStream bis=new BufferedInputStream(new FileInputStream(srcFile));
BufferedOutputStream bos=new BufferedOutputStream(new FileOutputStream(destFile));
int by=0;
while((by=bis.read())!=-1){
bos.write(by);
}
bis.close();
bos.close();
}
/**
* 高效字節(jié)流一次讀寫(xiě)一個(gè)字節(jié)數(shù)組
*/
public static void copyFile4(String srcFile,String destFile) throws IOException{
BufferedInputStream bis=new BufferedInputStream(new FileInputStream(srcFile));
BufferedOutputStream bos=new BufferedOutputStream(new FileOutputStream(destFile));
int len=0;
byte[] bys=new byte[1024];
while((len=bis.read(bys))!=-1){
bos.write(bys,0,len);
}
bis.close();
bos.close();
}
/**
* 使用FileChannel復(fù)制文件
*/
public static void copyFile5(String srcFile,String destFile) throws IOException{
FileInputStream fis=new FileInputStream(srcFile);
//獲取輸入字節(jié)流的文件通道
FileChannel fcin=fis.getChannel();
FileOutputStream fos=new FileOutputStream(destFile);
//獲取輸出字節(jié)流的文件通道
FileChannel fcout=fos.getChannel();
//為緩沖區(qū)分配 1024 個(gè)字節(jié)
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
while(true){
//從輸入通道中讀取數(shù)據(jù)到緩沖區(qū)中
int r = fcin.read(buffer);
// read() 返回 -1 表示 EOF
if(r==-1){
break;
}
//切換讀寫(xiě)
buffer.flip();
//把緩沖區(qū)的內(nèi)容寫(xiě)入輸出文件中
fcout.write(buffer);
//清空緩沖區(qū)
buffer.clear();
}
}
}
SocketChannel和ServerSocketChannel的使用
SocketChannel用于創(chuàng)建基于TCP協(xié)議的客戶(hù)端對(duì)象,因?yàn)镾ocketChannel中不存在accept()方法, 所以,它不能成為一個(gè)服務(wù)端程序。 通過(guò)connect()方法,SocketChannel對(duì)象可以連接到其他TCP服務(wù)器程序。
ServerSocketChannel允許我們監(jiān)聽(tīng)TCP協(xié)議請(qǐng)求,通過(guò)ServerSocketChannel的accept()方法創(chuàng)建一個(gè)SocketChannel對(duì)象用戶(hù)從客戶(hù)端讀/寫(xiě)數(shù)據(jù)。
服務(wù)端:
通過(guò)ServerSocketChannel 綁定ip地址和端口號(hào) 通過(guò)ServerSocketChannel的accept()方法創(chuàng)建一個(gè)SocketChannel對(duì)象用戶(hù)從客戶(hù)端讀/寫(xiě)數(shù)據(jù) 創(chuàng)建讀數(shù)據(jù)/寫(xiě)數(shù)據(jù)緩沖區(qū)對(duì)象來(lái)讀取客戶(hù)端數(shù)據(jù)或向客戶(hù)端發(fā)送數(shù)據(jù) 關(guān)閉SocketChannel和ServerSocketChannel
public class Server {
public static void main(String[] args) throws IOException {
//通過(guò)ServerSocketChannel 的open()方法創(chuàng)建一個(gè)ServerSocketChannel對(duì)象
ServerSocketChannel ssc=ServerSocketChannel.open();
//1. 通過(guò)ServerSocketChannel 綁定ip地址和端口號(hào)
ssc.socket().bind(new InetSocketAddress(InetAddress.getByName("LAPTOP-D9966H06"),8888));
//2. 通過(guò)ServerSocketChannel的accept()方法創(chuàng)建一個(gè)SocketChannel對(duì)象用戶(hù)從客戶(hù)端讀/寫(xiě)數(shù)據(jù)
SocketChannel sc=ssc.accept();
//3. 創(chuàng)建讀數(shù)據(jù)/寫(xiě)數(shù)據(jù)緩沖區(qū)對(duì)象來(lái)讀取客戶(hù)端數(shù)據(jù)或向客戶(hù)端發(fā)送數(shù)據(jù)
//讀取客戶(hù)端發(fā)送的數(shù)據(jù)
ByteBuffer buffer=ByteBuffer.allocate(1024);
//從通道中讀取數(shù)據(jù)到緩沖區(qū)
sc.read(buffer);
StringBuffer sb=new StringBuffer();
buffer.flip();
while(buffer.hasRemaining()){
sb.append((char)buffer.get());
}
System.out.println(sb.toString());
ByteBuffer buffer2=ByteBuffer.allocate(1024);
//向客戶(hù)端發(fā)送數(shù)據(jù)
buffer2.put("data has been received.".getBytes());
buffer2.flip();
sc.write(buffer2);
//4. 關(guān)閉SocketChannel和ServerSocketChannel
sc.close();
ssc.close();
}
}
客戶(hù)端:
1.通過(guò)SocketChannel連接到遠(yuǎn)程服務(wù)器
2.創(chuàng)建讀數(shù)據(jù)/寫(xiě)數(shù)據(jù)緩沖區(qū)對(duì)象來(lái)讀取服務(wù)端數(shù)據(jù)或向服務(wù)端發(fā)送數(shù)據(jù)
3.關(guān)閉SocketChannel
public class Client {
public static void main(String[] args) throws IOException {
//1.通過(guò)SocketChannel連接到遠(yuǎn)程服務(wù)器
SocketChannel sc=SocketChannel.open();
sc.connect(new InetSocketAddress(InetAddress.getByName("LAPTOP-D9966H06"),8888));
//2.創(chuàng)建讀數(shù)據(jù)/寫(xiě)數(shù)據(jù)緩沖區(qū)對(duì)象來(lái)讀取服務(wù)端數(shù)據(jù)或向服務(wù)端發(fā)送數(shù)據(jù)
//向通道中寫(xiě)入數(shù)據(jù)
ByteBuffer buffer=ByteBuffer.allocate(1024);
buffer.put("hello".getBytes());
buffer.flip();
sc.write(buffer);
//讀取從客戶(hù)端中獲取的數(shù)據(jù)
ByteBuffer buffer2=ByteBuffer.allocate(1024);
sc.read(buffer2);
StringBuffer sb=new StringBuffer();
buffer2.flip();
while(buffer2.hasRemaining()){
sb.append((char)buffer2.get());
}
System.out.println(sb.toString());
//3.關(guān)閉SocketChannel
sc.close();
}
}
DatagramChannel的使用
DataGramChannel,類(lèi)似于java 網(wǎng)絡(luò)編程的DatagramSocket類(lèi); 使用UDP進(jìn)行網(wǎng)絡(luò)傳輸, UDP是無(wú)連接,面向數(shù)據(jù)報(bào)文段的協(xié)議。
服務(wù)端:
public class Server {
public static void main(String[] args) throws IOException {
DatagramChannel dc= DatagramChannel.open();
dc.bind(new InetSocketAddress(InetAddress.getByName("LAPTOP-D9966H06"),8888));
//創(chuàng)建讀數(shù)據(jù)/寫(xiě)數(shù)據(jù)緩沖區(qū)對(duì)象來(lái)讀取客戶(hù)端數(shù)據(jù)或向客戶(hù)端發(fā)送數(shù)據(jù)
//讀取客戶(hù)端發(fā)送的數(shù)據(jù)
ByteBuffer buffer=ByteBuffer.allocate(1024);
//從通道中讀取數(shù)據(jù)到緩沖區(qū)
dc.receive(buffer);
StringBuffer sb=new StringBuffer();
buffer.flip();
while(buffer.hasRemaining()){
sb.append((char)buffer.get());
}
System.out.println(sb.toString());
ByteBuffer buffer2=ByteBuffer.allocate(1024);
//向客戶(hù)端發(fā)送數(shù)據(jù)
buffer2.put("data has been received.".getBytes());
buffer2.flip();
dc.send(buffer2,new InetSocketAddress(InetAddress.getByName("LAPTOP-D9966H06"),9999));
dc.close();
}
}
客戶(hù)端:
public class Client {
public static void main(String[] args) throws IOException {
DatagramChannel dc= DatagramChannel.open();
dc.bind(new InetSocketAddress(InetAddress.getByName("LAPTOP-D9966H06"),9999));
//創(chuàng)建讀數(shù)據(jù)/寫(xiě)數(shù)據(jù)緩沖區(qū)對(duì)象來(lái)讀取服務(wù)端數(shù)據(jù)或向服務(wù)端發(fā)送數(shù)據(jù)
//向通道中寫(xiě)入數(shù)據(jù)
ByteBuffer buffer=ByteBuffer.allocate(1024);
buffer.put("hello".getBytes());
buffer.flip();
dc.send(buffer,new InetSocketAddress(InetAddress.getByName("LAPTOP-D9966H06"),8888));
//讀取從客戶(hù)端中獲取的數(shù)據(jù)
ByteBuffer buffer2=ByteBuffer.allocate(1024);
dc.receive(buffer2);
StringBuffer sb=new StringBuffer();
buffer2.flip();
while(buffer2.hasRemaining()){
sb.append((char)buffer2.get());
}
System.out.println(sb.toString());
dc.close();
}
}
通道之間的數(shù)據(jù)傳輸
在Java NIO中如果一個(gè)channel是FileChannel類(lèi)型的,那么他可以直接把數(shù)據(jù)傳輸?shù)搅硪粋€(gè)channel。
transferFrom() :transferFrom方法把數(shù)據(jù)從通道源傳輸?shù)紽ileChannel
transferTo() :transferTo方法把FileChannel數(shù)據(jù)傳輸?shù)搅硪粋€(gè)FileChhannel
public static void copyFile6(String srcFile,String destFile) throws IOException {
FileInputStream fis = new FileInputStream(srcFile);
//獲取輸入字節(jié)流的文件通道
FileChannel fcin = fis.getChannel();
FileOutputStream fos = new FileOutputStream(destFile);
//獲取輸出字節(jié)流的文件通道
FileChannel fcout = fos.getChannel();
//fcin通道中讀出count bytes ,并寫(xiě)入fcout通道中
//fcin.transferTo(0,fcin.size(),fcout);
//或者
fcout.transferFrom(fcin,0,fcin.size());
}
選擇器
NIO 常常被叫做非阻塞 IO,主要是因?yàn)?NIO 在網(wǎng)絡(luò)通信中的非阻塞特性被廣泛使用。
NIO 實(shí)現(xiàn)了 IO 多路復(fù)用中的 Reactor 模型,一個(gè)線程 Thread 使用一個(gè)選擇器 Selector 通過(guò)輪詢(xún)的方式 去監(jiān)聽(tīng)多個(gè)通道 Channel 上的事件,從而讓一個(gè)線程就可以處理多個(gè)事件。
通過(guò)配置監(jiān)聽(tīng)的通道 Channel 為非阻塞,那么當(dāng) Channel 上的 IO 事件還未到達(dá)時(shí), 就不會(huì)進(jìn)入阻塞狀態(tài)一直等待,而是繼續(xù)輪詢(xún)其它 Channel,找到 IO 事件已經(jīng)到達(dá)的 Channel 執(zhí)行。
因?yàn)閯?chuàng)建和切換線程的開(kāi)銷(xiāo)很大,因此使用一個(gè)線程來(lái)處理多個(gè)事件而不是一個(gè)線程處理一個(gè)事件, 對(duì)于 IO 密集型的應(yīng)用具有很好地性能。
應(yīng)該注意的是,只有套接字 Channel 才能配置為非阻塞,而 FileChannel 不能, 為 FileChannel 配置非阻塞也沒(méi)有意義。
使用Selector的優(yōu)點(diǎn):
使用更少的線程來(lái)就可以來(lái)處理通道了, 相比使用多個(gè)線程, 避免了線程上下文切換帶來(lái)的開(kāi)銷(xiāo)。
1. 創(chuàng)建選擇器
Selector selector = Selector.open();
2. 將通道注冊(cè)到選擇器上
ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);//通道必須配置為非阻塞模式
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
通道必須配置為非阻塞模式,否則使用選擇器就沒(méi)有任何意義了,因?yàn)槿绻ǖ涝谀硞€(gè)事件上被阻塞,那么服務(wù)器就不能響應(yīng)其它事件,必須等待這個(gè)事件處理完畢才能去處理其它事件,顯然這和選擇器的作用背道而馳。
在將通道注冊(cè)到選擇器上時(shí),還需要指定要注冊(cè)的具體事件,主要有以下幾類(lèi):
SelectionKey.OP_CONNECTSelectionKey.OP_ACCEPTSelectionKey.OP_READSelectionKey.OP_WRITE
它們?cè)?SelectionKey 的定義如下:
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
可以看出每個(gè)事件可以被當(dāng)成一個(gè)位域,從而組成事件集整數(shù)。例如:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
3. 監(jiān)聽(tīng)事件
int num = selector.select();
使用 select() 來(lái)監(jiān)聽(tīng)到達(dá)的事件,它會(huì)一直阻塞直到有至少一個(gè)事件到達(dá)。
4. 獲取到達(dá)的事件
Set
Iterator
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// ...
} else if (key.isReadable()) {
// ...
}
keyIterator.remove();
}
5. 事件循環(huán)
因?yàn)橐淮?select() 調(diào)用不能處理完所有的事件,并且服務(wù)器端有可能需要一直監(jiān)聽(tīng)事件,因此服務(wù)器端處理事件的代碼一般會(huì)放在一個(gè)死循環(huán)內(nèi)。
while (true) {
int num = selector.select();
Set
Iterator
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// ...
} else if (key.isReadable()) {
// ...
}
keyIterator.remove();
}
}
Reactor 模型
我們知道,在我們使用傳統(tǒng)方法進(jìn)行網(wǎng)絡(luò)IO操作的時(shí)候,需要使用一個(gè)線程監(jiān)聽(tīng)I(yíng)O事件,當(dāng)事件到達(dá)之后,創(chuàng)建一個(gè)線程去處理接收到的IO事件,這種模型需要?jiǎng)?chuàng)建大量的線程,會(huì)極大的浪費(fèi)內(nèi)存等資源。
這種情況下,有人提出了Reactor模式,在Reactor中,拆分為不同的小線程或者子過(guò)程,這些被拆分的小線程和子過(guò)程對(duì)應(yīng)的是handler,每一種handler都會(huì)處理一種事件(event)。這個(gè)需要一個(gè)全局的管理者selector,向channel注冊(cè)感興趣的事件,selector不斷在channel中監(jiān)測(cè)是否有該事件發(fā)生,如果沒(méi)有,那么主線程會(huì)被阻塞,否則會(huì)調(diào)用相應(yīng)的事件處理函數(shù)來(lái)處理。這些事件典型的有連接、讀取、寫(xiě)入,我們需要為這些事件分別提供處理器,事件到達(dá)后分發(fā)到處理器中就可以返回處理后面的事件,吞吐量能夠極大的提高。其中定義了以下三種角色:
Reactor:負(fù)責(zé)將IO事件分派給指定的處理器(Handler)Acceptor:負(fù)責(zé)處理新的連接,并將請(qǐng)求移交給ReactorHandler:負(fù)責(zé)讀寫(xiě)的處理器
單Reactor單線程模型
這是最基本的單Reactor單線程模型。其中Reactor線程,負(fù)責(zé)多路復(fù)用socket,有新連接到來(lái)觸發(fā)事件之后,交由Acceptor進(jìn)行處理,有IO讀寫(xiě)事件之后交給hanlder 處理。
Acceptor主要任務(wù)就是構(gòu)建handler ,在獲取到和client相關(guān)的SocketChannel之后 ,綁定到相應(yīng)的hanlder上,對(duì)應(yīng)的SocketChannel有讀寫(xiě)事件之后,基于reactor 分發(fā),hanlder就可以處理了(所有的IO事件都綁定到selector上,有Reactor分發(fā))。
該模型 適用于處理器鏈中業(yè)務(wù)處理組件能快速完成的場(chǎng)景。不過(guò),這種單線程模型不能充分利用多核資源,所以實(shí)際使用的不多。
單Reactor多線程模型
相對(duì)于第一種單線程的模式來(lái)說(shuō),在處理業(yè)務(wù)邏輯,也就是獲取到IO的讀寫(xiě)事件之后,交由線程池來(lái)處理,這樣可以減小主reactor的性能開(kāi)銷(xiāo),從而更專(zhuān)注的做事件分發(fā)工作了,從而提升整個(gè)應(yīng)用的吞吐量。
多Reactor多線程模型
第三種模型比起第二種模型,是將Reactor分成兩部分:
mainReactor:負(fù)責(zé)監(jiān)聽(tīng)server socket,用來(lái)處理新連接的建立,將建立的socketChannel指定注冊(cè)給subReactor;subReactor:維護(hù)自己的selector, accept會(huì)將連接交給它來(lái)處理,讀寫(xiě)(read、send)網(wǎng)絡(luò)數(shù)據(jù),對(duì)于業(yè)務(wù)邏輯的處理,將其扔給線程池中的worker來(lái)處理。
在該模型中,mainReactor 主要是用來(lái)處理網(wǎng)絡(luò)IO 連接建立操作,通常一個(gè)線程就可以處理,而subReactor主要做和建立起來(lái)的socket做數(shù)據(jù)交互和事件業(yè)務(wù)處理操作,它的個(gè)數(shù)上一般是和CPU個(gè)數(shù)等同,一個(gè)subReactor對(duì)應(yīng)一個(gè)線程處理。
此種模型中,每個(gè)模塊的工作更加專(zhuān)一,耦合度更低,性能和穩(wěn)定性也大量的提升,支持的可并發(fā)客戶(hù)端數(shù)量可達(dá)到上百萬(wàn)級(jí)別。關(guān)于此種模型的應(yīng)用,目前有很多優(yōu)秀的框架已經(jīng)應(yīng)用,比如Netty。
優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
響應(yīng)快,不必為單個(gè)同步時(shí)間所阻塞,雖然Reactor本身依然是同步的編程相對(duì)簡(jiǎn)單,可以最大程度的避免復(fù)雜的多線程及同步問(wèn)題,并且避免了多線程/進(jìn)程的切換開(kāi)銷(xiāo)可擴(kuò)展性,可以方便的通過(guò)增加Reactor實(shí)例個(gè)數(shù)來(lái)充分利用CPU資源可復(fù)用性,reactor框架本身與具體事件處理邏輯無(wú)關(guān),具有很高的復(fù)用性
缺點(diǎn):
相比傳統(tǒng)的簡(jiǎn)單模型,Reactor增加了一定的復(fù)雜性,因而有一定的門(mén)檻,并且不易于調(diào)試Reactor模式需要底層的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系統(tǒng)的select系統(tǒng)調(diào)用支持,如果要自己實(shí)現(xiàn)Synchronous Event Demultiplexer可能不會(huì)有那么高效Reactor模式在IO讀寫(xiě)數(shù)據(jù)時(shí)還是在同一個(gè)線程中實(shí)現(xiàn)的,即使使用多個(gè)Reactor機(jī)制的情況下,那些共享一個(gè)Reactor的Channel如果出現(xiàn)一個(gè)長(zhǎng)時(shí)間的數(shù)據(jù)讀寫(xiě),會(huì)影響這個(gè)Reactor中其他Channel的相應(yīng)時(shí)間,比如在大文件傳輸時(shí),IO操作就會(huì)影響其他Client的相應(yīng)時(shí)間,因而對(duì)這種操作,使用傳統(tǒng)的Thread-Per-Connection或許是一個(gè)更好的選擇,或則此時(shí)使用Proactor模式
套接字 NIO 實(shí)例
public class NIOServer {
public static void main(String[] args) throws IOException {
//1. 創(chuàng)建選擇器
Selector selector = Selector.open();
//2.將通道注冊(cè)到選擇器上
ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
//通道必須配置為非阻塞模式,否則使用選擇器就沒(méi)有任何意義了
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
ServerSocket ss=ssChannel.socket();
ss.bind(new InetSocketAddress("127.0.0.1",8888));
while (true){
//3. 監(jiān)聽(tīng)事件
selector.select();
//4. 獲取到達(dá)的事件
Set
Iterator
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
ServerSocketChannel ssChannel1 = (ServerSocketChannel) key.channel();
// 服務(wù)器會(huì)為每個(gè)新連接創(chuàng)建一個(gè) SocketChannel
SocketChannel sChannel = ssChannel1.accept();
sChannel.configureBlocking(false);
// 這個(gè)新連接主要用于從客戶(hù)端讀取數(shù)據(jù)
sChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel sChannel = (SocketChannel) key.channel();
System.out.println(readDataFromSocketChannel(sChannel));
sChannel.close();
}
keyIterator.remove();
}
}
}
private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder data = new StringBuilder();
while (true) {
buffer.clear();
int r = sChannel.read(buffer);
if (r == -1) {
break;
}
buffer.flip();
int limit = buffer.limit();
char[] dst = new char[limit];
for (int i = 0; i < limit; i++) {
dst[i] = (char) buffer.get(i);
}
data.append(dst);
buffer.clear();
}
return data.toString();
}
}
public class NIOClient {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 8888);
OutputStream out = socket.getOutputStream();
String s = "hello world";
out.write(s.getBytes());
out.close();
}
}
內(nèi)存映射文件
內(nèi)存映射文件((memory-mapped file)) I/O 是一種讀和寫(xiě)文件數(shù)據(jù)的方法,它可以比常規(guī)的基于流或者基于通道的 I/O 快得多。
內(nèi)存映射文件能讓你創(chuàng)建和修改那些大到無(wú)法讀入內(nèi)存的文件。有了內(nèi)存映射文件,可以認(rèn)為文件已經(jīng)全部讀進(jìn)了內(nèi)存,然后把它當(dāng)成一個(gè)非常大的數(shù)組來(lái)訪問(wèn)了。將文件的一段區(qū)域映射到內(nèi)存中,比傳統(tǒng)的文件處理速度要快很多。內(nèi)存映射文件它雖然最終也是要從磁盤(pán)讀取數(shù)據(jù),但是它并不需要將數(shù)據(jù)讀取到 OS 內(nèi)核緩沖區(qū),而是直接將進(jìn)程的用戶(hù)私有地址空間中的一部分區(qū)域與文件對(duì)象建立起映射關(guān)系,就好像直接從內(nèi)存中讀、寫(xiě)文件一樣,速度當(dāng)然快了。
向內(nèi)存映射文件寫(xiě)入可能是危險(xiǎn)的,只是改變數(shù)組的單個(gè)元素這樣的簡(jiǎn)單操作,就可能會(huì)直接修改磁盤(pán)上的文件。修改數(shù)據(jù)與將數(shù)據(jù)保存到磁盤(pán)是沒(méi)有分開(kāi)的。
下面代碼行將文件的前 1024 個(gè)字節(jié)映射到內(nèi)存中,map() 方法返回一個(gè) MappedByteBuffer,它是 ByteBuffer 的子類(lèi)。因此,可以像使用其他任何 ByteBuffer 一樣使用新映射的緩沖區(qū),操作系統(tǒng)會(huì)在需要時(shí)負(fù)責(zé)執(zhí)行映射。
MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
Path
Java7中文件IO發(fā)生了很大的變化,專(zhuān)門(mén)引入了很多新的類(lèi)來(lái)取代原來(lái)的 基于java.io.File的文件IO操作方式。
創(chuàng)建一個(gè)Path
使用Paths工具類(lèi)的get()方法創(chuàng)建Path對(duì)象
public class PathDemo {
public static void main(String[] args) {
//方式一
Path path=Paths.get("demo5.txt");
System.out.println(path);
//方式二
Path path2 = FileSystems.getDefault().getPath("demo5.txt");
System.out.println(path2);
}
}
File和Path之間的轉(zhuǎn)換,F(xiàn)ile和URI之間的轉(zhuǎn)換
public class PathDemo2 {
public static void main(String[] args) {
Path path=Paths.get("demo5.txt");
File file=path.toFile();
URI uri=path.toUri();
System.out.println(path);
System.out.println(file);
System.out.println(uri);
}
}
demo5.txt
demo5.txt
file:///F:/Java_Review/05Java/JavaIO/demo5.txt
獲取Path的相關(guān)信息
public class PathDemo3 {
public static void main(String[] args) {
Path path= Paths.get("demo3\\test3.txt");
System.out.println("文件名:"+ path.getFileName());
System.out.println("名稱(chēng)元素的數(shù)量:"+path.getNameCount());
System.out.println("父路徑:"+ path.getParent());
System.out.println("根路徑:"+ path.getRoot());
System.out.println("是否是絕對(duì)路徑:"+path.isAbsolute());
//startWith() 參數(shù)既可以是字符串,也可以是Path
System.out.println("是否是以路徑demo3開(kāi)頭:"+path.startsWith(Paths.get("demo3")));
System.out.println("該路徑的字符串形式:"+path.toString());
}
}
文件名:test3.txt
名稱(chēng)元素的數(shù)量:2
父路徑:demo3
根路徑:null
是否是絕對(duì)路徑:false
是否是以路徑demo3開(kāi)頭:true
該路徑的字符串形式:demo3\test3.txt
移除Path中的冗余項(xiàng)
\ .表示的是當(dāng)前目錄 ?\ ..表示父目錄或者說(shuō)是上一級(jí)目錄
normalize() : 返回一個(gè)路徑,該路徑是取出冗余項(xiàng)的路徑。
toRealPath() : 可以看成,先進(jìn)行toAbsolutePath()操作,然后進(jìn)行normalize()操作
public class PathDemo4 {
public static void main(String[] args) throws IOException {
Path path= Paths.get("./demo3");
System.out.println("original :"+ path.toAbsolutePath());
System.out.println("after normalize:"+ path.toAbsolutePath().normalize());
System.out.println("after toRealPath:"+ path.toRealPath());
}
}
original :F:\Java_Review\05Java\JavaIO\.\demo3
after normalize:F:\Java_Review\05Java\JavaIO\demo3
after toRealPath:F:\Java_Review\05Java\JavaIO\demo3
public class PathDemo5 {
public static void main(String[] args) throws IOException {
Path path= Paths.get("../JavaIO");
System.out.println("original :"+ path.toAbsolutePath());
System.out.println("after normalize:"+ path.toAbsolutePath().normalize());
System.out.println("after toRealPath:"+ path.toRealPath());
}
}
original :F:\Java_Review\05Java\JavaIO\..\JavaIO
after normalize:F:\Java_Review\05Java\JavaIO
after toRealPath:F:\Java_Review\05Java\JavaIO
Files
java.nio.file.Files類(lèi)是和java.nio.file.Path相結(jié)合使用的
檢查給定的Path在文件系統(tǒng)中是否存在
Files.exists():檢測(cè)文件路徑是否存在
public class FilesDemo {
public static void main(String[] args) {
Path path = Paths.get("demo5.txt");
//LinkOptions.NOFOLLOW_LINKS:表示檢測(cè)時(shí)不包含符號(hào)鏈接文件。
boolean isExist= Files.exists(path,new LinkOption[]{LinkOption.NOFOLLOW_LINKS});
System.out.println(isExist);
}
}
創(chuàng)建文件/文件夾
Files.createFile():創(chuàng)建文件
Files.createDirectory(): 創(chuàng)建文件夾
Files.createDirectories(): 創(chuàng)建文件夾
public class FilesDemo2 {
public static void main(String[] args) throws IOException {
Path path= Paths.get("demo7.txt");
if(!Files.exists(path)){
Files.createFile(path);
}
Path path2=Paths.get("demo4");
if(!Files.exists(path2)){
Files.createDirectory(path2);
}
Path path3=Paths.get("demo5\\test");
if(!Files.exists(path3)){
Files.createDirectories(path3);
}
}
}
刪除文件或目錄
Files.delete():刪除一個(gè)文件或目錄
public class FilesDemo3 {
public static void main(String[] args) throws IOException {
Path path= Paths.get("demo7.txt");
Files.delete(path);
}
}
把一個(gè)文件從一個(gè)地址復(fù)制到另一個(gè)位置
Files.copy():把一個(gè)文件從一個(gè)地址復(fù)制到另一個(gè)位置
public class FilesDemo4 {
public static void main(String[] args) throws IOException {
Path srcPath= Paths.get("demo6.txt");
Path destPath=Paths.get("demo7.txt");
//Files.copy(srcPath,destPath);
//強(qiáng)制覆蓋已經(jīng)存在的目標(biāo)文件
Files.copy(srcPath,destPath, StandardCopyOption.REPLACE_EXISTING);
}
}
獲取文件屬性
public class FilesDemo5 {
public static void main(String[] args) throws IOException {
Path path= Paths.get("demo7.txt");
System.out.println(Files.getLastModifiedTime(path));
System.out.println(Files.size(path));
System.out.println(Files.isSymbolicLink(path));
System.out.println(Files.isDirectory(path));
System.out.println(Files.readAttributes(path,"*"));
}
}
遍歷一個(gè)文件夾
public class FilesDemo6 {
public static void main(String[] args) throws IOException {
Path path= Paths.get("demo3\\demo2");
DirectoryStream
for(Path p:paths){
System.out.println(p.getFileName());
}
}
}
遍歷整個(gè)文件目錄
FileVisitor需要調(diào)用方自行實(shí)現(xiàn),然后作為參數(shù)傳入walkFileTree(); FileVisitor的每個(gè)方法會(huì)在遍歷過(guò)程中被調(diào)用多次。
public class FilesDemo7 {
public static void main(String[] args) throws IOException {
Path path= Paths.get("demo3\\demo2");
List
Files.walkFileTree(path,new FileVisitor(paths));
System.out.println("paths:"+paths);
}
private static class FileVisitor extends SimpleFileVisitor
private List
public FileVisitor(List
this.paths=paths;
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if(file.toString().endsWith(".txt")){
paths.add(file.getFileName());
}
return super.visitFile(file, attrs);
}
}
}
輸出結(jié)果:
paths:[a.txt, test2.txt, test.txt, test3.txt]
推薦閱讀:
程序員面試手冊(cè)編程筆記
本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布!
柚子快報(bào)邀請(qǐng)碼778899分享:后端 Java NIO
好文鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。