柚子快報激活碼778899分享:java NIO專題學(xué)習(xí)(一)
柚子快報激活碼778899分享:java NIO專題學(xué)習(xí)(一)
一、BIO/NIO/AIO介紹
1. 背景說明
在Java的軟件設(shè)計開發(fā)中,通信架構(gòu)是不可避免的。我們在進(jìn)行不同系統(tǒng)或者不同進(jìn)程之間的數(shù)據(jù)交互,或者在高并發(fā)的通信場景下都需要用到網(wǎng)絡(luò)通信相關(guān)的技術(shù)。
對于一些經(jīng)驗豐富的程序員來說,Java早期的網(wǎng)絡(luò)通信架構(gòu)存在一些缺陷,其中最令人惱火的是基于性能低下的同步阻塞式的I/O通信(BIO)。隨著互聯(lián)網(wǎng)開發(fā)下通信性能的高要求,Java在2002年開始支持了非阻塞式的I/O通信技術(shù)(NIO)。大多數(shù)讀者在學(xué)習(xí)網(wǎng)絡(luò)通信相關(guān)技術(shù)的時候,都只是接觸到零碎的通信技術(shù)點,沒有完整的技術(shù)體系架構(gòu),以至于對Java的通信場景總是沒有清晰的解決方案。
本文將通過大量清晰直接的案例從最基礎(chǔ)的BlO式通信開始介紹到NIO、AIO,讀者可以清晰的了解到阻塞 、同步 、異步 的現(xiàn)象、概念和特征以及優(yōu)缺點。本文結(jié)合了大量的案例讓讀者可以快速了解每種通信架構(gòu)的使用 。
2. 技術(shù)要求
不適合0基礎(chǔ)需要掌握:Java SE基礎(chǔ)編程,如:Java多線程、Java IO編程、Java網(wǎng)絡(luò)基礎(chǔ)知識,還要了解一些Java設(shè)計模式能熟練掌握OOP編程,有一定的編程思維。
3. 通信技術(shù)整體上解決的問題
局域網(wǎng)內(nèi)的通信要求多系統(tǒng)間的底層消息傳遞機制高并發(fā)下,大數(shù)據(jù)量的通信場景需要,如:netty游戲行業(yè),無論是手游,還是大型的網(wǎng)絡(luò)游戲,Java語言越來越被廣泛應(yīng)用。
二、Java的IO演進(jìn)之路
1.I/O模型基本說明
I/O模型:就是用什么樣的通道或者說是通信模式和架構(gòu)進(jìn)行數(shù)據(jù)的傳輸和接收,很大程度上決定了程序通信的性能,Java共支持3種網(wǎng)絡(luò)編程的I/O模型:BlO. NIO. AlO
實際通信需求下,要根據(jù)不同的業(yè)務(wù)場景和性能需求決定選擇不同的I/O模型
2. I/O模型介紹(3種)
①Java BIO
同步阻塞(傳統(tǒng)阻塞型),服務(wù)器實現(xiàn)模式為一個連接一個線程,即客戶端有連接請求時服務(wù)器端就需要啟動一個線程進(jìn)行處理,如果這個連接不做任何事情就會造成不必要的線程開銷,以下是簡單示意圖:
②Java NIO
同步非阻塞,服務(wù)器實現(xiàn)模式為一個線程處理多個請求(連接),即客戶端發(fā)送的連接請求都會注冊到多路復(fù)用器上,多路復(fù)用器輪詢到連接有I/O請求就進(jìn)行處理【簡單示意圖】
③Java AIO
也叫NIO.2,異步非阻塞,服務(wù)器實現(xiàn)模式為一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務(wù)器應(yīng)用去啟動線程進(jìn)行處理,一般適用于連接數(shù)較多且連接時間較長的應(yīng)用
3. BIO、NIO、AIO適用場景分析
①BIO 方式適用于連接數(shù)目比較小且固定的架構(gòu),這種方式對服務(wù)器資源要求比較高,并局限于應(yīng)用中, jDK1.4以前的唯一選擇,但程序簡單易理解。
②NIO 方式適用于連接數(shù)目多且連接比較短(輕操作)的架構(gòu),比如聊天服務(wù)器,彈幕系統(tǒng),服務(wù)器間通訊等。 編程比較復(fù)雜,jDK1.4開始支持。
③AIO 方式使用于連接數(shù)目多且連接比較長(重操作)的架構(gòu),比如相冊服務(wù)器,充分調(diào)用OS參與并發(fā)操作, 編程比較復(fù)雜,JDK7開始支持。
三、JAVA BIO深入剖析
1. Java BIO基本介紹
Java BlO就是傳統(tǒng)的Java IO編程,其相關(guān)的類和接口在Java.io包中。
BIO(blocking I/O): 同步阻塞,服務(wù)器實現(xiàn)模式為一個連接一個線程,即客戶端有連接請求時服務(wù)器端就需要啟動一個線程進(jìn)行處理,如果這個連接不做任何事情會造成不必要的線程開銷,可以通過線程池機制改善 (實現(xiàn)多個客戶連接服務(wù)器)。
2. Java BIO工作機制
服務(wù)端和客戶端都是socket 服務(wù)端:
通過ServerSocket注冊端口服務(wù)端通過調(diào)用accept方法用于監(jiān)聽客戶端的Socket請求從Socket種獲取字節(jié)輸入或者輸出流進(jìn)行數(shù)據(jù)的讀寫操作 客戶端:
通過Socket對象請求與服務(wù)端的連接從Socket中得到字節(jié)輸入或者字節(jié)輸出流進(jìn)行數(shù)據(jù)的讀寫操作
3. 傳統(tǒng)的BIO編程實例回顧
網(wǎng)絡(luò)編程的基本模型是Client/Server模型,也就是兩個進(jìn)程之間進(jìn)行相互通信,其中服務(wù)端提供位置信息(綁定IP地址和端口),客戶端通過連接操作向服務(wù)端監(jiān)聽的端口地址發(fā)起連接請求,基于TCP協(xié)議下進(jìn)行三次握手連接,連接成功后,雙方通過網(wǎng)絡(luò)套接字(Socket)進(jìn)行通信。
傳統(tǒng)的同步阻塞模型開發(fā)中,服務(wù)端ServerSocket負(fù)責(zé)綁定IP地址,啟動監(jiān)聽端口;客戶端Socket負(fù)責(zé)發(fā)起連接操作。連接成功后,雙方通過輸入和輸出流進(jìn)行同步阻塞式通信。
基于BIO模式下的通信,客戶端-服務(wù)端是完全同步,完全藕合的。
服務(wù)端代碼:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
//服務(wù)端,目標(biāo):客戶端發(fā)送消息,服務(wù)端接收消息
public class Server {
public static void main(String[] args) {
System.out.println("======服務(wù)端啟動=======");
try {
//1.定義一個ServerSocket對象進(jìn)行服務(wù)端的端口注冊
ServerSocket ss = new ServerSocket(9999);
//2.監(jiān)聽客戶端的Socket連接請求
Socket socket = ss.accept();
//3.從Socket管道中得到一個字節(jié)輸入流對象
InputStream is = socket.getInputStream();
//4.把字節(jié)輸入流包裝成一個緩沖字符輸入流
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String msg;
if ((msg = reader.readLine()) != null) {
System.out.println("服務(wù)端接收到:"+msg);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
客戶端代碼:
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
//客戶端,連接上服務(wù)端,然后發(fā)送消息到服務(wù)端
public class Client {
public static void main(String[] args) {
//1.創(chuàng)建和服務(wù)端的連接
try {
Socket socket = new Socket("127.0.0.1",9999);
//2.從socket對象中獲取一個字節(jié)輸出流
OutputStream os = socket.getOutputStream();
//3.把字節(jié)輸出流包裝成一個打印流
PrintStream ps = new PrintStream(os);
//4.輸出消息
ps.println("hello world!服務(wù)端,你好!");
ps.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
啟動服務(wù)端后啟動客戶端,效果如下:
======服務(wù)端啟動=======
服務(wù)端接收到:hello world!服務(wù)端,你好!
小結(jié):
在以上通信中,服務(wù)端會一直等待客戶端的消息,如果客戶端沒有進(jìn)行消息的發(fā)送,服務(wù)端將一直進(jìn)入阻塞狀態(tài);同時服務(wù)端是按照行獲取消息的,這意味著客戶端也必須按照行進(jìn)行消息的發(fā)送,否則服務(wù)端將進(jìn)入等待消息的阻塞狀態(tài)!
4. BIO模式下多發(fā)和多收消息
上面的案例只能實現(xiàn)單發(fā)和單收消息,并不能實現(xiàn)反復(fù)的收消息和發(fā)消息。我們只需要在客戶端案例中,加上反復(fù)按照行發(fā)送消息的邏輯即可!案例代碼如下:
服務(wù)端代碼:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
//服務(wù)端,目標(biāo):服務(wù)端可以反復(fù)的接收消息,客戶端可以反復(fù)的發(fā)送消息
public class Server {
public static void main(String[] args) {
System.out.println("======服務(wù)端啟動=======");
try {
//1.定義一個ServerSocket對象進(jìn)行服務(wù)端的端口注冊
ServerSocket ss = new ServerSocket(9999);
//2.監(jiān)聽客戶端的Socket連接請求
Socket socket = ss.accept();
//3.從Socket管道中得到一個字節(jié)輸入流對象
InputStream is = socket.getInputStream();
//4.把字節(jié)輸入流包裝成一個緩沖字符輸入流
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
String msg;
while ((msg = reader.readLine()) != null) {
System.out.println("服務(wù)端接收到:"+msg);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
客戶端代碼:
public class Client {
public static void main(String[] args) {
//1.創(chuàng)建和服務(wù)端的連接
try {
Socket socket = new Socket("127.0.0.1",9999);
//2.從socket對象中獲取一個字節(jié)輸出流
OutputStream os = socket.getOutputStream();
//3.把字節(jié)輸出流包裝成一個打印流
PrintStream ps = new PrintStream(os);
Scanner scanner = new Scanner(System.in);
String msg;
while (true){
//4.輸出消息
System.out.println("請說:");
msg = scanner.nextLine();
ps.println(msg);
ps.flush();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
效果: 啟動服務(wù)端->啟動客戶端->在客戶端打字,服務(wù)端收到消息如下:
======服務(wù)端啟動=======
服務(wù)端接收到:你好
服務(wù)端接收到:今天在干嘛
服務(wù)端接收到:好的
5. BIO模式下接收多個客戶端
在上述的案例中,一個服務(wù)端只能接收一個客戶端的通信請求,那么如果服務(wù)端需要處理很多個客戶端的消息通信請求應(yīng)該如何處理呢 ,此時我們就需要在服務(wù)端引入線程了,也就是說客戶端每發(fā)起一個請求,服務(wù)端就創(chuàng)建一個新的線程 來處理這個客戶端的請求,這樣就實現(xiàn)了一個客戶端一個線程的模型,圖解模式如下:
服務(wù)端代碼:
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
//目標(biāo):實現(xiàn)服務(wù)端可以同時接收到多個客戶端的Socket通信需求
//思路:服務(wù)端每接收一個客戶端的socket請求對象之后都交給一個獨立的線程來處理客戶端的數(shù)據(jù)交互需求
public class Server {
public static void main(String[] args) {
try {
//1.注冊端口
ServerSocket ss = new ServerSocket(9999);
//2.定義一個死循環(huán),不斷接收客戶端的socket連接請求
while(true){
Socket socket = ss.accept();
//3.創(chuàng)建一個獨立的線程來處理與這個客戶端的socket通信需求
new ServerThreadReader(socket).start();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
線程處理代碼:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
public class ServerThreadReader extends Thread {
private Socket socket;
public ServerThreadReader(Socket socket){
this.socket = socket;
}
@Override
public void run() {
try {
//從socket對象中得到一個字節(jié)輸入流
InputStream is = socket.getInputStream();
//使用緩存字符輸入流包裝字節(jié)輸入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
while ((msg = br.readLine()) != null){
System.out.println(msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客戶端代碼:
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
public class Client {
public static void main(String[] args) {
try {
//1.請求與服務(wù)端的Socket對象連接
Socket socket = new Socket("127.0.0.1",9999);
//2. 得到一個打印流
PrintStream ps = new PrintStream(socket.getOutputStream());
//3. 使用循環(huán)不斷的發(fā)送消息給服務(wù)端接收
Scanner sc = new Scanner(System.in);
while (true){
System.out.println("請說:");
String msg =sc.nextLine();
ps.println(msg);
ps.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
啟動服務(wù)端,啟動客戶端1、客戶端2,然后通過客戶端1和客戶端2分別發(fā)送消息,服務(wù)端效果如下:
客戶端1
客戶端2
小結(jié)
每個Socket接收到,都會創(chuàng)建一個線程,線程的競爭、切換上下文影響性能每個線程都會占用??臻g和CPU資源并不是每個socket都進(jìn)行l(wèi)O操作,無意義的線程處理客戶端的并發(fā)訪問增加時。服務(wù)端將呈現(xiàn)1:1的線程開銷,訪問量越大,系統(tǒng)將發(fā)生線程棧溢出線程創(chuàng)建失敗,最終導(dǎo)致進(jìn)程宕機或者僵死,從而不能對外提供服務(wù)
6. 偽異步I/O編程
在上述案例中:客戶端的并發(fā)訪問增加時。服務(wù)端將呈現(xiàn)1:1的線程開銷,訪問量越大,系統(tǒng)將發(fā)生線程棧溢出,線程創(chuàng)建失敗,最終導(dǎo)致進(jìn)程宕機或者僵死,從而不能對外提供服務(wù)。
接下來我們采用一個偽異步I/O的通信框架,采用線程池和任務(wù)隊列實現(xiàn),當(dāng)客戶端接入時,將客戶端的Socket封裝成一個Task(該任務(wù)實現(xiàn)Java. lang. Runnable(線程任務(wù)接口)交給后端的線程池中進(jìn)行處理。JDK的線程池維護(hù)一個消息隊列和N個活躍的線程,對消息隊列中Socket任務(wù)進(jìn)行處理,由于線程池可以設(shè)置消息隊列的大小和最大線程數(shù),因此,它的資源占用是可控的,無論多少個客戶端并發(fā)訪問,都不會導(dǎo)致資源的耗盡和宕機。
如下圖所示:
服務(wù)端代碼:
import java.net.ServerSocket;
import java.net.Socket;
//目標(biāo):開發(fā)實現(xiàn)偽異步通訊架構(gòu)
//思路:服務(wù)端每接收到一個客戶端socket請求對象之后都交給一個獨立的線程來處理客戶端的數(shù)據(jù)交互需求
public class Server {
public static void main(String[] args) {
try {
//1.注冊端口
ServerSocket ss = new ServerSocket(9999);
//2.定義一個死循環(huán),負(fù)責(zé)不斷的接收客戶端的Socket的連接請求
//初始化一個線程池對象
HandlerSocketServerPool pool = new HandlerSocketServerPool(3, 10);
while (true) {
Socket socket = ss.accept();
//3.把socket對象交給一個線程池進(jìn)行處理
//把socket封裝成一個任務(wù)對象交給線程池處理
Runnable target = new ServerRunnableTarget(socket);
pool.execute(target);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
定義線程池代碼:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class HandlerSocketServerPool {
//1. 創(chuàng)建一個線程池的成員變量用于存儲一個線程池對象
private ExecutorService executorService;
/**
* 2.創(chuàng)建這個類的的對象的時候就需要初始化線程池對象
* public ThreadPoolExecutor(int corePoolSize,
* int maximumPoolSize,
* long keepAliveTime,
* TimeUnit unit,
* BlockingQueue
*/
public HandlerSocketServerPool(int maxThreadNum, int queueSize) {
executorService = new ThreadPoolExecutor(3, maxThreadNum, 120,
TimeUnit.SECONDS, new ArrayBlockingQueue
}
/**
* 3.提供一個方法來提交任務(wù)給線程池的任務(wù)隊列來暫存,等待線程池來處理
*/
public void execute(Runnable target) {
executorService.execute(target);
}
}
Socket任務(wù)類:
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
public class ServerRunnableTarget implements Runnable {
private Socket socket;
public ServerRunnableTarget(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
//處理接收到的客戶端socket通信需求
try {
//1.從socket管道中得到一個字節(jié)輸入流對象
InputStream is = socket.getInputStream();
//2.把字節(jié)輸入流包裝成一個緩存字符輸入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
while ((msg = br.readLine()) != null) {
System.out.println("服務(wù)端收到:" + msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
客戶端代碼:
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
public class Client {
public static void main(String[] args) {
try {
//1.請求與服務(wù)端的Socket對象連接
Socket socket = new Socket("127.0.0.1",9999);
//2. 得到一個打印流
PrintStream ps = new PrintStream(socket.getOutputStream());
//3. 使用循環(huán)不斷的發(fā)送消息給服務(wù)端接收
Scanner sc = new Scanner(System.in);
while (true){
System.out.println("請說:");
String msg =sc.nextLine();
ps.println(msg);
ps.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
效果:
可以接收到3個客戶端的消息,當(dāng)開啟第四個客戶端的時候,不報錯,但是服務(wù)端不會接收到第四個客戶端發(fā)送的消息。
小結(jié)
偽異步采用了線程池 實現(xiàn),因此避免了為每個請求創(chuàng)建一個獨立線程造成線程資源耗盡的問題,但由于底層依然是采用的同步阻塞模型,因此無法從根本上解決問題。
如果單個消息處理的緩慢,或者服務(wù)器線程池中的全部線程都被阻塞,那么后續(xù)socket的I/O消息都將在隊列中排隊。新的Socket請求將被拒絕,客戶端會發(fā)生大量連接超時。
7. 基于BIO形式下的文件上傳
目標(biāo): 支持任意類型文件形式的上傳
客戶端代碼:
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
//目標(biāo):實現(xiàn)客戶端上傳任意類型的文件數(shù)據(jù)給服務(wù)端保存起來
public class Client {
public static void main(String[] args) {
try(InputStream is = new FileInputStream("D:\\Desktop\\抖音的推薦機制.docx")) {
//1.請求與服務(wù)端的socket連接
Socket socket = new Socket("127.0.0.1", 8888);
//2.把字節(jié)輸出流包裝成一個數(shù)據(jù)輸出流
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
//3.先發(fā)送文件的后綴名給服務(wù)端
dos.writeUTF(".docx");
//4.把文件數(shù)據(jù)發(fā)送給服務(wù)端進(jìn)行接收
byte[] buffer = new byte[1024];
int len;
while ((len = is.read(buffer)) > 0) {
dos.write(buffer, 0, len);
}
dos.flush();
//通知服務(wù)端,我客戶端這邊的數(shù)據(jù)已經(jīng)發(fā)送完畢了
socket.shutdownOutput();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
服務(wù)端代碼:
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
public static void main(String[] args) {
System.out.println("======服務(wù)端啟動=======");
try {
ServerSocket ss = new ServerSocket(8888);
while (true) {
Socket socket = ss.accept();
//交給一個獨立的線程來處理與這個客戶端的文件通信需求
new ServerReadThread(socket).start();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Socket線程處理類:
import java.io.DataInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.UUID;
public class ServerReadThread extends Thread{
private Socket socket;
public ServerReadThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
//1.得到一個數(shù)據(jù)輸入流來讀取客戶端發(fā)送過來的數(shù)據(jù)
DataInputStream dis = new DataInputStream(socket.getInputStream());
//2.讀取客戶端發(fā)送過來的文件類型
String suffix = dis.readUTF();
System.out.println("服務(wù)端已經(jīng)成功接收到文件類型:"+suffix);
//3.定義一個字節(jié)輸出管道,負(fù)責(zé)將客戶端發(fā)送來的數(shù)據(jù)寫出去
OutputStream os = new FileOutputStream("D:\\Desktop\\server\\"+ UUID.randomUUID().toString() + suffix);
//4.從數(shù)據(jù)輸入流中讀取文件數(shù)據(jù),寫出到字節(jié)輸出流中去
byte[] buffer = new byte[1024];
int len;
while ((len = dis.read(buffer)) > 0) {
os.write(buffer, 0, len);
}
os.close();
System.out.println("服務(wù)端接收文件保存成功!");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
啟動Server,后啟動Client,效果如下:
小結(jié)
同步阻塞模式下(BIO),客戶端怎么發(fā),服務(wù)端就必須對應(yīng)的怎么收。如客戶端用的是DataOutputStream,那么服務(wù)端就該用DataInputStream,客戶端dos.writeUTF(“.jpg”);服務(wù)端就該String suffix = dis.readUTF();客戶端發(fā)完數(shù)據(jù)后必須通知服務(wù)端自己已經(jīng)發(fā)完socket.shutdownOutput(),否則服務(wù)端會一直等待。
8. Java BIO模式下的端口轉(zhuǎn)發(fā)思想
需求:需要實現(xiàn)一個客戶端的消息可以發(fā)送給所有的客戶端去接收。(群聊實現(xiàn))
服務(wù)端代碼:
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
/**
* 目標(biāo):BIO模式下的端口轉(zhuǎn)發(fā)思想-服務(wù)端實現(xiàn)
* 服務(wù)端實現(xiàn)需求:
* 1.注冊端口
* 2.接收客戶端的socket連接,交給一個獨立的線程來處理
* 3.把當(dāng)前連接的客戶端socket存入到一個所謂的在線socket集合中保存
* 4.接收客戶端的消息,然后推動給當(dāng)前所有的在線socket接收
*/
public class Server {
public static List
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(9999);
while (true) {
Socket socket = ss.accept();
//把登陸的客戶端socket存入到一個在線集合上
allSocketOnline.add(socket);
//為當(dāng)前登錄成功的socket分配一個獨立的線程來處理與之通信
new ServerReadThread(socket).start();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
服務(wù)端線程處理類:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;
public class ServerReadThread extends Thread{
private Socket socket;
public ServerReadThread(Socket socket){
this.socket = socket;
}
@Override
public void run() {
try {
//1.從socket中獲取當(dāng)前客戶端的輸入流
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String msg;
while ((msg = br.readLine())!=null) {
//2.服務(wù)端接收到了客戶端的消息之后,是需要推送給當(dāng)前所有的在線socket
sendMsgToAllClient(msg, socket);
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("當(dāng)前有人下線了");
//從在線socket集合中移除本socket
Server.allSocketOnline.remove(socket);
}
}
/**
* 把當(dāng)前客戶端發(fā)來的消息推送給全部在線的socket
* @param msg
*/
private void sendMsgToAllClient(String msg, Socket socket) throws IOException {
for (Socket socket1 : Server.allSocketOnline) {
//只發(fā)送給除自己以外的客戶端
if (socket != socket1) {
PrintStream ps = new PrintStream(socket1.getOutputStream());
ps.println(msg);
ps.flush();
}
}
}
}
客戶端代碼:
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
public class Client {
public static void main(String[] args) {
try {
//1.請求與服務(wù)端的Socket對象連接
Socket socket = new Socket("127.0.0.1", 9999);
//收消息
Thread clientThread = new ClientReaderThread(socket);
clientThread.start();
while (true) {
//發(fā)消息
OutputStream os = socket.getOutputStream();
PrintStream ps = new PrintStream(os);
//3. 使用循環(huán)不斷的發(fā)送消息給服務(wù)端接收
Scanner sc = new Scanner(System.in);
//System.out.print("client send message:");
String msg = sc.nextLine();
ps.println(msg);
ps.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
客戶端線程處理類:
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
public class ClientReaderThread extends Thread {
private Socket socket;
public ClientReaderThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
while (true) {
InputStream is = socket.getInputStream();
//4.把字節(jié)輸入流包裝成一個緩存字符輸入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
if ((msg = br.readLine()) != null) {
System.out.println(msg);
}
}
} catch (Exception e) {
}
}
}
效果
先啟動服務(wù)端Server,然后啟動3個客戶端Client
9. 基于BIO模式下即時通訊
基于BIO模式下的即時通信,我們需要解決客戶端到客戶端的通信,也就是需要實現(xiàn)客戶端與客戶端的端口消息轉(zhuǎn)發(fā)邏輯。
項目案例說明
本項目案例為即時通信的軟件項目,適合基礎(chǔ)加強的大案例,具備綜合性。學(xué)習(xí)本項目案例至少需要具備如下java SE技術(shù)點:
java面向?qū)ο笤O(shè)計,語法設(shè)計多線程技術(shù)IO流技術(shù)網(wǎng)絡(luò)通信相關(guān)技術(shù)集合框架項目開發(fā)思維java常用api使用
……
功能清單簡單說明
客戶端登錄功能
可以啟動客戶端進(jìn)行登錄,客戶端登錄只需要輸入用戶名和服務(wù)端IP地址即可。
在線人數(shù)實時更新
客戶端用戶登錄后,需要同步更新所有客戶端的聯(lián)系人信息欄。
離線人數(shù)更新
檢測到有客戶端下線后,需要同步更新所有客戶端的聯(lián)系人信息欄。
群聊
任意一個客戶端的消息,可以推動給當(dāng)前所有的客戶端接收。
私聊
任意一個客戶端消息,可以推送給當(dāng)前所有客戶端接收。
@消息
可以選擇某個員工,然后發(fā)出的消息可以@該用戶,但是其他所有人都能收到消息。
消息用戶和消息時間點
服務(wù)端可以實時記錄該用戶的消息時間點,然后進(jìn)行消息的多路轉(zhuǎn)發(fā)或則選擇。
技術(shù)選型分析
Java GUI,BIO
服務(wù)端設(shè)計
服務(wù)端接收多個客戶端邏輯
服務(wù)端需要接收多個客戶端,目前我們采取的策略是 一個客戶端對應(yīng)一個服務(wù)端線程服務(wù)端除了要注冊端口以外,還需要為每個客戶端分配 一個獨立線程處理與之通信
服務(wù)端主體代碼,主要進(jìn)行端口注冊,和接收客戶端,分配線程處理該客戶端請求
服務(wù)端接收登陸消息以及監(jiān)測離線
在上面我們實現(xiàn)了服務(wù)端可以接收多個客戶端連接,接下來我們要接收客戶端的登陸消息。
我們需要在服務(wù)端處理客戶端線程的登陸消息。需要注意的是,服務(wù)端需要接收客戶端的消息可能有很多種,分別是登陸消息,群聊消息,私聊消息和@消息。這里需要約定如果客戶端發(fā)送消息之前需要先發(fā)送消息的類型,類型我們使用信號值標(biāo)志(1,2,3)。
1代表接收的是登陸消息2代表群發(fā)| @消息3代表了私聊消息
服務(wù)端的線程中有異常校驗機制,一旦發(fā)現(xiàn)客戶端下線會在異常機制中處理,然后移除當(dāng)前客戶端用戶,把最新的用戶列表 發(fā)回給全部客戶端進(jìn)行在線人數(shù)更新。
服務(wù)端接收群聊消息
在上面實現(xiàn)了接收客戶端的登陸消息,然后提取當(dāng)前在線的全部的用戶名稱和當(dāng)前登陸的用戶名稱,發(fā)送給全部在線用戶更新自己的在線人數(shù)列表。接下來要接收客戶端發(fā)來的群聊消息,然后推送給當(dāng)前在線的所有客戶端。
服務(wù)端接收私聊消息
解決私聊消息的推送邏輯,私聊消息需要知道推送給某個具體的客戶端。我們可以接收到客戶端發(fā)來的私聊用戶名稱,根據(jù)用戶名稱定位該用戶的Socket管道,然后單獨推送消息給該Socket管道。
代碼實現(xiàn)
常量類:
public class Constants {
//端口
public static final int PORT = 7778;
//逗號分割
public static final String SPLIT = ",";
}
ServerChat:
import com.linxc.bio.instant_message.util.Constants;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
public class ServerChat {
//定義一個集合存放所有在線的socket
//在線集合只需要一個,存儲客戶端socket的同時還需要知道這個socket的客戶端名稱
public static Map
public static void main(String[] args) {
//注冊端口
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(Constants.PORT);
//循環(huán)一直等待所有可能的客戶端連接
while (true) {
Socket socket = serverSocket.accept();
//把客戶端的socket管道單獨配置一個線程來處理
new ServerReader(socket).start();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
ServerReader:
import com.linxc.bio.instant_message.util.Constants;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Set;
public class ServerReader extends Thread {
private Socket socket;
public ServerReader(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
DataInputStream dis = null;
try {
dis = new DataInputStream(socket.getInputStream());
//1.循環(huán)一直等待客戶端的消息
while (true) {
//2.讀取當(dāng)前的消息類型:登錄,群發(fā),私聊,@消息
int flag = dis.readInt();
System.out.println("服務(wù)flag:"+flag);
if (flag == 1) {
//先將當(dāng)前登錄的客戶端socket存到在線人數(shù)的socket集合中
String name =dis.readUTF();
System.out.println(name + "----->" + socket.getRemoteSocketAddress());
ServerChat.onLineSockets.put(socket, name);
}
writeMsg(flag, dis);
}
} catch (Exception e) {
System.out.println("---有人下線了---");
ServerChat.onLineSockets.remove(socket);
try {
//重新更新在線人數(shù)并發(fā)給所有客戶端
writeMsg(1, dis);
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
private void writeMsg(int flag, DataInputStream dis) throws Exception {
//定義一個變量存放最終的消息形式
String msg = null;
if (flag == 1) {
//讀取所有在線人數(shù)發(fā)給所有客戶端去更新自己的在線人數(shù)列表
StringBuilder rs = new StringBuilder();
Collection
//判斷是否存在在線人數(shù)
if (onlineNames != null && onlineNames.size() > 0) {
for (String name : onlineNames) {
rs.append(name + Constants.SPLIT);
}
//去掉最后一個分隔符
msg = rs.substring(0, rs.lastIndexOf(Constants.SPLIT));
//將消息發(fā)送給所有的客戶端
sendMsgToAll(flag, msg);
}
}else if (flag == 2||flag ==3) {
//讀到消息群發(fā)的或者是@消息
String newMsg = dis.readUTF();//消息
String sendName = ServerChat.onLineSockets.get(socket);
//內(nèi)容
StringBuilder msgFinal = new StringBuilder();
//時間
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss EEE");
if (flag == 2) {//群發(fā)消息和@消息
msgFinal.append(sendName).append("").append(sdf.format(System.currentTimeMillis()*2)).append("\r\n");
msgFinal.append(" ").append(newMsg).append("\r\n");
sendMsgToAll(flag, msgFinal.toString());
} else if (flag == 3) {//私聊消息
msgFinal.append(sendName).append("").append(sdf.format(System.currentTimeMillis()*2)).append(" 對您私發(fā)\r\n");
msgFinal.append("").append(newMsg).append("\r\n");
//私發(fā)
//得到發(fā)給誰
String destName = dis.readUTF();
sendMsgToOne(destName, msgFinal.toString());
}
System.out.println("發(fā)送消息:"+msgFinal);
}
}
/**
* @param destName 對誰私發(fā)
* @param msg 發(fā)的消息內(nèi)容
* @throws Exception
*/
private void sendMsgToOne(String destName, String msg) throws Exception {
//拿到所有在線的socket管道 給這些管道寫出消息
Set
for (Socket sk : allOnLineSockets) {
//得到當(dāng)前需要私發(fā)的Socket
//只對這個名字對應(yīng)的socket私發(fā)消息
if (ServerChat.onLineSockets.get(sk).trim().equals(destName)) {
DataOutputStream dos = new
DataOutputStream(sk.getOutputStream());
dos.writeInt(2);//消息類型
dos.writeUTF(msg);
dos.flush();
}
}
}
private void sendMsgToAll(int flag, String msg) throws Exception {
//拿到所有的在線socket管道 給這些管道寫出消息
Set
for (Socket sk : allOnLineSockets) {
DataOutputStream dos = new DataOutputStream(sk.getOutputStream());
dos.writeInt(flag);//消息類型
System.out.println(msg);
dos.writeUTF(msg);
dos.flush();
}
}
}
客戶端設(shè)計
啟動客戶端界面,登錄,刷新在線
客戶端界面主要是GUI設(shè)計,主體頁面分為登陸界面和聊天窗口,以及在線用戶列表。
登陸輸入服務(wù)端ip和用戶名后,要請求與服務(wù)端的登陸,然后立即為當(dāng)前客戶端分配一個讀線程處理客戶端的讀數(shù)據(jù)消息。因為客戶端可能隨時會接收到服務(wù)端那邊轉(zhuǎn)發(fā)過來的各種即時消息信息。客戶端登陸完成,服務(wù)端收到登陸的用戶名后,會立即發(fā)來最新的用戶列表給客戶端更新。
客戶端發(fā)送消息邏輯
客戶端發(fā)送群聊消息,@消息,以及私聊消息。
實現(xiàn)步驟: 客戶端啟動后,在聊天界面需要通過發(fā)送按鈕推送群聊消息,@消息,以及私聊消息。
代碼實現(xiàn)
Client:
import com.linxc.bio.instant_message.util.Constants;
import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.DataOutputStream;
import java.net.Socket;
public class ClientChat implements ActionListener {
//設(shè)計界面
private JFrame win = new JFrame();
//消息內(nèi)容框架
public JTextArea smsContent = new JTextArea(23,50);
//發(fā)送消息的框
private JTextArea smsSend = new JTextArea(4,40);
//在線人數(shù)的區(qū)域
public JList
//是否私聊按鈕
private JCheckBox isPrivateBn = new JCheckBox("私聊");
//消息按鈕
private JButton sendBn = new JButton("發(fā)送");
//登錄界面
private JFrame loginView;
private JTextField ipEt, nameEt, idEt;
private Socket socket;
public static void main(String[] args) {
new ClientChat().initView();
}
private void initView() {
/** 初始化聊天窗口的界面 */
win.setSize(650, 600);
/** 展示登錄界面 */
displayLoginView();
}
private void displayChatView() {
JPanel bottomPanel = new JPanel(new BorderLayout());
//將消息框和按鈕添加到窗口的底端
win.add(bottomPanel, BorderLayout.SOUTH);
bottomPanel.add(smsSend);
JPanel btns = new JPanel(new FlowLayout(FlowLayout.LEFT));
btns.add(sendBn);
btns.add(isPrivateBn);
bottomPanel.add(btns, BorderLayout.EAST);
//-----------------------------------------------
//給發(fā)送消息按鈕綁定點擊事件監(jiān)聽器
//將展示消息區(qū)centerPanel添加到窗口的中間
smsContent.setBackground(new Color(0xdd, 0xdd, 0xdd));
//讓展示消息區(qū)可以滾動
win.add(new JScrollPane(smsContent), BorderLayout.CENTER);
smsContent.setEditable(false);
//-------------------------------------------------
//用戶列表和是否私聊放到窗口的最右邊
Box rightBox = new Box(BoxLayout.Y_AXIS);
onlineUsers.setFixedCellWidth(120);
onlineUsers.setVisibleRowCount(13);
rightBox.add(new JScrollPane(onlineUsers));
win.add(rightBox, BorderLayout.EAST);
//-------------------------------------------------
//關(guān)閉窗口退出當(dāng)前程序
win.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
win.pack();//swing 加上這句 就可以擁有關(guān)閉窗口的功能
/** 設(shè)置窗口居中,顯示出來 */
setWindowCenter(win, 650, 600, true);
//發(fā)送按鈕綁定點擊事件
sendBn.addActionListener(this);
}
private void displayLoginView() {
/**
* 先讓用戶進(jìn)行登錄
* 服務(wù)端ip
* 用戶名
* id
*/
/** 顯示一個qq的登錄框 */
loginView = new JFrame("登錄");
loginView.setLayout(new GridLayout(3, 1));
loginView.setSize(400, 230);
JPanel ip = new JPanel();
JLabel label = new JLabel("IP:");
ip.add(label);
ipEt = new JTextField(20);
ip.add(ipEt);
loginView.add(ip);
JPanel name = new JPanel();
JLabel label1 = new JLabel("姓名:");
name.add(label1);
nameEt = new JTextField(20);
name.add(nameEt);
loginView.add(name);
JPanel btnView = new JPanel();
JButton login = new JButton("登錄");
btnView.add(login);
JButton cancle = new JButton("取消");
btnView.add(cancle);
loginView.add(btnView);
//關(guān)閉窗口退出當(dāng)前程序
loginView.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
setWindowCenter(loginView, 400, 260, true);
/** 給登錄和取消綁定點擊事件 */
login.addActionListener(this);
cancle.addActionListener(this);
}
private void setWindowCenter(JFrame frame, int width, int height, boolean
flag) {/** 得到所在系統(tǒng)所在屏幕的寬高 */
Dimension ds = frame.getToolkit().getScreenSize();
/** 拿到電腦的寬 */
int width1 = ds.width;
/** 高 */
int height1 = ds.height;
System.out.println(width1 + "*" + height1);
/** 設(shè)置窗口的左上角坐標(biāo) */
frame.setLocation(width1 / 2 - width / 2, height1 / 2 - height / 2);
frame.setVisible(flag);
}
@Override
public void actionPerformed(ActionEvent e) {
//得到點擊的事件源
JButton btn = (JButton) e.getSource();
switch (btn.getText()) {
case "登錄":
String ip = ipEt.getText().toString();
String name = nameEt.getText().toString();
//校驗參數(shù)是否為空
//錯誤提示
String msg = "";
//12.1.2.0
// \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\
if (ip == null ||
!ip.matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
msg = "請輸入合法的服務(wù)器ip地址";
} else if (name == null || !name.matches("\\S{1,}")) {
msg = "姓名必須1個字符以上";
}
if (!msg.equals("")) {
//msg有內(nèi)容說明參數(shù)有為空
//參數(shù)一:彈出放到那個窗口里面
JOptionPane.showMessageDialog(loginView, msg);
} else {
try {
//參數(shù)都合法了
//當(dāng)前登錄的用戶,去服務(wù)器登錄
/** 先把當(dāng)前用戶的名稱展示到界面*/
win.setTitle(name);
//去服務(wù)端登錄連接一個socket管道
socket = new Socket(ip, Constants.PORT);
//為客戶端的socket分配一個線程 專門負(fù)責(zé)收消息
new ClientReader(this, socket).start();
//帶上用戶信息過去
DataOutputStream dos = new
DataOutputStream(socket.getOutputStream());
dos.writeInt(1);//登錄消息
dos.writeUTF(name.trim());
dos.flush();
//關(guān)閉當(dāng)前窗口 彈出聊天界面
loginView.dispose();//登錄窗口銷毀
displayChatView();//展示了聊天窗口了
} catch (Exception e1) {
e1.printStackTrace();
}
}
break;
case "取消":
/** 退出系統(tǒng)*/
System.exit(0);
break;
case "發(fā)送":
//得到發(fā)送消息的內(nèi)容
String msgSend = smsSend.getText().toString();
if (!msgSend.trim().equals("")) {
//發(fā)消息給服務(wù)端
try {
//判斷是否對誰發(fā)消息
String selectName = onlineUsers.getSelectedValue();
int flag = 2;// 群發(fā) @消息
if (selectName != null && !selectName.equals("")) {
msgSend = ("@" + selectName + "," + msgSend);
//判斷是否選中私發(fā)
if (isPrivateBn.isSelected()) {
//私發(fā)
flag = 3;//私發(fā)消息
}
}
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
dos.writeInt(flag);//群發(fā)消息 發(fā)送給所有人
dos.writeUTF(msgSend);
if (flag == 3) {
//告訴服務(wù)器端我對誰私發(fā)
dos.writeUTF(selectName.trim());
}
dos.flush();
} catch (Exception e1) {
e1.printStackTrace();
}
}
smsSend.setText(null);
break;
}
}
}
ClientReader:
import com.linxc.bio.instant_message.util.Constants;
import java.io.DataInputStream;
import java.net.Socket;
import java.util.Arrays;
public class ClientReader extends Thread {
private Socket socket;
private ClientChat clientChat;
public ClientReader(ClientChat clientChat, Socket socket) {
this.clientChat = clientChat;
this.socket = socket;
}
@Override
public void run() {
try {
DataInputStream dis = new DataInputStream(socket.getInputStream());
/** 循環(huán)一直等待客戶端的消息 */
while (true) {
/** 讀取當(dāng)前的消息類型: 登錄,群發(fā),私聊,@消息 */
int flag = dis.readInt();
if (flag == 1) {
//在線人數(shù)消息回來了
String nameDatas = dis.readUTF();
System.out.println(nameDatas);
//展示到在線人數(shù)的界面
String[] names = nameDatas.split(Constants.SPLIT);
System.out.println(Arrays.toString(names));
clientChat.onlineUsers.setListData(names);
} else if (flag == 2) {
//群發(fā),私聊 ,@消息 都是直接顯示的
String msg = dis.readUTF();
clientChat.smsContent.append(msg);
//讓消息界面滾動到低端
clientChat.smsContent.setCaretPosition(clientChat.smsContent.getText().length());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
我的博客:NIO專題(一)
柚子快報激活碼778899分享:java NIO專題學(xué)習(xí)(一)
推薦鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。