柚子快報(bào)邀請(qǐng)碼778899分享:Java網(wǎng)絡(luò)編程BIO/NIO
柚子快報(bào)邀請(qǐng)碼778899分享:Java網(wǎng)絡(luò)編程BIO/NIO
Java網(wǎng)絡(luò)編程
網(wǎng)絡(luò)編程的基礎(chǔ)知識(shí)
Socket
Socket是應(yīng)用層與TCP/IP協(xié)議族通信的中間軟件抽象層,它是一組接口。在設(shè)計(jì)模式中,Socket其實(shí)就是一個(gè)門(mén)面模式,它把復(fù)雜的TCP/IP協(xié)議族隱藏在Socket接口后面,對(duì)用戶(hù)來(lái)說(shuō),一組簡(jiǎn)單的接口就是全部,讓Socket去組織數(shù)據(jù),以符合指定的協(xié)議。
主機(jī) A 的應(yīng)用程序要能和主機(jī) B 的應(yīng)用程序通信,必須通過(guò) Socket 建立連接,而建立 Socket 連接必須需要底層TCP/IP 協(xié)議來(lái)建立 TCP 連接。建立 TCP 連接需要底層 IP 協(xié)議來(lái)尋址網(wǎng)絡(luò)中的主機(jī)。
我們知道網(wǎng)絡(luò)層使用的 IP 協(xié)議可以幫助我們根據(jù) IP 地址來(lái)找到目標(biāo)主機(jī),但是一臺(tái)主機(jī)上可能運(yùn)行著多個(gè)應(yīng)用程序,如何才能與指定的應(yīng)用程序通信就要通過(guò) TCP 或 UPD 的地址也就是端口號(hào)來(lái)指定。這樣就可以通過(guò)一個(gè) Socket 實(shí)例唯一代表一個(gè)主機(jī)上的一個(gè)應(yīng)用程序的通信鏈路了。
短連接與長(zhǎng)連接
短連接:
連接->傳輸數(shù)據(jù)->關(guān)閉連接
傳統(tǒng)HTTP是無(wú)狀態(tài)的,瀏覽器和服務(wù)器每進(jìn)行一次HTTP操作,就建立一次連接,但任務(wù)結(jié)束就中斷連接。
也可以這樣說(shuō):短連接是指SOCKET連接后發(fā)送后接收完數(shù)據(jù)后馬上斷開(kāi)連接。
長(zhǎng)連接:
連接->傳輸數(shù)據(jù)->保持連接 -> 傳輸數(shù)據(jù)-> 。。。 ->關(guān)閉連接。
長(zhǎng)連接指建立SOCKET連接后不管是否使用都保持連接。
什么時(shí)候用長(zhǎng)連接,短連接?
長(zhǎng)連接多用于操作頻繁,點(diǎn)對(duì)點(diǎn)的通訊,而且連接數(shù)不能太多情況,。每個(gè)TCP連接都需要三步握手,這需要時(shí)間,如果每個(gè)操作都是先連接,再操作的話那么處理速度會(huì)降低很多,所以每個(gè)操作完后都不斷開(kāi),下次處理時(shí)直接發(fā)送數(shù)據(jù)包就OK了,不用建立TCP連接。例如:數(shù)據(jù)庫(kù)的連接用長(zhǎng)連接, 如果用短連接頻繁的通信會(huì)造成socket錯(cuò)誤,而且頻繁的socket 創(chuàng)建也是對(duì)資源的浪費(fèi)。
而像WEB網(wǎng)站的http服務(wù)一般都用短鏈接,因?yàn)殚L(zhǎng)連接對(duì)于服務(wù)端來(lái)說(shuō)會(huì)耗費(fèi)一定的資源,而像WEB網(wǎng)站這么頻繁的成千上萬(wàn)甚至上億客戶(hù)端的連接用短連接會(huì)更省一些資源。
網(wǎng)絡(luò)通訊流程
在通信編程里提供服務(wù)的叫服務(wù)端,連接服務(wù)端使用服務(wù)的叫客戶(hù)端。
在開(kāi)發(fā)過(guò)程中,如果類(lèi)的名字有Server或者ServerSocket的,表示這個(gè)類(lèi)是給服務(wù)端容納網(wǎng)絡(luò)服務(wù)用的,如果類(lèi)的名字只有Socket的,那么表示這是負(fù)責(zé)具體的網(wǎng)絡(luò)讀寫(xiě)的。那么對(duì)于服務(wù)端來(lái)說(shuō)ServerSocket就只是個(gè)場(chǎng)所(娛樂(lè)場(chǎng)所),具體和客戶(hù)端溝通的還是一個(gè)一個(gè)的socket(娛樂(lè)事件),所以在通信編程里,ServerSocket并不負(fù)責(zé)具體的網(wǎng)絡(luò)讀寫(xiě),ServerSocket就只是負(fù)責(zé)接收客戶(hù)端連接后,新啟一個(gè)socket來(lái)和客戶(hù)端進(jìn)行溝通。這一點(diǎn)對(duì)所有模式的通信編程都是適用的。
在通信編程里,我們關(guān)注的其實(shí)也就是三個(gè)事情
1、連接(客戶(hù)端連接服務(wù)器,服務(wù)器等待和接收連接)
2、讀網(wǎng)絡(luò)數(shù)據(jù)
3、寫(xiě)網(wǎng)絡(luò)數(shù)據(jù)
所有模式的通信編程都是圍繞著這三件事情進(jìn)行的。服務(wù)端提供IP和監(jiān)聽(tīng)端口,客戶(hù)端通過(guò)連接操作想服務(wù)端監(jiān)聽(tīng)的地址發(fā)起連接請(qǐng)求,通過(guò)三次握手連接,如果連接成功建立,雙方就可以通過(guò)套接字進(jìn)行通信。
JDK網(wǎng)絡(luò)編程(BIO)
傳統(tǒng)的同步阻塞模型開(kāi)發(fā)中,ServerSocket負(fù)責(zé)綁定IP地址,啟動(dòng)監(jiān)聽(tīng)端口;Socket負(fù)責(zé)發(fā)起連接操作。連接成功后,雙方通過(guò)輸入和輸出流進(jìn)行同步阻塞式通信。代碼如下:
public class BIOClient {
public static void main(String[] args) {
Socket client = new Socket();
try {
client.connect(new InetSocketAddress("127.0.0.1", 12345));
System.out.println("客戶(hù)端發(fā)送數(shù)據(jù):");
BufferedReader sys=new BufferedReader(new InputStreamReader(System.in));
String msg = sys.readLine();
BufferedWriter bo = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
bo.write(msg);
bo.newLine();
bo.flush();
msg = br.readLine();
System.out.println("接受服務(wù)器消息:" + msg);
bo.close();
br.close();
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public class BIOServer {
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(12345);
System.out.println("服務(wù)端啟動(dòng)");
while (true) {
new Thread(new ServerTask(ss.accept())).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
class ServerTask implements Runnable {
Socket socket;
public ServerTask(Socket s) {
this.socket = s;
}
@Override
public void run() {
try {
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String msg = br.readLine();
System.out.println("接收客戶(hù)端數(shù)據(jù):" + msg);
bw.write("hello client");
bw.newLine();
bw.flush();
bw.close();
br.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
socket.close();
} catch (Exception e) {
}
}
}
}
NIO網(wǎng)絡(luò)編程
網(wǎng)絡(luò)三種I/O模型
分類(lèi): BIO:(同步 阻塞)jdk1.4以前 java.io包 NIO:(同步 非阻塞)jdk1.4 java.nio包 AIO:(異步 非阻塞)jdk1.7 java.nio包
如何如何理解:
當(dāng)我們?nèi)コ燥埧梢杂幸韵聨追N模式:
飯店 -> 服務(wù)器
飯菜-> 數(shù)據(jù)
飯菜好了-> 數(shù)據(jù)就緒
端菜 /送菜 -> 數(shù)據(jù)讀取
BIO:食堂排隊(duì)打飯模式:排隊(duì)在窗口,打好才走; NIO:點(diǎn)單、等待被叫模式:等待被叫,好了自己去端; AIO:包廂模式:點(diǎn)單后菜直接被端上桌。
阻塞與非阻塞
菜沒(méi)好,要不要死等 -> 數(shù)據(jù)就緒前要不要等待? 阻塞: 沒(méi)有數(shù)據(jù)傳過(guò)來(lái)時(shí),讀會(huì)阻塞直到有數(shù)據(jù);緩沖區(qū)滿(mǎn)時(shí),寫(xiě)操作也會(huì)阻塞。 非阻塞: 遇到這些情況,都是直接返回
同步與異步
菜好了,誰(shuí)端 -> 數(shù)據(jù)就緒后,數(shù)據(jù)操作誰(shuí)完成?
同步: 數(shù)據(jù)就緒后需要自己去讀是同步
異步: 數(shù)據(jù)就緒直接讀好再回調(diào)給程序是異步
NIO和BIO的主要區(qū)別
面向流與面向緩沖
Java NIO和IO之間第一個(gè)最大的區(qū)別是,IO是面向流的,NIO是面向緩沖區(qū)的。 Java IO面向流意味著每次從流中讀一個(gè)或多個(gè)字節(jié),直至讀取所有字節(jié),它們沒(méi)有被緩存在任何地方。此外,它不能前后移動(dòng)流中的數(shù)據(jù)。如果需要前后移動(dòng)從流中讀取的數(shù)據(jù),需要先將它緩存到一個(gè)緩沖區(qū)。
Java NIO的緩沖導(dǎo)向方法略有不同。數(shù)據(jù)讀取到一個(gè)它稍后處理的緩沖區(qū),需要時(shí)可在緩沖區(qū)中前后移動(dòng)。這就增加了處理過(guò)程中的靈活性。但是,還需要檢查是否該緩沖區(qū)中包含所有需要處理的數(shù)據(jù)。而且,需確保當(dāng)更多的數(shù)據(jù)讀入緩沖區(qū)時(shí),不要覆蓋緩沖區(qū)里尚未處理的數(shù)據(jù)。
NIO三大核心組件
NIO有三大核心組件:Selector選擇器、Channel管道、buffer緩沖區(qū)。
Selector
Selector的英文含義是“選擇器”,也可以稱(chēng)為為“輪詢(xún)代理器”、“事件訂閱器”、“channel容器管理機(jī)”都行。
Java NIO的選擇器允許一個(gè)單獨(dú)的線程來(lái)監(jiān)視多個(gè)輸入通道,你可以注冊(cè)多個(gè)通道使用一個(gè)選擇器(Selectors),然后使用一個(gè)單獨(dú)的線程來(lái)操作這個(gè)選擇器,進(jìn)而“選擇”通道:這些通道里已經(jīng)有可以處理的輸入,或者選擇已準(zhǔn)備寫(xiě)入的通道。這種選擇機(jī)制,使得一個(gè)單獨(dú)的線程很容易來(lái)管理多個(gè)通道。
應(yīng)用程序?qū)⑾騍elector對(duì)象注冊(cè)需要它關(guān)注的Channel,以及具體的某一個(gè)Channel會(huì)對(duì)哪些IO事件感興趣。Selector中也會(huì)維護(hù)一個(gè)“已經(jīng)注冊(cè)的Channel”的容器。
Channel
通道,被建立的一個(gè)應(yīng)用程序和操作系統(tǒng)交互事件、傳遞內(nèi)容的渠道(注意是連接到操作系統(tǒng))。那么既然是和操作系統(tǒng)進(jìn)行內(nèi)容的傳遞,那么說(shuō)明應(yīng)用程序可以通過(guò)通道讀取數(shù)據(jù),也可以通過(guò)通道向操作系統(tǒng)寫(xiě)數(shù)據(jù),而且可以同時(shí)進(jìn)行讀寫(xiě)。
所有被Selector(選擇器)注冊(cè)的通道,只能是繼承了SelectableChannel類(lèi)的子類(lèi)。ServerSocketChannel:應(yīng)用服務(wù)器程序的監(jiān)聽(tīng)通道。只有通過(guò)這個(gè)通道,應(yīng)用程序才能向操作系統(tǒng)注冊(cè)支持“多路復(fù)用IO”的端口監(jiān)聽(tīng)。同時(shí)支持UDP協(xié)議和TCP協(xié)議。ScoketChannel:TCP Socket套接字的監(jiān)聽(tīng)通道,一個(gè)Socket套接字對(duì)應(yīng)了一個(gè)客戶(hù)端IP:端口到服務(wù)器IP:端口的通信連接。
通道中的數(shù)據(jù)總是要先讀到一個(gè)Buffer,或者總是要從一個(gè)Buffer中寫(xiě)入。
buffer緩沖區(qū)
網(wǎng)絡(luò)通訊中負(fù)責(zé)數(shù)據(jù)讀寫(xiě)的區(qū)域
NIO工作流程圖
NIO網(wǎng)絡(luò)編程
客戶(hù)端
public class Client {
private static NioClientHandle nioClientHandle;
public static void start() {
if (null == nioClientHandle) {
nioClientHandle = new NioClientHandle("127.0.0.1", 5555);
}
new Thread(nioClientHandle, "client").start();
}
public static boolean sendMsg(String msg) throws IOException {
nioClientHandle.sendMsg(msg);
return true;
}
public static void main(String[] args) throws IOException {
start();
Scanner sc = new Scanner(System.in);
while (Client.sendMsg(sc.next())) ;
}
public class NioClientHandle implements Runnable {
private String ip;
private Integer port;
private Selector selector;
private SocketChannel channel;
private volatile boolean started;
public NioClientHandle(String ip, int port) {
this.ip = ip;
this.port = port;
try {
//創(chuàng)建選擇器
selector = Selector.open();
//打開(kāi)監(jiān)聽(tīng)通道
channel = SocketChannel.open();
//false :非阻塞 true: 阻塞
channel.configureBlocking(false);
started = true;
} catch (IOException e) {
e.printStackTrace();
}
}
public void stop() {
this.started = false;
}
@Override
public void run() {
boolean connect = false;
try {
connect = channel.connect(new InetSocketAddress(ip, port));
if (connect) {
channel.register(selector, SelectionKey.OP_READ);
} else {
channel.register(selector, SelectionKey.OP_CONNECT);
}
} catch (IOException e) {
e.printStackTrace();
}
while (started) {
try {
selector.select();
Set
Iterator
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
handle(key);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handle(SelectionKey key) throws IOException {
if (key.isValid()) {
SocketChannel channel = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (channel.finishConnect()) {
channel.register(selector, SelectionKey.OP_READ);
} else {
System.exit(-1);
}
}
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int readBytes = channel.read(buffer);
if (readBytes > 0) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
System.out.println(new String(bytes, StandardCharsets.UTF_8));
}
}
}
}
public void sendMsg(String msg) throws IOException {
doWrite(channel, msg);
}
public static void doWrite(SocketChannel channel, String msg) throws IOException {
//消息轉(zhuǎn)成字節(jié)數(shù)據(jù)
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
//開(kāi)辟一個(gè)內(nèi)存空間
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
//消息寫(xiě)入ByteBuffer
writeBuffer.put(bytes);
//翻轉(zhuǎn),充值ByteBuffer
writeBuffer.flip();
//通過(guò)channel寫(xiě)出數(shù)據(jù)
channel.write(writeBuffer);
}
}
服務(wù)端
public class NioServer {
private static NioServerHandle nioServerHandle;
public static void start(){
if(nioServerHandle !=null) nioServerHandle.stop();
nioServerHandle = new NioServerHandle(5555);
new Thread(nioServerHandle,"Server").start();
}
public static void main(String[] args){
start();
}
}
public class NioServerHandle implements Runnable{
private Selector selector;//reactor
private ServerSocketChannel serverChannel;
private volatile boolean started;
public NioServerHandle(int port) {
try{
//創(chuàng)建選擇器
selector = Selector.open();
//打開(kāi)監(jiān)聽(tīng)通道
serverChannel = ServerSocketChannel.open();
//如果為 true,則此通道將被置于阻塞模式;
// 如果為 false,則此通道將被置于非阻塞模式
serverChannel.configureBlocking(false);//開(kāi)啟非阻塞模式
serverChannel.socket().bind(new InetSocketAddress(port));
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//標(biāo)記服務(wù)器已開(kāi)啟
started = true;
System.out.println("服務(wù)器已啟動(dòng),端口號(hào):" + port);
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started = false;
}
@Override
public void run() {
//循環(huán)遍歷selector
while(started){
try{
//阻塞,只有當(dāng)至少一個(gè)注冊(cè)的事件發(fā)生的時(shí)候才會(huì)繼續(xù).
selector.select();
Set
Iterator
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Throwable t){
t.printStackTrace();
}
}
//selector關(guān)閉后會(huì)自動(dòng)釋放里面管理的資源
if(selector != null)
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
//處理新接入的請(qǐng)求消息
if(key.isAcceptable()){
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();
System.out.println("建立連接");
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
}
//讀消息
if(key.isReadable()){
System.out.println("socket channel 數(shù)據(jù)準(zhǔn)備完成,可以去讀取");
SocketChannel sc = (SocketChannel) key.channel();
//創(chuàng)建ByteBuffer,并開(kāi)辟一個(gè)1M的緩沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
//讀取請(qǐng)求碼流,返回讀取到的字節(jié)數(shù)
int readBytes = sc.read(buffer);
//讀取到字節(jié),對(duì)字節(jié)進(jìn)行編解碼
if(readBytes>0){
//將緩沖區(qū)當(dāng)前的limit設(shè)置為position,position=0,
// 用于后續(xù)對(duì)緩沖區(qū)的讀取操作
buffer.flip();
//根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組
byte[] bytes = new byte[buffer.remaining()];
//將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中
buffer.get(bytes);
String message = new String(bytes,"UTF-8");
System.out.println("服務(wù)器收到消息:" + message);
//處理數(shù)據(jù)
String result = "服務(wù)器響應(yīng)消息:"+ message ;
//發(fā)送應(yīng)答消息
doWrite(sc,result);
}
//鏈路已經(jīng)關(guān)閉,釋放資源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//發(fā)送應(yīng)答消息
private void doWrite(SocketChannel channel,String response)
throws IOException {
//將消息編碼為字節(jié)數(shù)組
byte[] bytes = response.getBytes();
//根據(jù)數(shù)組容量創(chuàng)建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//將字節(jié)數(shù)組復(fù)制到緩沖區(qū)
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//發(fā)送緩沖區(qū)的字節(jié)數(shù)組
channel.write(writeBuffer);
}
}
Reactor模式,核心流程:
注冊(cè)感興趣的事件 ->掃描是否有感興趣的事件發(fā)生 -> 事件發(fā)生后做出相應(yīng)的處理
單線程Reactor
多線程Reactor
主從多線程Reactor
柚子快報(bào)邀請(qǐng)碼778899分享:Java網(wǎng)絡(luò)編程BIO/NIO
精彩內(nèi)容
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。