柚子快報(bào)邀請(qǐng)碼778899分享:簡(jiǎn)述 BIO 、NIO 模型
柚子快報(bào)邀請(qǐng)碼778899分享:簡(jiǎn)述 BIO 、NIO 模型
BIO : 同步阻塞I/O(Block IO)
?????????服務(wù)器實(shí)現(xiàn)模式為每一個(gè)連接一個(gè)線程,即客戶端有連接請(qǐng)求時(shí)服務(wù)器就需要啟動(dòng)一個(gè)線程進(jìn)行處理,如果這個(gè)連接不做任何事情會(huì)造成不必要的線程開(kāi)銷,此處可以通過(guò)線程池機(jī)制進(jìn)行優(yōu)化。
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
/**
* 多人聊天室 - 服務(wù)端
*/
public class BioServer {
static List
public static void main(String[] args) throws Exception {
int port = 8080;
ServerSocket serverSocket = new ServerSocket(port);
while (true) {
Socket client = serverSocket.accept();
System.out.println("客戶端: " + client.getPort() + " 連接成功!");
clientList.add(client);
forwardProcess(client);
}
}
/**
* 轉(zhuǎn)發(fā)處理
*/
public static void forwardProcess(Socket socket) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
forwardMsg(socket);
}
}
}).start();
}
/**
* 轉(zhuǎn)發(fā)消息
*/
public static void forwardMsg(Socket socket) {
try {
String msg = readMsg(socket);
System.out.println(msg);
for (Socket client : clientList) {
if (client != socket) {
writeMsg(client, msg);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 寫(xiě)入消息
*/
public static void writeMsg(Socket socket, String msg) throws IOException {
PrintStream ps = new PrintStream(socket.getOutputStream());
ps.println(msg);
ps.flush();
}
/**
* 讀取消息
*/
public static String readMsg(Socket socket) throws IOException {
InputStream inputStream = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
String msg;
if ((msg = br.readLine()) != null) {
msg = socket.getPort() + " 說(shuō): " + msg;
}
return msg;
}
}
import java.io.*;
import java.net.Socket;
import java.util.Scanner;
public class BilClient {
public static void main(String[] args) throws IOException {
String ip = "127.0.0.1";
int port = 8080;
Socket client = new Socket(ip, port);
readProcess(client);
OutputStream os = client.getOutputStream();
PrintStream ps = new PrintStream(os);
Scanner scanner = new Scanner(System.in);
while (true) {
String input = scanner.nextLine();
ps.println(input);
ps.flush();
}
}
/**
* 讀取處理
*/
public static void readProcess(Socket socket) {
new Thread(() -> {
while (true) {
try {
System.out.println(readMsg(socket));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}).start();
}
/**
* 讀取消息
*/
public static String readMsg(Socket socket) throws IOException {
InputStream inputStream = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
String msg;
if ((msg = br.readLine()) != null) {
}
return msg;
}
}
?NIO: 同步非阻塞式IO,服務(wù)器實(shí)現(xiàn)模式為一個(gè)線程處理多個(gè)請(qǐng)求(連接),即客戶端發(fā)送的連接請(qǐng)求會(huì)被注冊(cè)到多路復(fù)用器上,多路復(fù)用器輪詢到有 I/O 請(qǐng)求就會(huì)進(jìn)行處理。 Channel,翻譯過(guò)來(lái)就是“通道”,就是數(shù)據(jù)傳輸?shù)墓艿?,類似于“流”,但是與“流”又有著區(qū)別。
既可以從Channel中讀取數(shù)據(jù),又可以寫(xiě)數(shù)據(jù)到Channel,但流的讀寫(xiě)通常是單向的——輸入輸出流通道可以異步讀寫(xiě)通道中的數(shù)據(jù)總是先讀取到buffer(緩沖區(qū)),或者總是需要從一個(gè)buffer寫(xiě)入,不能直接訪問(wèn)數(shù)據(jù)非阻塞特性:Channel在設(shè)計(jì)上采用了非阻塞的特性,它不會(huì)像傳統(tǒng)的流一樣在讀寫(xiě)操作上阻塞線程,而是立即返回結(jié)果,告訴調(diào)用者當(dāng)前的狀態(tài)。這使得程序可以在等待數(shù)據(jù)準(zhǔn)備的過(guò)程中同時(shí)進(jìn)行其他操作,實(shí)現(xiàn)了非阻塞IO。事件通知機(jī)制:Channel通常搭配選擇器(Selector)來(lái)使用,選擇器能夠檢測(cè)多個(gè)Channel的就緒狀態(tài),如是否可讀、可寫(xiě)等,并通過(guò)事件通知(例如輪詢或回調(diào))及時(shí)地通知程序哪些Channel處于就緒狀態(tài),從而可以進(jìn)行相應(yīng)的讀寫(xiě)操作。這種機(jī)制支持程序?qū)崿F(xiàn)異步IO模型。操作系統(tǒng)底層支持:Channel的異步讀寫(xiě)也依賴于操作系統(tǒng)底層的異步IO支持。Java NIO中的Channel實(shí)際上是對(duì)操作系統(tǒng)底層異步IO的封裝和抽象,利用了操作系統(tǒng)提供的異步IO機(jī)制來(lái)實(shí)現(xiàn)其自身的異步讀寫(xiě)功能。Buffer是一個(gè)對(duì)象,里面是要寫(xiě)入或者讀出的數(shù)據(jù),在java.nio庫(kù)中,所有的數(shù)據(jù)都是用緩沖區(qū)處理的。
在讀取數(shù)據(jù)時(shí),它是直接讀到緩沖區(qū)中的;在寫(xiě)入數(shù)據(jù)時(shí),也是直接寫(xiě)到緩沖區(qū)中,任何時(shí)候訪問(wèn)Channel中的數(shù)據(jù),都是通過(guò)緩沖區(qū)進(jìn)行操作的。緩沖區(qū)實(shí)質(zhì)上是一個(gè)數(shù)組,通常是一個(gè)字節(jié)數(shù)組ByteBuffer,當(dāng)然也有其他類型的:Selector被稱為選擇器,Selector會(huì)不斷地輪詢注冊(cè)在其上的Channel,如果某個(gè)Channel上發(fā)生讀或?qū)懯录?,這個(gè)Channel就被判定處于就緒狀態(tài),會(huì)被Selector輪詢出來(lái),然后通過(guò)SelectionKey可以獲取到就緒Channel的集合,進(jìn)行后續(xù)的I/O操作。
一個(gè)多路復(fù)用器Selector可以同時(shí)輪詢多個(gè)Channel,JDK使用了epoll()代替了傳統(tǒng)的select實(shí)現(xiàn),所以并沒(méi)有最大連接句柄的限制,這意味著只需要一個(gè)線程負(fù)責(zé)Selector的輪詢,就可以接入成千上萬(wàn)的客戶端。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Set;
/**
* NIO 聊天室 服務(wù)端
*/
public class NioServer {
private Integer port;
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
ByteBuffer writerBuffer = ByteBuffer.allocate(1024);
private Charset charset = Charset.forName("UTF-8");
public NioServer(Integer port) {
this.port = port;
}
public static void main(String[] args) {
NioServer nioServer = new NioServer(8080);
nioServer.start();
}
private void start() {
try {
// 開(kāi)啟socket
ServerSocketChannel server = ServerSocketChannel.open();
// 設(shè)置非阻塞
server.configureBlocking(false);
// 綁定端口
server.socket().bind(new InetSocketAddress(port));
// 開(kāi)啟通道, 得到 Selector (選擇器)
Selector selector = Selector.open();
// 注冊(cè) selector 監(jiān)聽(tīng)事件
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("啟動(dòng)服務(wù)器, 監(jiān)聽(tīng)端口:" + port + "...");
while (true) {
// 阻塞監(jiān)控所有注冊(cè)的通道,當(dāng)有對(duì)應(yīng)的事件操作時(shí), 會(huì)將SelectionKey放入 集合內(nèi)部并返回事件數(shù)量
selector.select();
// 返回存有SelectionKey的集合
Set
for (SelectionKey selectionKey : selectionKeys) {
handle(selectionKey, selector);
}
// 處理后清理 selectionKeys
selectionKeys.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 事件處理
*/
private void handle(SelectionKey key, Selector selector) throws IOException {
// SelectionKey 常用方法
//isAcceptable() 是否是連接繼續(xù)事件
//isConnectable() 是否是連接就緒事件
//isReadable() 是否是讀就緒事件
//isWritable() 是否是寫(xiě)就緒事件
// SelectionKey 常用事件
//SelectionKey.OP_ACCEPT 接收連接繼續(xù)事件,表示服務(wù)器監(jiān)聽(tīng)到了客戶連接,服務(wù)器可以接收這個(gè)連接了
//SelectionKey.OP_CONNECT 連接就緒事件,表示客戶端與服務(wù)器的連接已經(jīng)建立成功
//SelectionKey.OP_READ 讀就緒事件,表示通道中已經(jīng)有了可讀的數(shù)據(jù),可以執(zhí)行讀操作了(通道目前有數(shù)據(jù),可以進(jìn)行讀操作了)
//SelectionKey.OP_WRITE 寫(xiě)就緒事件,表示已經(jīng)可以向通道寫(xiě)數(shù)據(jù)了(通道目前可以用于寫(xiě)操作)
//處理連接
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
System.out.println(client.socket().getPort() + " 已建立連接 ...");
}
// 讀取消息
else if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
String msg = client.socket().getPort() + " 說(shuō): " + readMsg(client);
System.out.println(msg);
forwardMsg(msg, client, selector);
}
}
/**
* 讀取通道消息
*/
private String readMsg(SocketChannel client) throws IOException {
readBuffer.clear();
while (client.read(readBuffer) > 0) ;
readBuffer.flip();
return String.valueOf(charset.decode(readBuffer));
}
/**
* 轉(zhuǎn)發(fā)
*/
private void forwardMsg(String msg, SocketChannel client, Selector selector) throws IOException {
for (SelectionKey key : selector.keys()) {
Channel connectedClient = key.channel();
if (connectedClient instanceof ServerSocketChannel) {
continue;
}
if (key.isValid() && !client.equals(connectedClient)) {
writerBuffer.clear();
writerBuffer.put(charset.encode(msg));
writerBuffer.flip();
while (writerBuffer.hasRemaining())
((SocketChannel) connectedClient).write(writerBuffer);
}
}
}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.Set;
/**
* NIO 聊天室 客戶端
*/
public class NioClient {
private String ip;
private Integer port;
private ByteBuffer writerBuffer = ByteBuffer.allocate(1024);
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private Charset charset = Charset.forName("UTF-8");
public NioClient(String ip, Integer port) {
this.ip = ip;
this.port = port;
}
public static void main(String[] args) {
NioClient nioClient = new NioClient("127.0.0.1", 8080);
nioClient.start();
}
public void start() {
try {
// 開(kāi)啟通道
SocketChannel client = SocketChannel.open();
// 設(shè)置非阻塞
client.configureBlocking(false);
Selector selector = Selector.open();
client.register(selector, SelectionKey.OP_CONNECT);
client.connect(new InetSocketAddress(ip, port));
while (true) {
selector.select();
Set
for (SelectionKey selectionKey : selectionKeys) {
handle(selectionKey, selector);
}
selectionKeys.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void handle(SelectionKey key, Selector selector) throws IOException {
// 處理連接事件
if (key.isConnectable()) {
SocketChannel client = (SocketChannel) key.channel();
if (client.isConnectionPending()) {
client.finishConnect();
// 處理用戶輸入
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String msg = scanner.nextLine();
writerBuffer.clear();
writerBuffer.put(charset.encode(msg));
writerBuffer.flip();
while (writerBuffer.hasRemaining()) {
try {
client.write(writerBuffer);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}).start();
}
client.register(selector, SelectionKey.OP_READ);
}
// 讀取消息信息
else if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
String s = readMsg(client);
System.out.println(s);
}
}
private String readMsg(SocketChannel client) throws IOException {
readBuffer.clear();
while (client.read(readBuffer) > 0) ;
readBuffer.flip();
return String.valueOf(charset.decode(readBuffer));
}
}
柚子快報(bào)邀請(qǐng)碼778899分享:簡(jiǎn)述 BIO 、NIO 模型
推薦文章
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。