柚子快報(bào)激活碼778899分享:BIO、NIO、AIO詳解
柚子快報(bào)激活碼778899分享:BIO、NIO、AIO詳解
一、Java的I/O演進(jìn)之路
Java共支持3種網(wǎng)絡(luò)編程的I/O模型:BIO、NIO、AIO
BIO:
同步并阻塞(傳統(tǒng)阻塞型),服務(wù)器實(shí)現(xiàn)模式為一個(gè)連接一個(gè)線程,即客戶端有連接請(qǐng)求時(shí)服務(wù)器端就需要啟動(dòng)一個(gè)線程進(jìn)行處理,如果這個(gè)連接不做任何事情會(huì)造成不必要的線程開銷
NIO:
同步非阻塞,服務(wù)器實(shí)現(xiàn)模式為一個(gè)線程處理多個(gè)請(qǐng)求(連接),即客戶端發(fā)送的連接請(qǐng)求都會(huì)注冊(cè)到多路復(fù)用器上,多路復(fù)用器輪詢到連接有I/O請(qǐng)求就進(jìn)行處理
AIO:
異步非阻塞,服務(wù)器實(shí)現(xiàn)模式為一個(gè)有效請(qǐng)求一個(gè)線程,客戶端的I/O請(qǐng)求都是由操作系統(tǒng)先完成了再通知服務(wù)器應(yīng)用去啟動(dòng)線程進(jìn)行處理,一般適用于連接數(shù)較多且連接時(shí)間較長(zhǎng)的應(yīng)用
二、BIO深入剖析
1、BIO概述
BIO(Blocking I/O)就是傳統(tǒng)的Java IO編程,其相關(guān)的類和接口在java.io包下。BIO是同步阻塞的,服務(wù)器實(shí)現(xiàn)模式為一個(gè)連接一個(gè)線程,即客戶端有連接請(qǐng)求時(shí)服務(wù)器端就需要啟動(dòng)一個(gè)線程進(jìn)行處理,如果這個(gè)連接不做任何事情會(huì)造成不必要的線程開銷,可以通過線程池機(jī)制改善(實(shí)現(xiàn)多個(gè)客戶連接服務(wù)器)
BIO編程流程的梳理:
服務(wù)器端啟動(dòng)一個(gè)ServerSocket,注冊(cè)端口,調(diào)用accpet方法監(jiān)聽客戶端的Socket連接客戶端啟動(dòng)Socket對(duì)服務(wù)器進(jìn)行通信,默認(rèn)情況下服務(wù)器端需要對(duì)每個(gè)客戶建立一個(gè)線程與之通訊
2、BIO案例
服務(wù)端:
public class Server {
public static void main(String[] args) throws IOException {
System.out.println("===服務(wù)端啟動(dòng)===");
// 1.定義一個(gè)ServerSocket對(duì)象進(jìn)行服務(wù)端的端口注冊(cè)
ServerSocket ss = new ServerSocket(9999);
// 2.監(jiān)聽客戶端的Socket連接請(qǐng)求
Socket socket = ss.accept();
// 3.從Socket管道中得到一個(gè)字節(jié)輸入流對(duì)象
InputStream is = socket.getInputStream();
// 4.把字節(jié)輸入流包裝成一個(gè)緩沖字符輸入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
while ((msg = br.readLine()) != null) {
System.out.println("服務(wù)端接收到:" + msg);
}
}
}
客戶端:
public class Client {
public static void main(String[] args) throws IOException {
System.out.println("===客戶端啟動(dòng)===");
// 1.創(chuàng)建Socket對(duì)象請(qǐng)求服務(wù)端的連接
Socket socket = new Socket("127.0.0.1", 9999);
// 2.從Socket對(duì)象中獲取一個(gè)字節(jié)輸出流
OutputStream os = socket.getOutputStream();
// 3.把字節(jié)輸出流包裝成一個(gè)打印流
PrintStream ps = new PrintStream(os);
ps.println("Hello World!服務(wù)端,你好!");
ps.flush();
}
}
小結(jié):
在以上通信中,服務(wù)端會(huì)一直等待客戶端的消息,如果客戶端沒有進(jìn)行消息的發(fā)送,服務(wù)端將一直進(jìn)入阻塞狀態(tài)
3、偽異步I/O
1)、概述
偽異步I/O采用線程池和任務(wù)隊(duì)列實(shí)現(xiàn),當(dāng)客戶端接入時(shí),將客戶端的Socket封裝成一個(gè)Task(該任務(wù)實(shí)現(xiàn)java.lang.Runnable線程任務(wù)接口)交給后端的線程池中進(jìn)行處理。JDK的線程池維護(hù)一個(gè)消息隊(duì)列和N個(gè)活躍的線程,對(duì)消息隊(duì)列中Socket任務(wù)進(jìn)行處理,由于線程池可以設(shè)置消息隊(duì)列的大小和最大線程數(shù),因此,它的資源占用是可控的,無論多少個(gè)客戶端并發(fā)訪問,都不會(huì)導(dǎo)致資源的耗盡和宕機(jī)
2)、代碼實(shí)現(xiàn)
客戶端:
public class Client {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 9999);
OutputStream os = socket.getOutputStream();
PrintStream ps = new PrintStream(os);
Scanner sc = new Scanner(System.in);
while (true) {
System.out.print("請(qǐng)說:");
String msg = sc.nextLine();
ps.println(msg);
ps.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
線程池處理類:
public class SocketServerPoolHandler {
/**
* 1.創(chuàng)建一個(gè)線程池的成員變量用于存儲(chǔ)一個(gè)線程池對(duì)象
*/
private ExecutorService executorService;
/**
* 2.創(chuàng)建這個(gè)類的時(shí)候就需要初始化線程池對(duì)象
*/
public SocketServerPoolHandler(int maxThreadNum, int queueSize) {
executorService = new ThreadPoolExecutor(3, maxThreadNum,
120, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize));
}
/**
* 3.提供一個(gè)方法來提交任務(wù)給線程池的任務(wù)隊(duì)列來暫存,等著線程池來處理
*/
public void execute(Runnable target) {
executorService.execute(target);
}
}
服務(wù)端:
public class Server {
public static void main(String[] args) {
try {
// 1.注冊(cè)端口
ServerSocket ss = new ServerSocket(9999);
// 2.定義一個(gè)循環(huán)接收客戶端的Socket連接請(qǐng)求
// 初始化一個(gè)線程池對(duì)象
SocketServerPoolHandler poolHandler = new SocketServerPoolHandler(3, 10);
while (true) {
Socket socket = ss.accept();
// 3.把Socket對(duì)象交給一個(gè)線程池進(jìn)行處理
// 把Socket封裝成一個(gè)任務(wù)對(duì)象交給線程池處理
Runnable target = new ServerRunnableTarget(socket);
poolHandler.execute(target);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class ServerRunnableTarget implements Runnable {
private Socket socket;
public ServerRunnableTarget(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
// 處理接收的客戶端Socket通信需求
try {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
while ((msg = br.readLine()) != null) {
System.out.println("服務(wù)端接收到:" + msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
運(yùn)行結(jié)果:
啟動(dòng)服務(wù)端并啟動(dòng)多個(gè)客戶端發(fā)送消息,由于核心線程數(shù)=最大線程數(shù)=3,當(dāng)客戶端數(shù)>3時(shí),客戶端的Socket任務(wù)會(huì)到線程池的阻塞隊(duì)列中等待,關(guān)閉客戶端,當(dāng)客戶端數(shù)<=3時(shí),Socket任務(wù)將會(huì)被服務(wù)端處理
3)、小結(jié)
偽異步I/O采用了線程池實(shí)現(xiàn),因此避免了為每個(gè)請(qǐng)求創(chuàng)建一個(gè)獨(dú)立線程造成線程資源耗盡的問題,但由于底層依然是采用的同步阻塞模型,因此無法從根本上解決問題如果單個(gè)消息處理的緩慢,或者服務(wù)器線程池中的全部線程都被阻塞,那么后續(xù)Socket的I/O消息都將在隊(duì)列中排隊(duì)。新的Socket請(qǐng)求將被拒絕,客戶端會(huì)發(fā)生大量連接超時(shí)
多線程BIO通信模型圖:
三、NIO深入剖析
1、NIO概述
NIO(Non-Blocking IO)是從Java 1.4版本開始引入的一個(gè)新的IO API,可以替代標(biāo)準(zhǔn)的Java IO API。NIO與原來的IO有同樣的作用和目的,但是使用的方式完全不同,NIO支持面向緩沖區(qū)的、基于通道的IO操作。NIO將以更加高效的方式進(jìn)行文件的讀寫操作。NIO可以理解為非阻塞IO,傳統(tǒng)的IO的read和write只能阻塞執(zhí)行,線程在讀寫IO期間不能干其他事情,比如調(diào)用socket.read()時(shí),如果服務(wù)器一直沒有數(shù)據(jù)傳輸過來,線程就一直阻塞,而NIO中可以配置socket為非阻塞模式NIO相關(guān)類都被放在java.nio包及子包下,并且對(duì)原java.io包中的很多類進(jìn)行改寫NIO有三大核心部分:Channel(通道)、Buffer(緩沖區(qū))、Selector(選擇器)Java NIO的非阻塞模式,使一個(gè)線程從某通道發(fā)送請(qǐng)求或者讀取數(shù)據(jù),但是它僅能得到目前可用的數(shù)據(jù),如果目前沒有數(shù)據(jù)可用時(shí),就什么都不會(huì)獲取,而不是保持線程阻塞,所以直至數(shù)據(jù)變的可以讀取之前,該線程可以繼續(xù)做其他的事情。非阻塞寫也是如此,一個(gè)線程請(qǐng)求寫入一些數(shù)據(jù)到某通道,但不需要等待它完全寫入,這個(gè)線程同時(shí)可以去做別的事情通俗理解:NIO是可以做到用一個(gè)線程來處理多個(gè)操作的。假設(shè)有1000個(gè)請(qǐng)求過來,根據(jù)實(shí)際情況,可以分配20或者80個(gè)線程來處理。不像之前的阻塞IO那樣,非得分配1000個(gè)
2、NIO和BIO的比較
BIO以流的方式處理數(shù)據(jù),而NIO以塊的方式處理數(shù)據(jù),塊I/O的效率比流I/O高很多BIO是阻塞的,NIO則是非阻塞的BIO基于字節(jié)流和字符流進(jìn)行操作,而NIO基于Channel(通道)和Buffer(緩沖區(qū))進(jìn)行操作,數(shù)據(jù)總是從通道 讀取到緩沖區(qū)中,或者從緩沖區(qū)寫入到通道中。Selector(選擇器)用于監(jiān)聽多個(gè)通道的事件(比如:連接請(qǐng)求,數(shù)據(jù)到達(dá)等),因此使用單個(gè)線程就可以監(jiān)聽多個(gè)客戶端通道
NIOBIO面向緩沖區(qū)(Buffer)面向流(Stream)非阻塞(Non Blocking IO)阻塞IO(Blocking IO)選擇器(Selector)
3、NIO三大核心原理示意圖
NIO有三大核心部分:Channel(通道)、Buffer(緩沖區(qū))、Selector(選擇器)
Buffer緩沖區(qū):
緩沖區(qū)本質(zhì)上是一塊可以寫入數(shù)據(jù),然后可以從中讀取數(shù)據(jù)的內(nèi)存。這塊內(nèi)存被包裝成NIO Buffer對(duì)象,并提供了一組方法,用來方便的訪問該塊內(nèi)存。相比較直接對(duì)數(shù)組的操作,Buffer API更加容易操作和管理
Channel(通道):
Java NIO的通道類似流,但又有些不同:既可以從通道中讀取數(shù)據(jù),又可以寫數(shù)據(jù)到通道。但流的(input或output)讀寫通常是單向的。通道可以非阻塞讀取和寫入通道,通道可以支持讀取或?qū)懭刖彌_區(qū),也支持異步地讀寫
Selector選擇器:
Selector是一個(gè)Java NIO組件,可以能夠檢查一個(gè)或多個(gè)NIO通道,并確定哪些通道已經(jīng)準(zhǔn)備好進(jìn)行讀取或?qū)懭?。這樣,一個(gè)單獨(dú)的線程可以管理多個(gè)channel,從而管理多個(gè)網(wǎng)絡(luò)連接,提高效率
每個(gè)channel都會(huì)對(duì)應(yīng)一個(gè)Buffer一個(gè)線程對(duì)應(yīng)Selector,一個(gè)Selector對(duì)應(yīng)多個(gè)channel(連接)程序切換到哪個(gè)channel是由事件決定的Selector會(huì)根據(jù)不同的事件,在各個(gè)通道上切換Buffer就是一個(gè)內(nèi)存塊,底層是一個(gè)數(shù)組數(shù)據(jù)的讀取寫入是通過Buffer完成的,BIO中要么是輸入流,或者是輸出流,不能雙向,但是NIO的Buffer是可以讀也可以寫Java NIO系統(tǒng)的核心在于:通道(Channel)和緩沖區(qū)(Buffer)。通道表示打開到IO設(shè)備(例如:文件、套接字)的連接。若需要使用NIO系統(tǒng),需要獲取用于連接IO設(shè)備的通道以及用于容納數(shù)據(jù)的緩沖區(qū)。然后操作緩沖區(qū),對(duì)數(shù)據(jù)進(jìn)行處理。簡(jiǎn)而言之,Channel負(fù)責(zé)傳輸,Buffer負(fù)責(zé)存取數(shù)據(jù)
4、NIO核心一:緩沖區(qū)(Buffer)
1)、Buffer概述
Buffer是一個(gè)用于特定基本數(shù)據(jù)類型的容器。由java.nio包定義的,所有緩沖區(qū)都是Buffer抽象類的子類。Java NIO中的Buffer主要用于與NIO通道進(jìn)行交互,數(shù)據(jù)是從通道讀入緩沖區(qū),從緩沖區(qū)寫入通道中的
2)、Buffer類及其子類
Buffer就像一個(gè)數(shù)組,可以保存多個(gè)相同類型的數(shù)據(jù)。根據(jù)數(shù)據(jù)類型不同,有以下Buffer常用子類:
ByteBufferCharBufferShortBufferIntBufferLongBufferFloatBufferDoubleBuffer
上述Buffer類都采用相似的方法進(jìn)行管理數(shù)據(jù),只是各自管理的數(shù)據(jù)類型不同而已。都是通過如下方法獲取一個(gè)Buffer對(duì)象:
static XxxBuffer allocate(int capacity) //創(chuàng)建一個(gè)容量為capacity的XxxBuffer對(duì)象
3)、緩沖區(qū)的基本屬性
容量(capacity):作為一個(gè)內(nèi)存塊,Buffer具有一定的固定大小,也稱為容量,緩沖區(qū)容量不能為負(fù),并且創(chuàng)建后不能更改限制(limit):表示緩沖區(qū)中可以操作數(shù)據(jù)的大小(limit后數(shù)據(jù)不能進(jìn)行讀寫)。緩沖區(qū)的限制不能為負(fù),并且不能大于其容量。寫入模式,限制等于buffer的容量。讀取模式下,limit等于寫入的數(shù)據(jù)量位置(position):下一個(gè)要讀取或?qū)懭氲臄?shù)據(jù)的索引。緩沖區(qū)的位置不能為負(fù),并且不能大于其限制標(biāo)記(mark)與重置(reset):標(biāo)記是一個(gè)索引,通過Buffer中的mark()方法指定Buffer中一個(gè)特定的position,之后可以通過調(diào)用reset()方法恢復(fù)到這個(gè)position標(biāo)記、位置、限制、容量遵守以下不變式:0 <= mark <= position <= limit <= capacity
4)、Buffer常見方法
Buffer clear() //清空緩沖區(qū)并返回對(duì)緩沖區(qū)的引用(緩沖區(qū)中的數(shù)據(jù)依然存在,但是處于被遺忘狀態(tài))
Buffer flip() //為將緩沖區(qū)的界限設(shè)置為當(dāng)前位置,并將當(dāng)前位置重置為0
int capacity() //返回Buffer的capacity大小
boolean hasRemaining() //判斷緩沖區(qū)中是否還有元素
int limit() //返回Buffer的界限(limit)的位置
Buffer limit(int n) //將設(shè)置緩沖區(qū)界限為n,并返回一個(gè)具有新limit的緩沖區(qū)對(duì)象
Buffer mark() //對(duì)緩沖區(qū)設(shè)置標(biāo)記
int position() //返回緩沖區(qū)的當(dāng)前位置position
Buffer position(int n) //將設(shè)置緩沖區(qū)的當(dāng)前位置為n,并返回修改后的Buffer對(duì)象
int remaining() //返回position和limit之間的元素個(gè)數(shù)
Buffer reset() //將位置position轉(zhuǎn)到以前設(shè)置的mark所在的位置
Buffer rewind() //將位置設(shè)為0,取消設(shè)置的mark
5)、緩沖區(qū)的數(shù)據(jù)操作
Buffer所有子類提供了兩個(gè)用于數(shù)據(jù)操作的方法:get()和put()方法
獲取Buffer中的數(shù)據(jù):
get() //讀取單個(gè)字節(jié)
get(byte[] dst) //批量讀取多個(gè)字節(jié)到dst中
get(int index) //讀取指定索引位置的字節(jié)(不會(huì)移動(dòng)position)
放到入數(shù)據(jù)到Buffer中:
put(byte b) //將給定單個(gè)字節(jié)寫入緩沖區(qū)的當(dāng)前位置
put(byte[] src) //將src中的字節(jié)寫入緩沖區(qū)的當(dāng)前位置
put(int index, byte b) //將指定字節(jié)寫入緩沖區(qū)的索引位置(不會(huì)移動(dòng)position)
使用Buffer讀寫數(shù)據(jù)一般遵循以下四個(gè)步驟:
寫入數(shù)據(jù)到Buffer調(diào)用flip()方法,轉(zhuǎn)換為讀取模式從Buffer中讀取數(shù)據(jù)調(diào)用buffer.clear()方法或者buffer.compact()方法清除緩沖區(qū)
6)、Buffer案例
@Test
public void test01() {
// 1.分配一個(gè)緩沖區(qū),容量設(shè)置成10
ByteBuffer buffer = ByteBuffer.allocate(10);
System.out.println(buffer.position()); // 0
System.out.println(buffer.limit()); // 10
System.out.println(buffer.capacity()); // 10
System.out.println("--------------------");
// 2.put()往緩沖區(qū)中添加數(shù)據(jù)
String name = "hello";
buffer.put(name.getBytes());
System.out.println(buffer.position()); // 5
System.out.println(buffer.limit()); // 10
System.out.println(buffer.capacity()); // 10
System.out.println("--------------------");
// 3.flip()為將緩沖區(qū)的界限設(shè)置為當(dāng)前位置,并將當(dāng)前位置重置為0 可讀模式
buffer.flip();
System.out.println(buffer.position()); // 0
System.out.println(buffer.limit()); // 5
System.out.println(buffer.capacity()); // 10
System.out.println("--------------------");
// 4.get()數(shù)據(jù)的讀取
char ch = (char) buffer.get();
System.out.println(ch);
System.out.println(buffer.position()); // 1
System.out.println(buffer.limit()); // 5
System.out.println(buffer.capacity()); // 10
System.out.println("--------------------");
}
@Test
public void test02() {
// 1.分配一個(gè)緩沖區(qū),容量設(shè)置成10 put()往緩沖區(qū)中添加數(shù)據(jù)
ByteBuffer buffer = ByteBuffer.allocate(10);
String name = "hello";
buffer.put(name.getBytes());
System.out.println(buffer.position()); // 5
System.out.println(buffer.limit()); // 10
System.out.println(buffer.capacity()); // 10
System.out.println("--------------------");
// 2.clear()清除緩沖區(qū)中的數(shù)據(jù) 并沒有真正清除數(shù)據(jù),只是讓position的位置恢復(fù)到初始位置,后續(xù)添加數(shù)據(jù)的時(shí)候才會(huì)覆蓋每個(gè)位置的數(shù)據(jù)
buffer.clear();
System.out.println(buffer.position()); // 0
System.out.println(buffer.limit()); // 10
System.out.println(buffer.capacity()); // 10
System.out.println((char) buffer.get()); // h
System.out.println("--------------------");
// 3.定義一個(gè)緩沖區(qū)
ByteBuffer buf = ByteBuffer.allocate(10);
String n = "hello";
buf.put(n.getBytes());
buf.flip();
// 讀取數(shù)據(jù)
byte[] b = new byte[2];
buf.get(b);
System.out.println(new String(b));
System.out.println(buf.position()); // 2
System.out.println(buf.limit()); // 5
System.out.println(buf.capacity()); // 10
System.out.println("--------------------");
buf.mark(); // 標(biāo)記此刻這個(gè)位置 2
byte[] b2 = new byte[3];
buf.get(b2);
System.out.println(new String(b2));
System.out.println(buf.position()); // 5
System.out.println(buf.limit()); // 5
System.out.println(buf.capacity()); // 10
System.out.println("--------------------");
buf.reset(); // 回到標(biāo)記位置
if (buf.hasRemaining()) {
System.out.println(buf.remaining()); // 3
}
}
7)、直接與非直接緩沖區(qū)
ByteBuffer可以是兩種類型,一種是基于直接內(nèi)存(也就是非堆內(nèi)存);另一種是非直接內(nèi)存(也就是堆內(nèi)存)。對(duì)于直接內(nèi)存來說,JVM將會(huì)在IO操作上具有更高的性能,因?yàn)樗苯幼饔糜诒镜叵到y(tǒng)的IO操作。而非直接內(nèi)存,也就是堆內(nèi)存中的數(shù)據(jù),如果要作IO操作,會(huì)先從本進(jìn)程內(nèi)存復(fù)制到直接內(nèi)存,再利用本地IO處理
從數(shù)據(jù)流的角度,非直接內(nèi)存是下面這樣的作用鏈:
本地IO-->直接內(nèi)存-->非直接內(nèi)存-->直接內(nèi)存-->本地IO
而直接內(nèi)存是:
本地IO-->直接內(nèi)存-->本地IO
很明顯,在做IO處理時(shí),比如網(wǎng)絡(luò)發(fā)送大量數(shù)據(jù)時(shí),直接內(nèi)存會(huì)具有更高的效率。直接內(nèi)存使用allocateDirect()創(chuàng)建,但是它比申請(qǐng)普通的堆內(nèi)存需要耗費(fèi)更高的性能。不過,這部分的數(shù)據(jù)是在JVM之外的,因此它不會(huì)占用應(yīng)用的內(nèi)存。所以呢,當(dāng)有很大的數(shù)據(jù)要緩存,并且它的生命周期又很長(zhǎng),那么就比較適合使用直接內(nèi)存。只是一般來說,如果不是能帶來很明顯的性能提升,還是推薦直接使用堆內(nèi)存。字節(jié)緩沖區(qū)是直接緩沖區(qū)還是非直接緩沖區(qū)可通過調(diào)用其isDirect()方法來確定
直接緩沖區(qū)使用場(chǎng)景:
有很大的數(shù)據(jù)需要存儲(chǔ),它的生命周期又很長(zhǎng)適合頻繁的IO操作,比如網(wǎng)絡(luò)并發(fā)場(chǎng)景
5、NIO核心二:通道(Channel)
1)、Channel概述
通道(Channel):由java.nio.channels包定義的。Channel表示IO源與目標(biāo)打開的連接。Channel類似于傳統(tǒng)的流。只不過Channel本身不能直接訪問數(shù)據(jù),Channel只能與Buffer進(jìn)行交互
1)NIO的通道類似于流,但有些區(qū)別如下:
通道可以同時(shí)進(jìn)行讀寫,而流只能讀或者只能寫通道可以實(shí)現(xiàn)異步讀寫數(shù)據(jù)通道可以從緩沖讀數(shù)據(jù),也可以寫數(shù)據(jù)到緩沖
2)BIO中的stream是單向的,例如FileInputStream對(duì)象只能進(jìn)行讀取數(shù)據(jù)的操作,而NIO中的Channel是雙向的,可以讀操作,也可以寫操作
3)Channel在NIO中是一個(gè)接口
public interface Channel extends Closeable
2)、常用的Channel實(shí)現(xiàn)類
FileChannel:用于讀取、寫入、映射和操作文件的通道DatagramChannel:通過UDP讀寫網(wǎng)絡(luò)中的數(shù)據(jù)通道SocketChannel:通過TCP讀寫網(wǎng)絡(luò)中的數(shù)據(jù)ServerSocketChannel:可以監(jiān)聽新進(jìn)來的TCP連接,對(duì)每一個(gè)新進(jìn)來的連接都會(huì)創(chuàng)建一個(gè)SocketChannel(ServerSocketChannel類似ServerSocket,SocketChannel類似Socket)
3)、FileChannel類
獲取通道的一種方式是對(duì)支持通道的對(duì)象調(diào)用getChannel()方法。支持通道的類如下:
FileInputStreamFileOutputStreamRandomAccessFileDatagramSocketSocketServerSocket
獲取通道的其他方式是使用Files類的靜態(tài)方法newByteChannel()獲取字節(jié)通道,或者通過通道的靜態(tài)方法open()打開并返回指定通道
4)、FileChannel的常用方法
int read(ByteBuffer dst) // 從Channel中讀取數(shù)據(jù)到ByteBuffer
long read(ByteBuffer[] dsts) // 將Channel中的數(shù)據(jù)分散到ByteBuffer[]
int write(ByteBuffer src) // ByteBuffer中的數(shù)據(jù)寫入到Channel
long write(ByteBuffer[] srcs) // 將ByteBuffer[]中的數(shù)據(jù)聚集到Channel
long position() // 返回此通道的文件位置
FileChannel position(long p) // 設(shè)置此通道的文件位置
long size() // 返回此通道的文件的當(dāng)前大小
FileChannel truncate(long s) // 將此通道的文件截取為給定大小
void force(boolean metaData) // 強(qiáng)制將所有對(duì)此通道的文件更新寫入到存儲(chǔ)設(shè)備中
5)、FileChannel案例
1)本地文件寫數(shù)據(jù)
@Test
public void write() {
try {
// 1.字節(jié)輸出流通向目標(biāo)文件
FileOutputStream fos = new FileOutputStream("data01.txt");
// 2.得到字節(jié)輸出流對(duì)應(yīng)的通道Channel
FileChannel channel = fos.getChannel();
// 3.分配緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("Hello World!".getBytes());
// 4.把緩沖區(qū)切換成寫出模式
buffer.flip();
channel.write(buffer);
channel.close();
System.out.println("寫數(shù)據(jù)到文件中!");
} catch (IOException e) {
e.printStackTrace();
}
}
2)本地文件讀數(shù)據(jù)
@Test
public void read() {
try {
// 1.定義一個(gè)文件字節(jié)輸入流與源文件接通
FileInputStream fis = new FileInputStream("data01.txt");
// 2.需要得到文件輸入流的文件通道
FileChannel channel = fis.getChannel();
// 3.定義一個(gè)緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 4.讀取數(shù)據(jù)到緩沖區(qū)
channel.read(buffer);
buffer.flip();
// 5.讀取出緩沖區(qū)中的數(shù)據(jù)并輸出
String rs = new String(buffer.array(), 0, buffer.remaining());
System.out.println(rs);
} catch (IOException e) {
e.printStackTrace();
}
}
3)使用Buffer完成文件復(fù)制
@Test
public void copy() {
try {
// 源文件
File srcFile = new File("data01.txt");
// 目標(biāo)文件
File destFile = new File("data02.txt");
// 得到字節(jié)輸入流
FileInputStream fis = new FileInputStream(srcFile);
// 得到字節(jié)輸出流
FileOutputStream fos = new FileOutputStream(destFile);
// 得到文件通道
FileChannel isChannel = fis.getChannel();
FileChannel osChannel = fos.getChannel();
// 分配緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
// 必須先清空緩沖區(qū)再寫入數(shù)據(jù)到緩沖區(qū)
buffer.clear();
// 開始讀取一次數(shù)據(jù)
int flag = isChannel.read(buffer);
if (flag == -1) {
break;
}
// 已經(jīng)讀取了數(shù)據(jù),把緩沖區(qū)的模式切換成可讀模式
buffer.flip();
// 把數(shù)據(jù)寫出
osChannel.write(buffer);
}
isChannel.close();
osChannel.close();
System.out.println("復(fù)制完成!");
} catch (IOException e) {
e.printStackTrace();
}
}
4)分散(Scatter)和聚集(Gather)
分散讀取(Scatter):是指把Channel通道的數(shù)據(jù)讀入到多個(gè)緩沖區(qū)中去
聚集寫入(Gather):是指將多個(gè)Buffer中的數(shù)據(jù)聚集到Channel
@Test
public void test() {
try {
// 1.字節(jié)輸入管道
FileInputStream fis = new FileInputStream("data01.txt");
FileChannel isChannel = fis.getChannel();
// 2.字節(jié)輸出管道
FileOutputStream fos = new FileOutputStream("data03.txt");
FileChannel osChannel = fos.getChannel();
// 3.定義多個(gè)緩沖區(qū)做數(shù)據(jù)分散
ByteBuffer buffer1 = ByteBuffer.allocate(6);
ByteBuffer buffer2 = ByteBuffer.allocate(1024);
ByteBuffer[] buffers = {buffer1, buffer2};
// 4.從通道中讀取數(shù)據(jù)分散到各個(gè)緩沖區(qū)
isChannel.read(buffers);
// 5.從每個(gè)緩沖區(qū)中查詢是否有數(shù)據(jù)讀取到
for (ByteBuffer buffer : buffers) {
// 切換到讀數(shù)據(jù)模式
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.remaining()));
}
// 6.聚集寫入到通道
osChannel.write(buffers);
isChannel.close();
osChannel.close();
System.out.println("復(fù)制完成!");
} catch (IOException e) {
e.printStackTrace();
}
}
5)transferFrom()
從目標(biāo)通道中去復(fù)制原通道數(shù)據(jù)
@Test
public void test02() {
try {
// 1.字節(jié)輸入管道
FileInputStream fis = new FileInputStream("data01.txt");
FileChannel isChannel = fis.getChannel();
// 2.字節(jié)輸出管道
FileOutputStream fos = new FileOutputStream("data04.txt");
FileChannel osChannel = fos.getChannel();
// 3.復(fù)制數(shù)據(jù)
osChannel.transferFrom(isChannel, isChannel.position(), isChannel.size());
isChannel.close();
osChannel.close();
System.out.println("復(fù)制完成!");
} catch (IOException e) {
e.printStackTrace();
}
}
6)transferTo()
把原通道數(shù)據(jù)復(fù)制到目標(biāo)通道
@Test
public void test03() {
try {
// 1.字節(jié)輸入管道
FileInputStream fis = new FileInputStream("data01.txt");
FileChannel isChannel = fis.getChannel();
// 2.字節(jié)輸出管道
FileOutputStream fos = new FileOutputStream("data05.txt");
FileChannel osChannel = fos.getChannel();
// 3.復(fù)制數(shù)據(jù)
isChannel.transferTo(isChannel.position(), isChannel.size(), osChannel);
isChannel.close();
osChannel.close();
System.out.println("復(fù)制完成!");
} catch (IOException e) {
e.printStackTrace();
}
}
6、NIO核心三:選擇器(Selector)
1)、Selector概述
選擇器(Selector)是非阻塞IO的核心,是SelectableChannel對(duì)象的多路復(fù)用器,Selector可以同時(shí)監(jiān)控多個(gè)SelectableChannel的IO狀況,也就是說,利用Selector可使一個(gè)單獨(dú)的線程管理多個(gè)Channel
NIO用非阻塞的IO方式。可以用一個(gè)線程,處理多個(gè)的客戶端連接,就會(huì)使用到SelectorSelector能夠檢測(cè)多個(gè)注冊(cè)的通道上是否有事件發(fā)生(多個(gè)Channel以事件的方式可以注冊(cè)到同一個(gè)Selector),如果有事件發(fā)生,便獲取事件然后針對(duì)每個(gè)事件進(jìn)行相應(yīng)的處理。這樣就可以只用一個(gè)單線程去管理多個(gè)通道,也就是管理多個(gè)連接和請(qǐng)求只有在連接或通道有讀寫事件發(fā)生時(shí),才會(huì)進(jìn)行讀寫,就大大地減少了系統(tǒng)開銷,并且不必為每個(gè)連接都創(chuàng)建一個(gè)線程,不用去維護(hù)多個(gè)線程避免了多線程之間的上下文切換導(dǎo)致的開銷
2)、Selector的應(yīng)用
創(chuàng)建Selector:通過調(diào)用Selector.open()方法創(chuàng)建一個(gè)Selector
Selector selector = Selector.open();
向選擇器注冊(cè)通道:SelectableChannel.register(Selector sel, int ops)
// 1.獲取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 2.切換非阻塞模式
ssChannel.configureBlocking(false);
// 3.綁定連接
ssChannel.bind(new InetSocketAddress(9898));
// 4.獲取選擇器
Selector selector = Selector.open();
// 5.將通道注冊(cè)到選擇器上,并且指定監(jiān)聽接收事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
當(dāng)調(diào)用register(Selector sel, int ops)將通道注冊(cè)選擇器時(shí),選擇器對(duì)通道的監(jiān)聽事件,需要通過第二個(gè)參數(shù)ops指定??梢员O(jiān)聽的事件類型(使用SelectionKey的四個(gè)常量表示):
讀:SelectionKey.OP_READ(1)寫:SelectionKey.OP_WRITE(4)連接:SelectionKey.OP_CONNECT(8)接收:SelectionKey.OP_ACCEPT(16)
若注冊(cè)時(shí)不止監(jiān)聽一個(gè)事件,則可以使用位或操作符連接
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE
7、NIO非阻塞式網(wǎng)絡(luò)通信原理分析
1)、Selector特點(diǎn)說明
Selector可以實(shí)現(xiàn):一個(gè)I/O線程可以并發(fā)處理N個(gè)客戶端連接和讀寫操作,這從根本上解決了傳統(tǒng)同步阻塞I/O一連接一線程模型,架構(gòu)的性能、彈性伸縮能力和可靠性都得到了極大的提升
2)、服務(wù)端流程
1)當(dāng)客戶端連接服務(wù)端時(shí),服務(wù)端會(huì)通過ServerSocketChannel得到SocketChannel:獲取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
2)切換非阻塞模式
ssChannel.configureBlocking(false);
3)綁定連接
ssChannel.bind(new InetSocketAddress(9999));
4)獲取選擇器
Selector selector = Selector.open();
5)將通道注冊(cè)到選擇器上,并且指定監(jiān)聽接收事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
6)輪詢式的獲取選擇器上已經(jīng)準(zhǔn)備就緒的事件
// 輪詢式的獲取選擇器上已經(jīng)準(zhǔn)備就緒的事件
while (selector.select() > 0) {
// 7)獲取當(dāng)前選擇器中所有注冊(cè)的選擇鍵(已就緒的監(jiān)聽事件)
Iterator
while (it.hasNext()) {
// 8)獲取準(zhǔn)備就緒的是事件
SelectionKey sk = it.next();
// 9)判斷具體是什么事件準(zhǔn)備就緒
if (sk.isAcceptable()) {
// 10)若接收就緒,獲取客戶端連接
SocketChannel sChannel = ssChannel.accept();
//11)切換非阻塞模式
sChannel.configureBlocking(false);
// 12)將該通道注冊(cè)到選擇器上
sChannel.register(selector, SelectionKey.OP_READ);
} else if (sk.isReadable()) {
// 13)獲取當(dāng)前選擇器上讀就緒狀態(tài)的通道
SocketChannel sChannel = (SocketChannel) sk.channel();
// 14)讀取數(shù)據(jù)
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = 0;
while ((len = sChannel.read(buf)) > 0) {
buf.flip();
System.out.println(new String(buf.array(), 0, len));
buf.clear();
}
}
// 15)取消選擇鍵SelectionKey
it.remove();
}
}
3)、客戶端流程
1)獲取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
2)切換非阻塞模式
sChannel.configureBlocking(false);
3)分配指定大小的緩沖區(qū)
ByteBuffer buf = ByteBuffer.allocate(1024);
4)發(fā)送數(shù)據(jù)給服務(wù)端
Scanner scan = new Scanner(System.in);
while (scan.hasNext()) {
String str = scan.nextLine();
buf.put((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(System.currentTimeMillis())
+ "\n" + str).getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
}
// 關(guān)閉通道
sChannel.close();
8、NIO非阻塞式網(wǎng)絡(luò)通信入門案例
1)、服務(wù)端實(shí)現(xiàn)
public class Server {
public static void main(String[] args) throws IOException {
// 1.獲取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 2.切換非阻塞模式
ssChannel.configureBlocking(false);
// 3.綁定連接
ssChannel.bind(new InetSocketAddress(9999));
// 4.獲取選擇器
Selector selector = Selector.open();
// 5.將通道注冊(cè)到選擇器上,并且指定監(jiān)聽接收事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6.使用Selector選擇器輪詢已經(jīng)就緒好的事件
while (selector.select() > 0) {
// 7.獲取選擇器中的所有注冊(cè)的通道中已經(jīng)就緒好的事件
Iterator
// 8.開始遍歷這些準(zhǔn)備好的事件
while (it.hasNext()) {
SelectionKey sk = it.next();
// 9.判斷這個(gè)事件具體是什么
if (sk.isAcceptable()) {
// 10.直接獲取當(dāng)前接入的客戶端通道
SocketChannel channel = ssChannel.accept();
// 11.切換非阻塞模式
channel.configureBlocking(false);
// 12.將該通道注冊(cè)到選擇器上
channel.register(selector, SelectionKey.OP_READ);
} else if (sk.isReadable()) {
// 13.獲取當(dāng)前選擇器上讀就緒狀態(tài)的通道
SocketChannel sChannel = (SocketChannel) sk.channel();
// 14.讀取數(shù)據(jù)
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = 0;
while ((len = sChannel.read(buf)) > 0) {
buf.flip();
System.out.println(new String(buf.array(), 0, buf.remaining()));
buf.clear();
}
}
// 15.取消選擇鍵SelectionKey
it.remove();
}
}
}
}
2)、客戶端實(shí)現(xiàn)
public class Client {
public static void main(String[] args) throws IOException {
// 1.獲取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
// 2.切換非阻塞模式
sChannel.configureBlocking(false);
// 3.分配指定大小的緩沖區(qū)
ByteBuffer buf = ByteBuffer.allocate(1024);
// 4.發(fā)送數(shù)據(jù)給服務(wù)端
Scanner scan = new Scanner(System.in);
while (scan.hasNext()) {
String str = scan.nextLine();
buf.put((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(System.currentTimeMillis())
+ "\n" + str).getBytes());
buf.flip();
sChannel.write(buf);
buf.clear();
}
// 5.關(guān)閉通道
sChannel.close();
}
}
NIO非阻塞通信模型圖:
9、NIO網(wǎng)絡(luò)編程應(yīng)用實(shí)例-群聊系統(tǒng)
1)、需求
編寫一個(gè)NIO群聊系統(tǒng),實(shí)現(xiàn)客戶端與客戶端的通信需求(非阻塞)
服務(wù)器端:可以監(jiān)測(cè)用戶上線、離線,并實(shí)現(xiàn)消息轉(zhuǎn)發(fā)功能
客戶端:通過Channel可以無阻塞發(fā)送消息給其它所有客戶端用戶,同時(shí)可以接受其它客戶端用戶通過服務(wù)端轉(zhuǎn)發(fā)來的消息
2)、服務(wù)端實(shí)現(xiàn)
public class Server {
private Selector selector;
private ServerSocketChannel ssChannel;
private static final int PORT = 9999;
public Server() {
try {
selector = Selector.open();
ssChannel = ServerSocketChannel.open();
ssChannel.bind(new InetSocketAddress(PORT));
ssChannel.configureBlocking(false);
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 監(jiān)聽
*/
public void listen() {
try {
while (selector.select() > 0) {
Iterator
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isAcceptable()) {
SocketChannel sChannel = ssChannel.accept();
sChannel.configureBlocking(false);
System.out.println(sChannel.getRemoteAddress() + "上線");
sChannel.register(selector, SelectionKey.OP_READ);
} else if (sk.isReadable()) {
readClientData(sk);
}
it.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 讀取客戶端消息
*
* @param sk
*/
private void readClientData(SelectionKey sk) {
SocketChannel sChannel = null;
try {
sChannel = (SocketChannel) sk.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = sChannel.read(buffer);
if (count > 0) {
buffer.flip();
String msg = new String(buffer.array(), 0, buffer.remaining());
System.out.println("接收到了客戶端消息:" + msg);
// 向其它的客戶端轉(zhuǎn)發(fā)消息(去掉自己)
sendInfoToOtherClients(msg, sChannel);
}
} catch (IOException e) {
e.printStackTrace();
try {
System.out.println(sChannel.getRemoteAddress() + "離線了..");
sk.cancel();
sChannel.close();
} catch (IOException e2) {
e2.printStackTrace();
}
}
}
/**
* 轉(zhuǎn)發(fā)消息給其它客戶端
*
* @param msg
* @param sChannel
*/
private void sendInfoToOtherClients(String msg, SocketChannel sChannel) throws IOException {
System.out.println("服務(wù)器轉(zhuǎn)發(fā)消息中...");
// 遍歷所有注冊(cè)到selector上的SocketChannel并排除sChannel
for (SelectionKey key : selector.keys()) {
Channel targetChannel = key.channel();
// 排除自己
if (targetChannel instanceof SocketChannel && targetChannel != sChannel) {
SocketChannel dest = (SocketChannel) targetChannel;
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
dest.write(buffer);
}
}
}
public static void main(String[] args) {
Server server = new Server();
server.listen();
}
}
3)、客戶端實(shí)現(xiàn)
public class Client {
private Selector selector;
private SocketChannel socketChannel;
private String username;
private static final String IP = "127.0.0.1";
private static final int PORT = 9999;
public Client() {
try {
selector = Selector.open();
socketChannel = SocketChannel.open(new InetSocketAddress(IP, PORT));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println("當(dāng)前客戶端準(zhǔn)備完成...");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 讀取從服務(wù)器端回復(fù)的消息
*
* @throws IOException
*/
private void readInfo() throws IOException {
while (selector.select() > 0) {
Iterator
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
sc.read(buffer);
System.out.println(new String(buffer.array()).trim());
}
iterator.remove();
}
}
}
/**
* 向服務(wù)器發(fā)送消息
*
* @param s
*/
private void sendToServer(String s) {
s = username + "說:" + s;
try {
socketChannel.write(ByteBuffer.wrap(s.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Client client = new Client();
new Thread(() -> {
try {
client.readInfo();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
Scanner sc = new Scanner(System.in);
while (sc.hasNextLine()) {
String s = sc.nextLine();
client.sendToServer(s);
}
}
}
四、AIO簡(jiǎn)介
Java AIO(NIO 2.0)異步非阻塞,服務(wù)器實(shí)現(xiàn)模式為一個(gè)有效請(qǐng)求一個(gè)線程,客戶端的I/O請(qǐng)求都是由操作系統(tǒng)先完成了再通知服務(wù)器應(yīng)用去啟動(dòng)線程進(jìn)行處理
BIONIOAIOSocketSocketChannelAsynchronousSocketChannelServerSocketServerSocketChannelAsynchronousServerSocketChannel
與NIO不同,當(dāng)進(jìn)行讀寫操作時(shí),只須直接調(diào)用API的read或write方法即可,這兩種方法均為異步的,對(duì)于讀操作而言,當(dāng)有流可讀取時(shí),操作系統(tǒng)會(huì)將可讀的流傳入read方法的緩沖區(qū),對(duì)于寫操作而言,當(dāng)操作系統(tǒng)將write方法傳遞的流寫入完畢時(shí),操作系統(tǒng)主動(dòng)通知應(yīng)用程序
即可以理解為,read/write方法都是異步的,完成后會(huì)主動(dòng)調(diào)用回調(diào)函數(shù)。在JDK1.7中,這部分內(nèi)容被稱作NIO 2.0,主要在Java.nio.channels包下增加了下面四個(gè)異步通道:
AsynchronousSocketChannelAsynchronousServerSocketChannelAsynchronousFileChannelAsynchronousDatagramChannel
推薦資料:
IO模式講解(AIO&BIO&NIO)
柚子快報(bào)激活碼778899分享:BIO、NIO、AIO詳解
精彩鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。