柚子快報激活碼778899分享:緩存 Java多線程2
柚子快報激活碼778899分享:緩存 Java多線程2
1. 線程通信
1.1. 線程通信引入
應用場景:生產(chǎn)者和消費者問題
假設倉庫中只能存放一件產(chǎn)品,生產(chǎn)者將生產(chǎn)出來的產(chǎn)品放入倉庫,消費者將倉庫中產(chǎn)品取走消費如果倉庫中沒有產(chǎn)品,則生產(chǎn)者將產(chǎn)品放入倉庫,否則停止生產(chǎn)并等待,直到倉庫中的產(chǎn)品被消費者取走為止如果倉庫中放有產(chǎn)品,則消費者可以將產(chǎn)品取走消費,否則停止消費并等待,直到倉庫中再次放入產(chǎn)品為止
分析
這是一個線程同步問題,生產(chǎn)者和消費者共享同一個資源,并且生產(chǎn)者和消費者之間相互依賴,互為條件 對于生產(chǎn)者,沒有生產(chǎn)產(chǎn)品之前,要通知消費者等待。生產(chǎn)產(chǎn)品之后,還需要通知消費者消費 對于消費者,在消費之后,要通知生產(chǎn)者已經(jīng)消費結(jié)束,生產(chǎn)者需要繼續(xù)生產(chǎn)新產(chǎn)品以供消費者 在生產(chǎn)者消費者問題中,僅有synchronized是不夠的
synchronized可阻止并發(fā)更新同一個共享資源,實現(xiàn)了同步synchronized不能用來實現(xiàn)不同線程之間的消息傳遞(通信) Java提供了3個方法解決線程之間的通信問題
均是java.lang.Object類的方法都只能在同步方法或同步代碼塊中使用,否則會拋出異常
public class Product {
private String name;
private String color;
public Product() {}
@Override
public String toString() {
return "Product{" +
"name='" + name + '\'' +
", color='" + color + '\'' +
'}';
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getColor() {
return color;
}
public void setColor(String color) {
this.color = color;
}
public Product(String name, String color) {
this.name = name;
this.color = color;
}
}
/*
* 生產(chǎn)者線程
* */
public class ProduceRunnable implements Runnable {
private Product product;
public void setProduct(Product product) {
this.product = product;
}
@Override
public void run() {
int i = 0;
while (true) {
if(i % 2 == 0) {
product.setName("饅頭");
product.setColor("白色");
}else {
product.setName("玉米餅");
product.setColor("黃色");
}
System.out.println("生產(chǎn)者生產(chǎn)商品:" + product.getName() + " " + product.getColor());
i++;
}
}
}
/*
* 消費者線程
* */
public class ConsumeRunnable implements Runnable {
private Product product;
public ConsumeRunnable() {}
public ConsumeRunnable(Product product) {
this.product = product;
}
@Override
public void run() {
while (true) {
System.out.println("消費者消費商品:" + product.getName() + " " + product.getColor());
}
}
}
public class Test {
public static void main(String[] args) {
Product product = new Product();
ProduceRunnable runnable = new ProduceRunnable();
runnable.setProduct(product);
Thread thread = new Thread(runnable);
ConsumeRunnable runnable1 = new ConsumeRunnable(product);
Thread thread1 = new Thread(runnable1);
thread.start();
thread1.start();
}
}
注意:必須保證生產(chǎn)者消費者操作的是一個商品對象,否則就會出現(xiàn)消費者消費null的情況
1.2. 使用同步代碼塊實現(xiàn)線程同步
public class ProduceRunnable1 implements Runnable {
private Product product;
public void setProduct(Product product) {
this.product = product;
}
@Override
public void run() {
int i = 0;
while (true) {
synchronized (product) {
if(i % 2 == 0) {
product.setName("饅頭");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.setColor("白色");
}else {
product.setName("玉米餅");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.setColor("黃色");
}
System.out.println("生產(chǎn)者生產(chǎn)商品:" + product.getName() + " " + product.getColor());
}
i++;
}
}
}
public class ConsumeRunnable1 implements Runnable {
private Product product;
@Override
public void run() {
while (true) {
synchronized (product) {
System.out.println("消費者消費商品:" + product.getName() + " " + product.getColor());
}
}
}
}
注意:不僅生產(chǎn)者要加鎖,消費者也要加鎖,并且必須是一把鎖(不僅是一個引用變量,而且必須指向同一個對象)
public class ProduceRunnable2 implements Runnable {
private Product product;
public void setProduct(Product product) {
this.product = product;
}
@Override
public void run() {
int i = 0;
while (true) {
synchronized (product) {
// 如果已有商品,就等待
if(product.flag) {
try {
// 必須調(diào)用同步監(jiān)視器的通信方法
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生產(chǎn)商品
if(i % 2 == 0) {
product.setName("饅頭");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.setColor("白色");
}else {
product.setName("玉米餅");
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
product.setColor("黃色");
}
System.out.println("生產(chǎn)者生產(chǎn)商品:" + product.getName() + " " + product.getColor());
// 修改商品的狀態(tài)
product.flag = true;
// 通知消費者來消費
product.notify();;
}
i++;
}
}
}
public class ConsumeRunnable2 implements Runnable {
private Product product;
public ConsumeRunnable2(Product product) {
this.product = product;
}
@Override
public void run() {
while (true) {
synchronized (product) {
// 如果沒有商品,就等待
if(!product.flag) {
try {
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 消費商品
System.out.println("消費者消費商品:" + product.getName() + " " + product.getColor());
// 修改商品狀態(tài)
product.flag = false;
// 通知生產(chǎn)者進行生產(chǎn)
product.notifyAll();
}
}
}
}
}
線程同步的細節(jié)
細節(jié)1: 進行線程通信的多個線程,要使用同一個同步監(jiān)視器(product),還必須要調(diào)用該同步監(jiān)視器的wait()、notify()、notifyAll()細節(jié)2: 線程通信的三個方法
wait():在其他線程調(diào)用此對象的notify()方法或notifyAll()方法前,導致當前線程等待。當前線程必須擁有此對象監(jiān)視器wait(time):在其他線程調(diào)用此對象的notify()方法或notifyAll()方法或超過指定的時間量前,導致當前線程等待。當前線程必須擁有此對象監(jiān)視器notify():喚醒在此對象監(jiān)視器上等待的單個線程。如果所有線程都在此對象上等待,則會選擇喚醒其中一個線程。選擇是任意性的,在對實現(xiàn)作出決定時發(fā)生的notifyAll():喚醒在此對象監(jiān)視器上等待的所有線程。被喚醒的線程將以常規(guī)方式與在該對象上主動同步的其他所有線程進行競爭 細節(jié)3:完整的線程生命周期 阻塞狀態(tài)有三種
普通的阻塞:sleep、join、Scanner、input.next()同步阻塞(鎖池隊列):沒有獲取同步監(jiān)視器的線程的隊列等待阻塞(阻塞隊列):被調(diào)用了wait()后釋放鎖,然后進行該隊列 細節(jié)4: sleep()和wait()的區(qū)別
sleep()線程會讓出CPU進入阻塞狀態(tài),但不會釋放對象鎖;wait()線程會讓出CPU進入阻塞狀態(tài),也會放棄對象鎖,進入等待此對象的等待鎖定池進入的阻塞狀態(tài)也是不同的隊列wait只能在同步控制方法或者同步控制塊內(nèi)使用,而sleep可以在任何地方使用
2. 線程通信
2.1. 使用同步方法實現(xiàn)線程通信
public class Product1 {
private String name;
private String color;
// 默認沒有商品
boolean flag = false;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getColor() {
return color;
}
public void setColor(String color) {
this.color = color;
}
public synchronized void produce(String name, String color) {
// 如果已有商品,就等待
if(flag) {
try {
// 讓出CPU,會同時釋放鎖
this.wait(); // 必須調(diào)用同步監(jiān)視器的通信方法
}catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生產(chǎn)商品
this.name = name;
try {
// 讓出了CPU,不釋放鎖
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.color = color;
System.out.println("生產(chǎn)者生產(chǎn)商品:" + getName() + " " + getColor());
// 修改商品的狀態(tài)
flag = true;
// 通知消費者來消費
this.notify();
}
public synchronized void consume() {
// 如果沒有商品,就等待
if(!flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消費商品
System.out.println("消費者消費商品:" + name + " " + color);
// 修改商品的狀態(tài)
flag = false;
// 通知生產(chǎn)者進行生產(chǎn)
this.notifyAll();
}
}
public class ConsumeRunnable3 implements Runnable{
private Product1 product;
public void setProduct(Product1 product) {
this.product = product;
}
@Override
public void run() {
while (true) {
product.consume();
}
}
}
public class ProduceRunnable3 implements Runnable {
private Product1 product;
public void setProduct(Product1 product) {
this.product = product;
}
@Override
public void run() {
int i = 0;
while(true) {
if(i % 2 == 0) {
product.produce("饅頭", "白色");
}else {
product.produce("玉米餅", "黃色");
}
i++;
}
}
}
同步方法的同步監(jiān)視器都是this,所以需要將produce()和consume()放入一個類Product中,保證是同一把鎖必須調(diào)用this的wait()、notify()、notifyAll()方法,this可以省略,因為同步監(jiān)視器是this
2.2. 使用Lock鎖實現(xiàn)線程通信
之前實現(xiàn)線程通信時,是生產(chǎn)者和消費者在一個等待隊列中,會存在本來打算喚醒消費者,卻喚醒了一個生產(chǎn)者的問題,要讓生產(chǎn)者和消費者線程在不同的隊列中等待,就要使用Lock鎖
public class Product2 {
private String name;
private String color;
boolean flag = false;
Lock lock = new ReentrantLock();
Condition produceCondition = lock.newCondition();
Condition consumeCondition = lock.newCondition();
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getColor() {
return color;
}
public void setColor(String color) {
this.color = color;
}
public void produce(String name, String color) {
lock.lock();
try {
// 如果已有商品,就等待
if(flag) {
try {
produceCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生產(chǎn)商品
this.name = name;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.color = color;
System.out.println("生產(chǎn)者生產(chǎn)商品:" + getName() + " " + getColor());
// 修改商品的狀態(tài)
flag = true;
// 通知消費者來消費
consumeCondition.signal();
} finally {
lock.unlock();
}
}
public void consume() {
lock.lock();
try {
// 如果沒有商品,就等待
if(!flag) {
try {
consumeCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("消費者消費商品:" + name + " " + color);
// 修改商品狀態(tài)
flag = false;
// 通知生產(chǎn)者生產(chǎn)
produceCondition.signalAll();
} finally {
lock.unlock();
}
}
}
一個Object的監(jiān)視器模型上,一個對象擁有一個同步隊列和等待隊列,而Lock(同步器)擁有一個同步隊列和多個等待隊列
2.3. Condition
Condition是在Java1.5中出現(xiàn)的,用來替代傳統(tǒng)的Object的wait()、notify()實現(xiàn)線程間的協(xié)作,相比使用Object的wait()、notify(),使用Condition的await()、signal()實現(xiàn)線程間協(xié)作更加安全和高效。 Condition能構(gòu)更加精細的控制多線程的休眠與喚醒。對于同一個鎖,可以創(chuàng)建多個Condition,在不同情況下使用不同的Condition。 Object的wait()、notify()、notifyAll()方法是和同步鎖關(guān)鍵字synchronized捆綁使用的;而Condition是需要與互斥鎖/共享鎖捆綁使用的 調(diào)用Condition的await()、signal()、signalAll()方法,都必須在lock保護之內(nèi),就是說必須在lock.lock()和lock.unlock()之間才可以使用
Condition中的await()對應Object的wait() Condition中的signal()對應Object的notify() Condition中的signalAll()對應Object的notifyAll() void await() throws InterruptedException:造成當前線程在接到信號或被中斷之前一直處于等待狀態(tài) 與此Condition相關(guān)的鎖以原子方式釋放,并且出于線程調(diào)度的目的,將禁用當前線程,且在發(fā)生以下四種情況之一之前,當前線程一直處于休眠狀態(tài):
其他某個線程調(diào)用此Condition的signal()方法,并且碰巧將當前線程選為被喚醒著的線程其他某個線程調(diào)用此Condition的signalAll()方法其他某個線程中斷當前線程,且支持中斷線程的掛起發(fā)生虛假喚醒 在所有情況下,在此方法可以返回當前線程之前,都必須重新獲取與此條件有關(guān)的鎖。在線程返回時,可以保證它保持此鎖。 void signal():喚醒一個等待線程。 如果所有的線程都在等待此條件,則選擇其中的一個喚醒。在從await返回之前,該線程必須重新獲得鎖 void signalAll():喚醒所有等待線程。 如果所有的線程都在等待此條件,則喚醒所有線程。在從await返回之前,每個線程都必須重新獲取鎖
3. 線程池ThreadPoolExecutor
3.1. 線程池ThreadPoolExecutor引入
什么是線程池
創(chuàng)建和銷毀對象是非常耗費時間的創(chuàng)建對象:需要分配內(nèi)存等資源銷毀對象:雖然不需要程序員操作,但是垃圾回收器會一直在后臺跟蹤并銷毀對于經(jīng)常創(chuàng)建和銷毀、使用量特別大的資源,比如并發(fā)情況下的線程,對性能影響很大思路:創(chuàng)建好多個線程,放入線程池中,使用時直接獲取引用,不使用時放回池中??梢员苊忸l繁創(chuàng)建銷毀、實現(xiàn)重復利用生活案例:共享單車技術(shù)案例:線程池、數(shù)據(jù)庫連接池JDK1.5起,提供了內(nèi)置線程池 線程池的好處
提高響應速度(減少了創(chuàng)建新線程的時間)降低資源消耗(重復利用線程池中線程,不需要每次都創(chuàng)建)提高線程的管理性:避免線程無限制創(chuàng)建、從而消耗系統(tǒng)資源,降低系統(tǒng)穩(wěn)定性,甚至內(nèi)存溢出或CPU耗盡 線程池的應用場合
需要大量線程,并且完成任務的時間短對性能要求苛刻接受突發(fā)性的大量請求
3.2. 使用線程池執(zhí)行大量的Runnable命令
public class TestThreadPool1 {
public static void main(String[] args) {
// 創(chuàng)建一個線程池
// 池中只有1個線程,保證線程一直存在
// ExecutorService pool = Executors.newSingleThreadExecutor();
// 池中有固定數(shù)量的線程
// ExecutorService pool = Executors.newFixedThreadPool(10);
// 池中線程的數(shù)量可以動態(tài)變化
ExecutorService pool = Executors.newCachedThreadPool();
// 使用線程池執(zhí)行大量的Runnable命令
for (int i = 0; i < 20; i++) {
final int n = i;
// 指定Runnable命令
Runnable command = new Runnable() {
@Override
public void run() {
System.out.println("開始執(zhí)行" + n);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執(zhí)行結(jié)束" + n);
}
};
// 不需要new Thread(command),使用線程池命令
pool.shutdown();
}
}
}
3.3. 使用線程池執(zhí)行大量的Callable任務
public class TestThreadPool2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(10);
// 使用線程池
List
for (int i = 0; i < 20000; i++) {
// 指定一個Callable任務
Callable
// 將任務交給線程池
Future
// 將Future加入到List
list.add(future);
}
for (Future f : list) {
System.out.println(f.get());
}
// 關(guān)閉線程池
pool.shutdown();
}
}
class MyCallable implements Callable
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return new Random().nextInt(10);
}
}
3.4. 線程池API總結(jié)
Executor:線程池頂級接口,只有一個方法ExecutorService:真正的線程池接口
void execute(Runnable command):執(zhí)行任務/命令,沒有返回值,一般用來執(zhí)行Runnable Future submit(Callable task):執(zhí)行任務,有返回值,一般用來執(zhí)行Callablevoid shutdown():關(guān)閉線程池 AbstractExecutorService:基本實現(xiàn)了ExecutorService的所有方法ThreadPoolExecutor:默認的線程池實現(xiàn)類 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue
corePoolSize:核心池大小
默認創(chuàng)建線程后,線程數(shù)為0,有任務進來后,就會創(chuàng)建一個線程去執(zhí)行任務但是當線程池中線程數(shù)量達到corePoolSize后,就會把到達的任務放到隊列中等待 maximumPoolSize:最大線程數(shù)
corePoolSize和maximumPoolSize之間的線程數(shù)會自動釋放,小于等于corePoolSize的不會釋放。大于時就會將任務由一個丟棄處理機制來處理 keepAliveTime:線程沒有任務時,最多保持多長時間后會終止
默認只限于corePoolSize和maximumPoolSize間的線程TimeUnit: keepAliveTime的時間單位 BlockingQueue:存儲等待執(zhí)行的任務的阻塞隊列,有多種選擇,可以是順序隊列、鏈式隊列等ThreadFactory:線程工廠,默認是DefaultThreadFactory,Executors的靜態(tài)內(nèi)部類RejectExecutionHandler:
拒絕處理任務時的策略。如果線程池的線程已經(jīng)飽和,并且任務隊列也已滿,對新的任務應該采取的措施比如拋出異常、直接舍棄、丟棄隊列中最舊任務等,默認是直接拋出異常
CallerRunsPolicy:如果發(fā)現(xiàn)線程池還在運行,就直接運行這個線程DiscardOldestPolicy:在線程池的等待隊列中,將頭取出一個拋棄,然后將當前線程放進去DiscardPolicy:什么也不做AbortPolicy:java默認,拋出一個異常 ScheduledThreadPoolExecutor:實現(xiàn)周期性任務調(diào)度的線程池Executors:工具類、線程池的工廠類,用于創(chuàng)建并返回不同類型的線程池
Executors.newCachedThreadPool():創(chuàng)建一個可根據(jù)需要創(chuàng)建新線程池的線程池Executors.newFixedThreadPool(n):創(chuàng)建一個可重用固定線程數(shù)的線程池Executors.newSingleThreadExecutor():創(chuàng)建一個只有一個線程的線程池Executors.newScheduledThreadPool(n):創(chuàng)建一個線程池,它可安排在給定延遲后運行命令或定期地執(zhí)行
4. ForkJoin框架
4.1. ForkJoin框架
4.1.1. 應用場景
目前按任務并發(fā)處理并不能完全充分的利用處理器資源,因為一般的應用程序沒有那么多的并發(fā)處理任務?;谶@種現(xiàn)狀,考慮把一個任務拆分成多個單元,每個單元分別得到執(zhí)行,最后合并每個單元的結(jié)果。 Fork/Join框架是Java7提供的一個用于并行執(zhí)行任務的框架,是一個把大任務分割成若干小任務,最后匯總每個小任務結(jié)果得到大任務結(jié)果的框架。
4.1.2. 工作竊取法
一個大任務拆分成多個小任務,為了減少線程間的競爭,把這些子任務分別放到不同的隊列中,并且每個隊列都有單獨的線程來執(zhí)行隊列里的任務,線程和隊列一一對應。 如果A線程處理完成自己隊列的任務,B線程的隊列里還有很多任務要處理,那么A就會從雙端隊列的尾部拿任務執(zhí)行,而B線程永遠是從雙端隊列的頭部拿任務執(zhí)行。
優(yōu)點:利用了線程進行并行計算,減少了線程間的競爭缺點
如果雙端隊列中只有一個任務時,線程間會存在相互競爭竊取算法消耗了更多的系統(tǒng)資源,如會創(chuàng)建多個線程和多個雙端隊列
4.1.3. 主要類
ForkJoinTask:使用該框架,需要創(chuàng)建一個ForkJoin任務,它提供在任務中執(zhí)行fork和join操作的機制。一般情況下,并不需要直接繼承ForkJoinTask類,只需要繼承他的子類
RecursiveActive:用于返回沒有結(jié)果的任務RecursiveTask:用于有返回結(jié)果的任務 ForkJoinPool:任務ForkJoinTask需要通過ForkJoinPool來執(zhí)行ForkJoinWorkerThread:ForkJoinPool線程池中的一個執(zhí)行任務的線程
4.2. ForkJoin框架示例
使用ForkJoin計算1+2+3+…+n = ?
public class SumTask extends RecursiveTask
private int start;
private int end;
// 當任務處理的數(shù)字范圍在step以內(nèi)時,將不再進一步拆分,而是直接進行累加計算。
private final int step = 2000000;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if(end - start <= step) {
// 小于5個數(shù),直接求和
for (int i = start; i <= end; i++) {
sum += i;
}
}else {
// 大于5個數(shù),分解任務
int mid = (end + start) / 2;
SumTask leftTask = new SumTask(start, mid);
SumTask rightTask = new SumTask(mid + 1, end);
// 執(zhí)行子任務
leftTask.fork();
rightTask.fork();
// 子任務執(zhí)行完,得到結(jié)果
long leftSum = leftTask.join();
long rightSum = rightTask.join();
sum = leftSum + rightSum;
}
return sum;
}
public static void main(String[] args) {
// 如果是多核CPU,其實是一個一直使用,其他閑置,使用多線程解決,但是涉及到任務的拆分與合并等細節(jié);現(xiàn)在使用ForkJoin框架,可以較輕松解決
long start = System.currentTimeMillis();
long sum = 0;
for (int i = 0; i < 1000000000; i++) {
sum += i;
}
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println("for:" + (end - start));
// 使用ForkJoin框架解決
// 創(chuàng)建一個線程池
ForkJoinPool pool = new ForkJoinPool();
// 定義一個任務
SumTask sumTask = new SumTask(1, 1000000000);
// 將任務交給線程池
start = System.currentTimeMillis();
ForkJoinTask
// 得到結(jié)果輸出
try {
Long result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
end = System.currentTimeMillis();
System.out.println("pool:" + (end - start));
}
}
柚子快報激活碼778899分享:緩存 Java多線程2
文章來源
本文內(nèi)容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。