柚子快報(bào)邀請碼778899分享:十、kotlin的協(xié)程
柚子快報(bào)邀請碼778899分享:十、kotlin的協(xié)程
協(xié)程
基本概念定義組成掛起和恢復(fù)結(jié)構(gòu)化并發(fā)協(xié)程構(gòu)建器作用域構(gòu)建器掛起函數(shù)阻塞與非阻塞runBlocking全局協(xié)程像守護(hù)線程
Job的生命周期
常用函數(shù)延時(shí)和等待啟動(dòng)和取消啟動(dòng)取消
暫停
協(xié)程啟動(dòng)調(diào)度器啟動(dòng)方式啟動(dòng)模式線程上下文繼承的定義繼承的公式
協(xié)程取消與超時(shí)取消掛起點(diǎn)取消失敗可以取消
釋放資源finally塊不能取消超時(shí)withTimeoutwithTimeoutOrNull
協(xié)程的異常處理異常的傳播異常的傳播特性superviseJob異常的捕獲全局異常處理取消與異常異常聚合
異步流Flow作用概念冷流流的連續(xù)性流構(gòu)建器流上下文
流的取消取消檢測
背壓操作符轉(zhuǎn)換操作符末端操作符組合操作符展平操作符
異常處理流的完成
通道和多路復(fù)用channelproduce與actorchannel的關(guān)閉BroadcastChannel
多路復(fù)用SelectClause
并發(fā)安全避免訪問外部可變狀態(tài)
基本概念
定義
協(xié)程基于線程,是輕量級的線程。
我們用GlobalScope啟動(dòng)了一個(gè)新的協(xié)程,這意味著新協(xié)程的生命周期只受整個(gè)應(yīng)用程序的生命周期限制。
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
fun main() {
GlobalScope.launch { // 在后臺(tái)啟動(dòng)一個(gè)新的協(xié)程并繼續(xù)
delay(300) // 等待300毫秒
"rustfisher.com".forEach {
print(it)
delay(200) // 每次打印都等待一下
}
}
println("RustFisher")
Thread.sleep(3000) // 阻塞主線程防止過快退出
}
// 輸出結(jié)果:
//RustFisher
//rustfisher.com
協(xié)程不一定在同一個(gè)線程中,它們有在同一個(gè)線程的可能性。
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.lang.Thread.sleep
fun main() {
println("main線程信息 ${Thread.currentThread().id}")
for (i in 1..20) { // 多啟動(dòng)幾次協(xié)程
GlobalScope.launch {
println("協(xié)程啟動(dòng)#$i 所在線程id: ${Thread.currentThread().id}")
}
}
sleep(5000) // 阻塞主線程防止過快退出
println("RustFisher 示例結(jié)束")
}
輸出結(jié)果:
協(xié)程啟動(dòng)#13 所在線程id: 34
協(xié)程啟動(dòng)#2 所在線程id: 22 ---
協(xié)程啟動(dòng)#9 所在線程id: 30
......
協(xié)程啟動(dòng)#20 所在線程id: 27
協(xié)程啟動(dòng)#19 所在線程id: 22 ---
RustFisher 示例結(jié)束
組成
kotlin的協(xié)程實(shí)現(xiàn)分為兩個(gè)層次:
基礎(chǔ)設(shè)施層:標(biāo)準(zhǔn)庫的協(xié)程API,主要對協(xié)程提供了概念和語義上最基本的支持(如:kotlin.contracts.*)業(yè)務(wù)框架層:協(xié)程的上層框架支持(如:kotlinx.contracts.*)
掛起和恢復(fù)
常規(guī)函數(shù)的基本操作包括:invoke(或call)和 return,協(xié)程新增了 suspend 和 resume。
suspend:掛起/暫停,用于暫停當(dāng)前協(xié)程,并保存所有基本變量resume:用于讓已暫停的協(xié)程從暫停處繼續(xù)執(zhí)行
結(jié)構(gòu)化并發(fā)
結(jié)構(gòu)化并發(fā)(Structured Concurrency)是一種編程范式,用于編寫易讀、易維護(hù)的并發(fā)程序。在Kotlin協(xié)程中,結(jié)構(gòu)化并發(fā)特別指的是協(xié)程之間的協(xié)作是有組織、有紀(jì)律的。這種并發(fā)模式允許開發(fā)者明確地定義協(xié)程的入口和出口,并管理協(xié)程之間的依賴關(guān)系和生命周期。
結(jié)構(gòu)化并發(fā)的核心思想是,當(dāng)一個(gè)協(xié)程內(nèi)部創(chuàng)建了其他協(xié)程(子協(xié)程)時(shí),這些子協(xié)程的生命周期應(yīng)與父協(xié)程同步。具體來說,如果所有子協(xié)程在父協(xié)程的作用域結(jié)束前都已完成執(zhí)行,則認(rèn)為當(dāng)前協(xié)程具備結(jié)構(gòu)化并發(fā)。當(dāng)父協(xié)程結(jié)束時(shí),如果其子協(xié)程仍在運(yùn)行,則父協(xié)程會(huì)阻塞自己,等待子協(xié)程運(yùn)行完成后才退出。這種機(jī)制確保了即使在并發(fā)環(huán)境中,也能保持代碼的清晰性和可維護(hù)性。
在Kotlin協(xié)程中,結(jié)構(gòu)化并發(fā)主要依賴于CoroutineScope來實(shí)現(xiàn)。
當(dāng)我們使用結(jié)構(gòu)化并發(fā)后,可以做到:
取消任務(wù):當(dāng)某項(xiàng)任務(wù)不再需要時(shí)可以取消它追蹤任務(wù):當(dāng)任務(wù)正在執(zhí)行時(shí),可以追蹤它發(fā)出錯(cuò)誤信號:當(dāng)協(xié)程失敗時(shí),會(huì)發(fā)出錯(cuò)誤信號表面有錯(cuò)誤發(fā)生
定義協(xié)程時(shí)必須指定其 CoroutineScope,它是一個(gè)用于管理協(xié)程生命周期的接口,提供了創(chuàng)建和取消協(xié)程的方式。
常用的一些CoroutineScope:
GlobalScope:進(jìn)程級別的CoroutineScope,與應(yīng)用進(jìn)程同級;在GlobalScope中啟動(dòng)的協(xié)程不受任何特定生命周期的限制,即使Activity被銷毀,協(xié)程任務(wù)也可以繼續(xù)執(zhí)行。通常用于后臺(tái)任務(wù),如網(wǎng)絡(luò)請求、定時(shí)器等。MainScope:它與Activity的生命周期綁定,在MainScope中啟動(dòng)的協(xié)程會(huì)在Activity的onDestroy生命周期函數(shù)中取消。通常用于在UI線程上執(zhí)行協(xié)程,如更新UIViewModelScopeLifecycleScope
注意:使用協(xié)程時(shí),雖然它很輕量,并且不使用主線程,但仍會(huì)消耗一些內(nèi)存資源。如果忘記保持對新啟動(dòng)的協(xié)程的引用,它還會(huì)繼續(xù)運(yùn)行,導(dǎo)致內(nèi)存泄漏、資源泄露等問題。
協(xié)程構(gòu)建器
launch:它會(huì)立即返回一個(gè)Job對象,并在后臺(tái)執(zhí)行協(xié)程任務(wù)async:這個(gè)函數(shù)用于啟動(dòng)一個(gè)異步協(xié)程,并返回一個(gè)Deferred(繼承Job)對象,你可以通過調(diào)用await()方法來獲取異步協(xié)程的結(jié)果?!?/p>
作用域構(gòu)建器
以下三種作用域構(gòu)建器可以直接使用,它們都會(huì)繼承父協(xié)程的 coroutineScope。自己創(chuàng)建的協(xié)程作用域?qū)ο螅瑒t是使用自己的作用域。
使用自己的作用域的例如:
CoroutineScope實(shí)例對象Global.launch
可以記?。盒懽帜搁_頭會(huì)繼承,大寫的不會(huì)
以下三種作用域構(gòu)建器的異同,在 這里 有詳細(xì)說明。
runBlockingcoroutineScopesupervisorScope
import kotlinx.coroutines.*
fun main() = runBlocking {
val scope = CoroutineScope(Dispatchers.Default)
scope.launch {
delay(1000)
println("runBlocking 會(huì)等待CoroutineScope下協(xié)程的執(zhí)行嗎")
}
val job = GlobalScope.launch {
delay(1000)
println("runBlocking 會(huì)等待GlobalScope下協(xié)程的執(zhí)行嗎")
}
}
// 控制臺(tái)無打印結(jié)果,說明這兩種方法的作用域沒有繼承父協(xié)程的
fun main() = runBlocking {
coroutineScope {
delay(1000)
println("runBlocking 會(huì)等待coroutineScope下協(xié)程的執(zhí)行嗎")
}
supervisorScope {
delay(1000)
println("runBlocking 會(huì)等待supervisorScope下協(xié)程的執(zhí)行嗎")
}
}
// 控制臺(tái)把兩句話都打印了,明這兩種方法的作用域會(huì)繼承父協(xié)程的
掛起函數(shù)
掛起函數(shù)(Suspend Function)是一個(gè)特殊類型的函數(shù),它被標(biāo)記為suspend,并且只能在協(xié)程中調(diào)用。掛起函數(shù)的主要特點(diǎn)是它們能夠在執(zhí)行過程中掛起(暫停)和恢復(fù)(繼續(xù)執(zhí)行),而不會(huì)阻塞當(dāng)前線程。這使得掛起函數(shù)能夠以一種非阻塞的方式執(zhí)行異步操作,同時(shí)保持代碼的清晰性和可讀性。
掛起函數(shù)只能在協(xié)程體內(nèi)或其他掛起函數(shù)內(nèi)調(diào)用。
例如:將 launch { …… } 內(nèi)部的代碼塊提取到獨(dú)立的函數(shù)中。提取出來的函數(shù)需要 suspend 修飾符,它是掛起函數(shù)。
阻塞與非阻塞
runBlocking
delay 是非阻塞的, Thread.sleep 是阻塞的。顯式使用 runBlocking 協(xié)程構(gòu)建器來阻塞。
import kotlinx.coroutines.*
fun main() {
GlobalScope.launch { // 在后臺(tái)啟動(dòng)一個(gè)新的協(xié)程并繼續(xù)
delay(200)
"rustfisher.com".forEach {
print(it)
delay(280)
}
}
println("主線程中的代碼會(huì)立即執(zhí)行")
runBlocking { // 這個(gè)表達(dá)式阻塞了主線程
delay(3000L) //阻塞主線程防止過快退出
}
println("\n示例結(jié)束")
}
可以看到, runBlocking 里使用了 delay 來延遲。用了 runBlocking 的線程會(huì)一直阻塞直到 runBlocking 內(nèi)部的協(xié)程執(zhí)行完畢。 也就是 runBlocking{ delay } 實(shí)現(xiàn)了阻塞的效果。
我們也可以用 runBlocking 來包裝主函數(shù),runBlocking 中的Unit目前可以省略,并且runBlocking 也可用在測試中。
import kotlinx.coroutines.*
fun main() = runBlocking {
delay(100) // 在這里可以用delay了
GlobalScope.launch {
delay(100)
println("Fisher")
}
print("Rust ")
delay(3000)
}
全局協(xié)程像守護(hù)線程
我們在線程介紹中知道,如果進(jìn)程中只剩下了守護(hù)線程,那么虛擬機(jī)會(huì)退出。
前文那個(gè)打印 rustfisher.com 的例子,其實(shí)也能看到,字符沒打印完程序就結(jié)束了。 在 GlobalScope 中啟動(dòng)的活動(dòng)協(xié)程并不會(huì)使進(jìn)程?;睢K鼈兙拖袷刈o(hù)線程。
Job的生命周期
對于每一個(gè)創(chuàng)建的協(xié)程,會(huì)返回一個(gè)Job實(shí)例,該實(shí)例是協(xié)程的唯一標(biāo)識(shí),并且負(fù)責(zé)管理協(xié)程的生命周期。
New(新創(chuàng)建):協(xié)程對象剛剛通過launch或async等函數(shù)創(chuàng)建,但尚未開始執(zhí)行。Active(活躍):協(xié)程已經(jīng)開始執(zhí)行,但尚未完成。在這個(gè)階段,協(xié)程可能會(huì)執(zhí)行自己的任務(wù),也可能會(huì)啟動(dòng)子協(xié)程。Completing(完成中):協(xié)程已經(jīng)完成了自己的任務(wù),但可能還在等待其子協(xié)程完成。這個(gè)階段是短暫的,通常很快會(huì)過渡到下一個(gè)狀態(tài)。Completed(已完成):這是協(xié)程的最終狀態(tài),表示協(xié)程已經(jīng)成功執(zhí)行完畢或已被取消。Cancelling(取消中):協(xié)程正在等待取消操作完成。這通常發(fā)生在調(diào)用Job.cancel()方法后,但取消操作可能需要一些時(shí)間才能完成。Cancelled(已取消):協(xié)程已經(jīng)被取消。這可能是因?yàn)閰f(xié)程運(yùn)行出錯(cuò),或者顯式調(diào)用了Job.cancel()方法。在已取消狀態(tài)下,協(xié)程的執(zhí)行將被終止,并且不會(huì)再次啟動(dòng)。
需要注意的是,雖然上述狀態(tài)描述了協(xié)程的生命周期,但并非所有狀態(tài)都是直接可訪問的。相反,我們可以通過訪問Job對象的屬性(如isActive、isCancelled 和 isCompleted)來了解協(xié)程的當(dāng)前狀態(tài)。例如,如果 isActive 為 false 且 isCancelled 為 true,則表示協(xié)程處于取消中(Cancelling)狀態(tài);
注意:如果 isCompleted 為 true,則表示協(xié)程已完成(Completed)或已取消(Cancelled)。
常用函數(shù)
延時(shí)和等待
delay:可以達(dá)到延時(shí)的效果,是一個(gè)特殊的掛起函數(shù),它不會(huì)造成線程阻塞,但是會(huì)掛起協(xié)程,并且只能在協(xié)程中使用job.join():該方法會(huì)掛起當(dāng)前協(xié)程,等待job協(xié)程執(zhí)行完成,可以用于協(xié)程之間的順序執(zhí)行joinAll(job...):可以同時(shí)讓多個(gè)Job調(diào)用join方法job.await():該方法被join類似,但可以獲取協(xié)程完成的結(jié)果
啟動(dòng)和取消
啟動(dòng)
launch 和 async 協(xié)程構(gòu)建器都用于啟動(dòng)新協(xié)程。
lauch:返回一個(gè) Job 并且不附帶任何結(jié)果值。async:返回一個(gè) Deferred,它也是 Job,但可以使用 await() 來獲取協(xié)程執(zhí)行完后的返回值。
取消
Job.cancel():用于取消協(xié)程。Job.cancelAndJoin():同時(shí)具有 cancel() 和 join() 的作用。
暫停
yield():用于讓當(dāng)前協(xié)程暫停執(zhí)行,并將執(zhí)行權(quán)交還給協(xié)程調(diào)度器,以便讓其他協(xié)程有機(jī)會(huì)運(yùn)行。
當(dāng)你在協(xié)程中使用 yield() 時(shí),當(dāng)前協(xié)程會(huì)進(jìn)入掛起狀態(tài),但不會(huì)釋放其占用的資源(如內(nèi)存棧)。當(dāng)協(xié)程調(diào)度器決定再次執(zhí)行該協(xié)程時(shí),它會(huì)從 yield() 調(diào)用點(diǎn)恢復(fù)執(zhí)行。
這個(gè)函數(shù)通常用于實(shí)現(xiàn)非阻塞的并發(fā)編程,特別是在處理密集計(jì)算的場景時(shí),通過 yield() 可以避免長時(shí)間占用線程,從而提高應(yīng)用的響應(yīng)性和性能。
注意:yield() 并不保證一定會(huì)導(dǎo)致協(xié)程切換。它的行為取決于當(dāng)前的調(diào)度策略和協(xié)程調(diào)度器的實(shí)現(xiàn)。
import kotlinx.coroutines.*
fun main() = runBlocking {
launch {
delay(1000L) // 等待1秒
println("World!")
}
repeat(1000) { i ->
println("Hello $i")
//在每次打印500個(gè)Hello后,暫停當(dāng)前協(xié)程,讓hello有機(jī)會(huì)輸出
if (i % 500 == 0) {
yield()
}
}
}
協(xié)程啟動(dòng)
調(diào)度器
所有協(xié)程必須在調(diào)度器中運(yùn)行。
Dispatchers.Main:這是主線程調(diào)度器,用于處理UI交互和一些輕量級任務(wù)(調(diào)用掛起函數(shù)、UI函數(shù),更新LiveData)Dispatchers.Default:這是默認(rèn)調(diào)度器,通常用于CPU密集型任務(wù),如大量計(jì)算或數(shù)據(jù)處理。它會(huì)使用共享后臺(tái)線程的公共池來執(zhí)行協(xié)程。Dispatchers.IO:這是用于IO密集型任務(wù)的調(diào)度器,如文件讀寫、網(wǎng)絡(luò)請求等。它同樣使用共享線程池,但專注于IO操作。Dispatchers.Unconfined:這是一個(gè)不限制協(xié)程執(zhí)行線程的調(diào)度器。它不會(huì)將協(xié)程綁定到特定的線程或線程池,而是允許協(xié)程在任意線程中執(zhí)行。通常,這個(gè)調(diào)度器在某些特殊場景下使用,比如你需要完全控制協(xié)程的執(zhí)行線程。
啟動(dòng)方式
launch:這是最常用的啟動(dòng)協(xié)程的方式。它會(huì)立即返回一個(gè)Job對象,并在后臺(tái)執(zhí)行協(xié)程任務(wù)。如果在啟動(dòng)協(xié)程時(shí)使用了try-catch塊,異常會(huì)被捕獲;否則,異常會(huì)傳遞給未捕獲異常處理器進(jìn)行處理。async:這個(gè)函數(shù)用于啟動(dòng)一個(gè)異步協(xié)程,并返回一個(gè)Deferred對象,你可以通過調(diào)用await()方法來獲取異步協(xié)程的結(jié)果。runBlocking:這是一個(gè)阻塞式函數(shù),通常用于測試和調(diào)試協(xié)程代碼。它會(huì)啟動(dòng)一個(gè)新的協(xié)程,并等待其協(xié)程體以及所有子協(xié)程結(jié)束執(zhí)行完畢后才會(huì)返回。在調(diào)用runBlocking時(shí),當(dāng)前線程會(huì)被阻塞,直到協(xié)程執(zhí)行完畢。coroutineScope:它會(huì)創(chuàng)建一個(gè)協(xié)程作用域,并等待其協(xié)程體以及所有子協(xié)程結(jié)束。如果一個(gè)子協(xié)程失敗了,當(dāng)前域和域內(nèi)的協(xié)程都會(huì)被取消。(子協(xié)程的異常會(huì)傳播到父協(xié)程)supervisorScope:與 coroutineScope 一樣,但它在子協(xié)程失敗時(shí),不會(huì)影響域內(nèi)其他協(xié)程的執(zhí)行(子協(xié)程的異常不會(huì)傳播到父協(xié)程)
注: runBlocking 是常規(guī)函數(shù),會(huì)堵塞住當(dāng)前線程;coroutineScope 和 supervisorScope 是掛起函數(shù),不會(huì)堵塞住當(dāng)前線程。
import kotlinx.coroutines.*
import java.lang.RuntimeException
suspend fun main() {
// 換成supervisorScope后,使用log都會(huì)被輸出
coroutineScope {
val job1 = launch {
delay(5000)
// 以下輸出無法執(zhí)行
println("Job1 完成")
}
launch {
println("job2 執(zhí)行了")
throw RuntimeException()
}
delay(1000)
// 以下輸出無法執(zhí)行
println("會(huì)執(zhí)行我嗎?")
}
}
啟動(dòng)模式
DEFAULT:這是協(xié)程的默認(rèn)啟動(dòng)模式。當(dāng)協(xié)程創(chuàng)建后,它會(huì)立即開始調(diào)度。如果在調(diào)度前協(xié)程被取消,它會(huì)直接進(jìn)入取消狀態(tài)。ATOMIC:協(xié)程創(chuàng)建后會(huì)立即開始調(diào)度,協(xié)程在執(zhí)行到第一個(gè)掛起點(diǎn)之前不響應(yīng)取消。LAZY:協(xié)程只有在被需要時(shí)才會(huì)開始調(diào)度,如主動(dòng)調(diào)用協(xié)程的start、join或await等函數(shù)時(shí)。如果協(xié)程在調(diào)度前被取消,它將直接進(jìn)入異常結(jié)束狀態(tài)。UNDISPATCHED:協(xié)程創(chuàng)建后會(huì)在當(dāng)前函數(shù)調(diào)用棧中立即執(zhí)行,直到遇到第一個(gè)真正掛起的點(diǎn)。
如果想要你的協(xié)程立刻執(zhí)行,而不是等待調(diào)度,可以使用最后一個(gè)模式。
注:調(diào)度不等于執(zhí)行。調(diào)度(scheduling)是指決定協(xié)程何時(shí)在哪個(gè)線程上開始或恢復(fù)執(zhí)行的過程,而執(zhí)行(execution)是指協(xié)程代碼的實(shí)際運(yùn)行。協(xié)程什么時(shí)候執(zhí)行取決于調(diào)度器的當(dāng)前狀態(tài)和其他協(xié)程的優(yōu)先級。
import kotlinx.coroutines.*
fun main() = runBlocking {
// 默認(rèn)模式下程序立刻就結(jié)束了,沒有打印出log
//ATOMIC模式下,log被打印
val job1 = launch(start = CoroutineStart.ATOMIC) {
Thread.sleep(5000)
println("Job 完成")
}
// job2立刻就執(zhí)行了,使用其他模式需要等待job1睡眠結(jié)束
// 這是使用runBlocking啟動(dòng)方式的情況,使用coroutineScope則會(huì)立刻執(zhí)行job2
val job2 = launch(start = CoroutineStart.UNDISPATCHED) {
println("立刻執(zhí)行了")
}
job1.cancel()
}
注:使用UNDISPATCHED模式可以讓你的協(xié)程調(diào)度器即使為Dispatchers.IO類型(使用后臺(tái)線程),仍在主線程中執(zhí)行。
線程上下文
CoroutineContext 是一個(gè)接口,用于描述協(xié)程的運(yùn)行環(huán)境,包含了與協(xié)程執(zhí)行相關(guān)的各種參數(shù)和配置信息。
CoroutineContext 主要包含以下幾個(gè)方面的元素:
Job:代表協(xié)程的生命周期。通過 Job,你可以控制協(xié)程的啟動(dòng)、取消和等待其完成。 CoroutineDispatcher:協(xié)程調(diào)度器,用于向合適的線程分發(fā)任務(wù)。它決定了協(xié)程在哪個(gè)線程上執(zhí)行。 CoroutineName:協(xié)程的名稱,主要用于調(diào)試目的。 CoroutineExceptionHandler:處理協(xié)程中發(fā)生的(未被捕捉)異常。
有時(shí)我們需要在協(xié)程上下文中定義多個(gè)元素,可以用 + 操作符來實(shí)現(xiàn)。
例如:為一個(gè)協(xié)程指定一個(gè)調(diào)度器和名稱
fun main() = runBlocking
launch(Dispatchers.Default + CoroutineName("test")) {
println("我工作在:${Thread.currentThread().name}")
}
}
繼承的定義
新創(chuàng)建的協(xié)程,它的 CoroutineContext 會(huì)包含一個(gè)全新的Job,并返回,用于控制新協(xié)程的生命周期。而它上下文中剩下的元素會(huì)從 創(chuàng)建該協(xié)程的CoroutineScope 或 父協(xié)程的 CoroutineContext 繼承。
import kotlinx.coroutines.*
fun main() = runBlocking
val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("test"))
// 這里調(diào)用launch方法時(shí)繼承了scope的上下文
val job = scope.launch {
println("job-launch:${coroutineContext[CoroutineName]} ${Thread.currentThread().name}")
// 這里調(diào)用launch方法時(shí)繼承了父協(xié)程的上下文
val childJob = async {
println("childJob-launch:${coroutineContext[CoroutineName]} ${Thread.currentThread().name}")
coroutineContext[Job]
}
println("新協(xié)程的上下文中的Job對象 等于 返回的Job對象嗎:" + (childJob == childJob.await())) // 為true
}
job.join()
}
繼承的公式
協(xié)程的上下文 = 默認(rèn)值 + 繼承的CoroutineScope + 參數(shù)
默認(rèn)值:如 CoruoutineDispatchers 的默認(rèn)值為 Dispatchers.Default繼承的繼承的CoroutineScope是CoroutineScope或父協(xié)程的CoroutineContext傳入?yún)f(xié)程構(gòu)建器的參數(shù),其優(yōu)先級高于繼承的上下文參數(shù),會(huì)覆蓋對應(yīng)的參數(shù)值
協(xié)程取消與超時(shí)
取消
我們可以在協(xié)程尚未結(jié)束時(shí)主動(dòng)取消協(xié)程,協(xié)程在處于掛起點(diǎn)的時(shí)候就會(huì)被取消。
取消作用域時(shí),會(huì)把它的子協(xié)程都取消被取消的子協(xié)程不會(huì)影響其余兄弟協(xié)程協(xié)程通過拋出一個(gè)異常 CancellationException 來處理取消操作kotlinx.coroutines 中的掛起函數(shù)都是可被取消的取消協(xié)程時(shí),拋出的異常會(huì)被靜默處理,當(dāng)作正常完成
掛起點(diǎn)
當(dāng)掛起函數(shù)被調(diào)用時(shí),它們會(huì)暫停當(dāng)前協(xié)程的執(zhí)行,直到掛起函數(shù)的操作完成或需要等待某個(gè)條件滿足。這些暫停點(diǎn)被稱為掛起點(diǎn)(Suspension Points)。
協(xié)程只有在掛起點(diǎn)(即協(xié)程暫停執(zhí)行并等待某些條件滿足的點(diǎn))才會(huì)檢查其取消狀態(tài)。這些掛起點(diǎn)通常是由掛起函數(shù)(如 delay、withContext 等)產(chǎn)生的。如果協(xié)程在掛起點(diǎn)發(fā)現(xiàn)它已經(jīng)被取消,那么它通常會(huì)立即停止執(zhí)行并拋出 CancellationException。
需要注意的是,掛起點(diǎn)不僅僅是掛起函數(shù)本身產(chǎn)生的,還包括了掛起函數(shù)內(nèi)部可能調(diào)用的其他掛起函數(shù)。一個(gè)協(xié)程可能會(huì)在多個(gè)掛起點(diǎn)之間來回切換,直到最終完成。
另外,不是所有標(biāo)記為 suspend 的函數(shù)都會(huì)產(chǎn)生掛起點(diǎn)。有些掛起函數(shù)可能會(huì)立即返回結(jié)果,而不會(huì)導(dǎo)致協(xié)程掛起。這取決于函數(shù)內(nèi)部的實(shí)現(xiàn)和調(diào)用時(shí)的上下文。
取消失敗
如果協(xié)程在執(zhí)行計(jì)算(cpu密集型)任務(wù),并且沒檢查取消的話,那我們的取消嘗試會(huì)失敗。
import kotlinx.coroutines.*
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0 // 模擬的控制循環(huán)數(shù)量
while (i < 5) { // 模擬耗時(shí)計(jì)算
if (System.currentTimeMillis() >= nextPrintTime) {
println("[job] 模擬耗時(shí)計(jì)算中 ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(800) // 等待一會(huì)
println("[rustfisher] 嘗試取消協(xié)程")
job.cancelAndJoin()
println("程序退出 bye~")
}
[job] 模擬耗時(shí)計(jì)算中 0 ...
[job] 模擬耗時(shí)計(jì)算中 1 ...
[rustfisher] 嘗試取消協(xié)程
[job] 模擬耗時(shí)計(jì)算中 2 ...
[job] 模擬耗時(shí)計(jì)算中 3 ...
[job] 模擬耗時(shí)計(jì)算中 4 ...
程序退出 bye~
可以看到,模擬耗時(shí)計(jì)算直到4,整個(gè)程序退出。而調(diào)用 cancelAndJoin() 并沒有成功取消掉協(xié)程。
可以取消
讓協(xié)程可被取消的方法
顯式的檢查取消狀態(tài),例如檢查 isActive 變量使用 ensureActive 方法,如果 Job 處于非活躍狀態(tài),這個(gè)方法就會(huì)立即拋出異常 CancellationException使用 yield 方法,它會(huì)檢查協(xié)程的狀態(tài),如果狀態(tài)為 已取消 ,則拋出異常 CancellationException;它還會(huì)讓出線程的執(zhí)行權(quán)補(bǔ):使用 delay(值>0),它會(huì)讓協(xié)程處于掛起點(diǎn)
注:實(shí)際調(diào)用取消方法后,如果協(xié)程在掛起點(diǎn)則會(huì)拋出異常進(jìn)行取消。原因是 Job 對象的 isCancelled 變?yōu)?true后,調(diào)用會(huì)使協(xié)程掛起的函數(shù)時(shí)都會(huì)拋出異常而成功取消掉協(xié)程。
對上面的代碼進(jìn)行一些改進(jìn)。把 while (i < 5) 循環(huán)中的條件改成 while (isActive) 。修改后的代 碼如下:
import kotlinx.coroutines.*
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5 && isActive) { // 模擬耗時(shí)計(jì)算
// 或者在這調(diào)用ensureActive、yield、delay(1)都會(huì)拋出異常取消掉任務(wù),其他會(huì)使當(dāng)前協(xié)程處于掛起點(diǎn)的函數(shù)
if (System.currentTimeMillis() >= nextPrintTime) {
println("[job] 模擬耗時(shí)計(jì)算中 ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(800) // 等待一會(huì)
println("[rustfisher] 嘗試取消協(xié)程")
job.cancelAndJoin()
println("程序退出 bye~")
}
[job] 模擬耗時(shí)計(jì)算中 0 ...
[job] 模擬耗時(shí)計(jì)算中 1 ...
[rustfisher] 嘗試取消協(xié)程
程序退出 bye~
釋放資源
finally塊
取消協(xié)程時(shí),掛起函數(shù)(使用suspend修飾的函數(shù))會(huì)拋出異常:CancellationException。我們可以使用try-catch-finally來處理。并且在 finally塊中釋放資源。
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("[job]模擬計(jì)算次數(shù) $i ...")
delay(300L)
}
} catch (e: CancellationException) {
println("[job] CancellationException ${e.message}")
} finally {
println("[job][finally] 釋放資源..")
}
}
delay(800) // 等待一會(huì)
println("[rustfisher] 嘗試取消協(xié)程")
job.cancelAndJoin()
println("[rustfisher] 程序退出 bye~")
}
不能取消
有時(shí)候,我們需要運(yùn)行不能取消的代碼塊。
withCotext(context){}:使用給定的協(xié)程上下文調(diào)用指定的掛起塊,掛起直到完成,然后返回結(jié)果。
withContext(NonCancellable) 可以創(chuàng)建一個(gè)無法取消的協(xié)程作用域,確保在這個(gè)作用域內(nèi)執(zhí)行的掛起函數(shù)不會(huì)被取消。這通常在資源釋放或清理操作的上下文中使用,這些操作可能需要在協(xié)程被取消后仍然執(zhí)行。
實(shí)際上,這里在finally塊中調(diào)用了delay方法,它會(huì)檢查協(xié)程 isCancelled 的值,發(fā)現(xiàn)為true就會(huì)拋出異常,導(dǎo)致執(zhí)行完delay方法后面的代碼無法執(zhí)行。若把delay方法換成Thread.sleep方法,或在finally塊中再捕捉一次一次,即使沒使用withContext(NonCancellable) 也能保證finally塊中的代碼都被執(zhí)行。
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("[job]模擬計(jì)算次數(shù) $i ...")
delay(300L)
}
} catch (e: CancellationException) {
println("[job] CancellationException ${e.message}")
} finally {
withContext(NonCancellable) {
println("[job][finally] 進(jìn)入NonCancellable")
delay(1000) // 假設(shè)這里還有一些耗時(shí)操作
println("[job][finally] NonCancellable完畢")
}
println("[job][finally] 結(jié)束")
}
}
delay(800) // 等待一會(huì)
println("[rustfisher] 嘗試取消協(xié)程")
job.cancelAndJoin()
println("[rustfisher] 程序退出 bye~")
}
運(yùn)行結(jié)果如下:
[job]模擬計(jì)算次數(shù) 0 ...
[job]模擬計(jì)算次數(shù) 1 ...
[job]模擬計(jì)算次數(shù) 2 ...
[rustfisher] 嘗試取消協(xié)程
[job] CancellationException StandaloneCoroutine was cancelled
[job][finally] 進(jìn)入NonCancellable
# 如果沒有使用withContext(NonCancellable),則無法輸出下面兩行
[job][finally] NonCancellable完畢
[job][finally] 結(jié)束
[rustfisher] 程序退出 bye~
超時(shí)
withTimeout
我們可以用 withTimeout(long) 來指定超時(shí)時(shí)間。
超時(shí)后會(huì)拋出 TimeoutCancellationException,它是CancellationException 的子類。
如果沒有使用try-catch,控制臺(tái)是看不到該異常的,因?yàn)樵诒蝗∠膮f(xié)程中 CancellationException 會(huì)被認(rèn)為是協(xié)程執(zhí)行結(jié)束的正常原因。
import kotlinx.coroutines.*
fun main() = runBlocking
launch {
try {
withTimeout(400L) {
val startTime = System.currentTimeMillis()
repeat(1000) { i ->
println("[job] 運(yùn)行: $i, 累積運(yùn)行時(shí)間:${System.currentTimeMillis() - startTime}毫秒")
delay(100L)
}
}
} catch (e: Exception) {
println("異常: $e")
}
}
}
withTimeoutOrNull
withTimeoutOrNull 方法會(huì)在超時(shí)后返回null,如果成功執(zhí)行則返回我們指定的值。
import kotlinx.coroutines.*
fun main() = runBlocking
launch {
val result1 = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("[job1] 運(yùn)行 $i ...")
delay(500L)
}
"[1] Done" // 根據(jù)超時(shí)設(shè)置 執(zhí)行不到這里
}
println("Result1: $result1")
val result2 = withTimeoutOrNull(1300L) {
repeat(2) { i ->
println("[job2] 運(yùn)行 $i ...")
delay(500L)
}
"[2] Done" // 成功執(zhí)行完畢后到這里
}
println("Result2: $result2")
}
}
運(yùn)行結(jié)果:
[job1] 運(yùn)行 0 ...
[job1] 運(yùn)行 1 ...
[job1] 運(yùn)行 2 ...
Result1: null
[job2] 運(yùn)行 0 ...
[job2] 運(yùn)行 1 ...
Result2: [2] Done
協(xié)程的異常處理
異常的傳播
協(xié)程構(gòu)建器有兩種形式:自動(dòng)傳播異常(launch于actor),向用戶暴露異常(async和produce)。
(1)當(dāng)這些構(gòu)建器用于創(chuàng)建一個(gè)根協(xié)程(該協(xié)程不是另一個(gè)協(xié)程的子協(xié)程)時(shí):
前者:異常會(huì)在它發(fā)生的第一時(shí)間拋出后者:依賴用戶最終來消費(fèi)異常,例如通過 await 和 receive
fun main() = runBlocking
val job1 = GlobalScope.launch {
println("job1 --> 拋出異常")
throw RuntimeException()
}
try {
job1.join()
} catch (e: Exception) {
// 最終還是拋出異常,這里捕捉失敗
println("job1 --> 捕捉了異常")
}
val job2 = GlobalScope.async {
println("job2 --> 拋出異常")
throw RuntimeException()
}
try {
job2.await()
} catch (e: Exception) {
// 這里捕捉異常成功
println("job2 --> 捕捉了異常")
}
}
(2)非根協(xié)程總是會(huì)被傳播,拋給父級
異常的傳播特性
當(dāng)一個(gè)協(xié)程由于一個(gè)異常運(yùn)行失敗時(shí),它會(huì)傳播這個(gè)異常并傳遞給的父級。接下來父級會(huì)進(jìn)行幾部操作:
取消它自己的子級取消它自己將異常傳播并傳遞給他的父級
superviseJob
使用superviseJob時(shí),子協(xié)程的失敗不會(huì)影響到其他子協(xié)程。它不會(huì)傳播異常給他的父級,而是讓子協(xié)程自己處理異常。
fun main() = runBlocking
val scope = CoroutineScope(SupervisorJob())
val job1 = scope.launch {
delay(100)
println("job --> 1")
throw RuntimeException()
}
val job2 = scope.launch {
try {
delay(Long.MAX_VALUE)
} finally {
// 參數(shù)為Job()時(shí)輸出,為Supervisor()不輸出
println("job --> 2")
}
}
joinAll(job1, job2)
}
或者使用 supervisorScope 作用域構(gòu)建器
異常的捕獲
當(dāng)使用CoroutineExceptionHandler對協(xié)程的異常進(jìn)行捕獲時(shí),以下條件被滿足時(shí),異常才會(huì)被捕獲:
協(xié)程的上下文包含CoroutineExceptionHandler對象
根協(xié)程拋出異常:直接作為參數(shù)傳入該協(xié)程的上下文子協(xié)程拋出異常:要傳入根協(xié)程的上下文,只傳入子協(xié)程無法捕捉
如果該協(xié)程有爸爸和爺爺,那么爺爺才是根協(xié)程 異常是被自動(dòng)拋出異常的協(xié)程所拋出的(是launch,非async)
fun main() = runBlocking
val handler = CoroutineExceptionHandler { _, throwable ->
println("捕捉到:$throwable")
}
val job1 = GlobalScope.launch(handler) {
throw RuntimeException("job --> 1") // 成功打印
}
val job2 = GlobalScope.launch {
// handler不在根協(xié)程中,異常捕捉不到,無打印
launch(handler) {
launch {
throw RuntimeException("job --> 2")
}
}
}
// 協(xié)程時(shí)async構(gòu)建器創(chuàng)建的,異常捕捉不到,無打印
val job3 = GlobalScope.async(handler) {
throw RuntimeException("job --> 3")
}
joinAll(job1, job2, job3)
// job3需要用await方法才會(huì)在控制臺(tái)顯示其異常信息
}
全局異常處理
全局異常處理器可以獲取到所有協(xié)程未處理的未捕獲異常,但它并不會(huì)對異常進(jìn)行捕獲,所以不能阻止程序崩潰。但是它在程序程序調(diào)試和異常上報(bào)等場景中仍然有很大的用處。
添加方法:
在classpath(例如resources目錄)下面新建 META-INF/services 目錄。在該目錄下新建文件名為 kotlinx.coroutines.CoroutineExceptionHandler 的文件,文件內(nèi)容填全局異常處理器的全類名。定義全局異常處理器,名稱隨意,需要繼承 CoroutineExceptionHandler
class GlobalCoroutineExceptionHandler: CoroutineExceptionHandler {
override val key = CoroutineExceptionHandler
override fun handleException(context: CoroutineContext, exception: Throwable) {
println("沒有被捕捉的異常:$exception")
}
}
取消與異常
協(xié)程內(nèi)部使用 CancellationException 來進(jìn)行取消,這個(gè)異常會(huì)被忽略。(沒有手動(dòng)捕捉時(shí)會(huì)自己靜默處理)子協(xié)程被取消時(shí),不會(huì)取消它的父協(xié)程如果一個(gè)協(xié)程遇到了 CancellationExceptino 以外的異常,則它會(huì)使用該異常取消它的父協(xié)程;父協(xié)程處理該異常時(shí),需要先取消所有子協(xié)程,才會(huì)去處理異常
fun main() = runBlocking
val handler = CoroutineExceptionHandler { _, e ->
println("子協(xié)程都取消完成后,父協(xié)程開始處理異常")
}
val job1 = GlobalScope.launch(handler) {
launch {
try {
delay(Long.MAX_VALUE)
} finally {
withContext(NonCancellable) {
println("1號子協(xié)程 --> 開始被取消了")
delay(100)
println("1號子協(xié)程 --> 取消完畢")
}
}
}
launch {
println("2號子協(xié)程 --> 開始拋出異常")
throw RuntimeException()
}
}
joinAll(job1)
}
結(jié)果:
2號子協(xié)程 --> 開始拋出異常
1號子協(xié)程 --> 開始被取消了
1號子協(xié)程 --> 取消完畢
子協(xié)程都取消完成后,父協(xié)程開始處理異常
異常聚合
當(dāng)協(xié)程的多個(gè)子協(xié)程因?yàn)楫惓6r(shí),一般情況下取第一個(gè)異常進(jìn)行處理。在第一個(gè)異常之后發(fā)生的其他異常,都將會(huì)被綁定到第一個(gè)異常上。
fun main() = runBlocking
val handler = CoroutineExceptionHandler { _, e ->
println("捕捉到異常:$e,${e.suppressed.contentToString()}")
}
val job1 = GlobalScope.launch(handler) {
launch {
try {
delay(Long.MAX_VALUE)
} finally {
throw RuntimeException("1號子協(xié)程")
}
}
launch {
delay(100)
throw RuntimeException("2號子協(xié)程")
}
}
joinAll(job1)
}
// 輸出 --> 捕捉到異常:java.lang.RuntimeException: 2號子協(xié)程,[java.lang.RuntimeException: 1號子協(xié)程]
異步流Flow
作用
掛起函數(shù)只可以異步的返回單個(gè)值,但如果想要異步返回多個(gè)計(jì)算好的值則需要用到Flow。
flow構(gòu)建器使用后會(huì)返回一個(gè)Flow對象
快速構(gòu)建流可以用 asFlow 方法 flow{…} 塊中的代碼可以掛起(可以自由使用掛起函數(shù))
序列只能使用它自己的掛起函數(shù)(例如無法用delay) 使用了它的函數(shù)不需要被標(biāo)記為 suspendflow流 使用 emit 函數(shù)發(fā)射值,使用 collect 函數(shù)收集值 集合和序列也可以返回多個(gè)值,但它們每個(gè)值的返回不是異步的。
fun simpleFlow() = flow
for (i in 1..3) {
delay(1000)
emit(i) // 產(chǎn)生一個(gè)元素
}
}
fun simpleSeq() = sequence
for (i in 1..3) {
Thread.sleep(1000)
yield(i) // 產(chǎn)生一個(gè)元素
}
}
fun main() = runBlocking
launch {
for (i in 1..3) {
println("我沒有被堵塞")
delay(1500)
}
}
// 快速構(gòu)建流:(1..3).asFlow()
// 使用flow時(shí),主協(xié)程沒有被堵塞,兩個(gè)任務(wù)都在執(zhí)行
simpleFlow().collect(::println)
// 使用序列時(shí),則發(fā)生堵塞,兩個(gè)任務(wù)只能順序執(zhí)行
simpleSeq().forEach(::println)
}
概念
冷流
Flow是一種類似于序列的冷流,flow構(gòu)建器中的代碼直到被收集時(shí)才會(huì)開始執(zhí)行(調(diào)用 collect 方法時(shí)才執(zhí)行,類似懶加載)。
與之相對的是熱流,StateFlow 和 ShareFlow 是熱流,在垃圾回收之前,都是存在于內(nèi)存之中,并且處于活躍狀態(tài)的。
Flow中的收集可以重復(fù)進(jìn)行,即 collect 方法可以多次調(diào)用。
流的連續(xù)性
流的每次單獨(dú)收集都是按順序執(zhí)行的,除非使用特殊操作符。
從上游到下游,每個(gè)過渡操作符都會(huì)處理每個(gè)發(fā)射出的值,然后再交給末端操作符。
流構(gòu)建器
flow:直接使用 flow {...}flowOf:該構(gòu)建器定義了一個(gè)發(fā)射固定值集的流.asFlow:擴(kuò)展函數(shù),可以將各種集合與序列轉(zhuǎn)換為流
fun main() = runBlocking
flowOf("one", "two", "three")
.onEach { delay(1000) }.collect(::println)
(1..3).asFlow()
.onEach { delay(1000) }.collect(::println)
}
流上下文
流的收集總是在調(diào)用協(xié)程的上下文中發(fā)生,流的該屬性稱為上下文保存flow{} 構(gòu)建器中的代碼必須遵循上下文保存屬性,并且不允許從其他上下文中發(fā)射(emit)flowOn操作符,該函數(shù)用于更改流發(fā)射的上下文launchIn,使用該函數(shù)替換collect,可以更改流收集時(shí)的上下文
即Flow的發(fā)射和收集默認(rèn)會(huì)遵循同樣的上下文,這將導(dǎo)致如果在主協(xié)程調(diào)用了收集方法,則它的發(fā)射也會(huì)在主協(xié)程里執(zhí)行。這會(huì)影響主協(xié)程其他任務(wù)的執(zhí)行。一般發(fā)射部分的代碼是耗時(shí)操作,需要在后臺(tái)協(xié)程中執(zhí)行,就可能需要更改其上下文。
如果在對流進(jìn)行收集時(shí),還需要對流再進(jìn)行一些耗時(shí)的操作(如過濾),則也需要更改其上下文。
fun simpleFlow() = flow
println("Flow start:${Thread.currentThread().name}")
for (i in 1..2) {
delay(1000)
emit(i) // 產(chǎn)生一個(gè)元素
}
}.flowOn(Dispatchers.IO) //IO型任務(wù)
fun main() = runBlocking
println("main:${Thread.currentThread().name}")
simpleFlow()
.onEach { println("Flow collect:${Thread.currentThread().name}") }
.launchIn(CoroutineScope(Dispatchers.Default)) //CPU密集型任務(wù)
.join() // 改變了收集的上下文,則該協(xié)程不受父級作用域管理,作用域指定this則不需要調(diào)用該方法
}
運(yùn)行結(jié)果:
main:main
Flow start:DefaultDispatcher-worker-2
Flow collect:DefaultDispatcher-worker-1
Flow collect:DefaultDispatcher-worker-2
流的取消
流采用與協(xié)程同樣的協(xié)作取消。流的收集可以在流所在協(xié)程被掛起時(shí)(處于掛起點(diǎn))取消。
取消檢測
Flow其實(shí)為 emit 方法附加上了 ensureActive 方法,這意味著從 Flow 塊發(fā)出的繁忙循環(huán)是可以取消的。(emit方法進(jìn)行了取消檢測)
處于性能原因,大多數(shù)其他的流操作是不會(huì)進(jìn)行取消檢測的,所有在協(xié)程處于繁忙循環(huán)時(shí),最好顯式的進(jìn)行手動(dòng)檢測。
通過 cancellable 操作符可以進(jìn)行取消檢測,但會(huì)影響性能
fun simpleFlow() = flow
for (i in 1..5) {
delay(1000)
emit(i) // 調(diào)用了emit方法,可以取消成功
}
}
fun main() = runBlocking
simpleFlow().collect {
println(it)
if (it == 3) cancel()
}
// 沒有調(diào)用emit方法,取消失敗
(1..5).asFlow().collect {
println(it)
if (it == 3) cancel()
}
// 使用了cancellable操作符,取消成功
(1..5).asFlow().cancellable().collect {
println(it)
if (it == 3) cancel()
}
}
背壓
當(dāng)數(shù)據(jù)生產(chǎn)者的生產(chǎn)速率高于數(shù)據(jù)消費(fèi)者的處理速率時(shí),就會(huì)產(chǎn)生背壓。這意味著生產(chǎn)者產(chǎn)生的數(shù)據(jù)量超過了消費(fèi)者能夠處理的數(shù)據(jù)量,造成了一種“阻塞”現(xiàn)象,數(shù)據(jù)在緩沖區(qū)中積壓,從而產(chǎn)生了壓力。
背壓問題通常有兩種解決方案:
降低數(shù)據(jù)生產(chǎn)者的生產(chǎn)速率,使其與消費(fèi)者的處理速率相匹配。 提高數(shù)據(jù)消費(fèi)者的處理速率,使其能夠更快地處理生產(chǎn)者產(chǎn)生的數(shù)據(jù)。
Flow中有以下方法可以來解決背壓問題:
buffer(int): 創(chuàng)建一個(gè)具有指定容量的緩沖區(qū),當(dāng)數(shù)據(jù)生產(chǎn)者的速度超過消費(fèi)者的速度時(shí),數(shù)據(jù)會(huì)被緩沖。(并發(fā))當(dāng)必須更改上下文時(shí),flowOn操作符使用了相同的緩存機(jī)制,但buffer函數(shù)是顯式的請求緩存而不改變執(zhí)行上下文。(并發(fā))onflate(): 合并發(fā)射項(xiàng),不對每個(gè)值進(jìn)行處理(不懂)collectLatest():會(huì)自動(dòng)丟棄那些還未被消費(fèi)者處理的數(shù)據(jù)項(xiàng),只保留最新的數(shù)據(jù)項(xiàng)。(只拿最新的)
fun simpleFlow() = flow
for (i in 1..3) {
delay(100)
emit(i)
println("start:${Thread.currentThread().name} --> $i")
}
}
fun main() = runBlocking
var time = measureTimeMillis {
simpleFlow().collect {
delay(300)
println("collect:${Thread.currentThread().name} --> $it")
}
}
println("花費(fèi)了:$time ms") //1254 ms = (300 + 100) * 3
time = measureTimeMillis {
simpleFlow().buffer().collect {
delay(300)
println("collect:${Thread.currentThread().name} --> $it")
}
}
println("花費(fèi)了:$time ms") //1068 ms = 300 * 3 + 100
time = measureTimeMillis {
simpleFlow().flowOn(Dispatchers.Default).collect {
delay(300)
println("collect:${Thread.currentThread().name} --> $it")
}
}
println("花費(fèi)了:$time ms") //1049 ms = 300 * 3 + 100
time = measureTimeMillis {
simpleFlow().conflate().collect {
delay(300)
println("collect:${Thread.currentThread().name} --> $it")
}
}
// 元素2被丟棄了
println("花費(fèi)了:$time ms") //778 ms
time = measureTimeMillis {
simpleFlow().collectLatest {
delay(300)
println("collect:${Thread.currentThread().name} --> $it")
}
}
// 元素1、2都被丟棄了
println("花費(fèi)了:$time ms") //660 ms
}
操作符
轉(zhuǎn)換操作符
**轉(zhuǎn)換操作符(Transformation Operators)**用于轉(zhuǎn)換流,它應(yīng)用于上游流,并返回下游流。
這些操作符也是冷操作符,運(yùn)行速度快,本身不是掛起函數(shù),返回新的轉(zhuǎn)換流的定義。
map:將流中的每個(gè)元素轉(zhuǎn)換為另一種形式(1對1) transform:可以轉(zhuǎn)換發(fā)射的元素,可以一對多(通過emit方法) take:只接收指定數(shù)量的元素。 filter:只保留滿足特定條件的元素。 drop:跳過指定數(shù)量的元素。 distinctUntilChanged:只發(fā)出與上一個(gè)值不同的元素。
末端操作符
末端操作符是在流中用于啟動(dòng)流收集的掛起函數(shù)。collect 是最基礎(chǔ)的末端操作符。
轉(zhuǎn)換為各種集合:toList 和 toSet獲取第一個(gè)元素:first確保流發(fā)射單個(gè)值:single將流規(guī)約到某值:
reduce:將流中的元素累積(或減少)為單個(gè)值fold:類似reduce,但可以指定初始值和更改累加器的類型
組合操作符
zip:用于將兩個(gè)流中的元素組合成對。(異步的組合)combine:用于將多個(gè)流中的元素組合成一個(gè)結(jié)果。
展平操作符
流表示異步接收的值序列,容易到這樣的情況:值序列1中每一個(gè)值都會(huì)觸發(fā)對另一個(gè)值序列的請求。由于流是異步的,因此就需要不同的展平模式。(組合)
假設(shè)值序列1為:a,b,c ;值序列2為:A,B
flatMapConcat:連接模式 —> aA,aB,bA,bB,cA,cBflatMapMerge:合并模式 —> aA,bA,cA,aB,bB,cBflatMapLatest:最新展平模式 —> aA,bA,cA,cB
異常處理
當(dāng)運(yùn)算符中的發(fā)射器或代碼拋出異常時(shí),有兩種處理異常的方法:
try/catch 塊:更適合捕捉下游異常catch 函數(shù):更適合捕捉上游異常(可以進(jìn)行恢復(fù)處理,如emit值)
Flow的設(shè)計(jì)原則是要保證異常的透明性的,即上游的異常也能傳播到下游
流的完成
當(dāng)流完成后(普通情況或異常情況),它可能需要執(zhí)行一些操作。
命令式finally塊:聲明式onCompletion函數(shù):非正常完成時(shí),可以獲取異常信息,但無法捕獲
通道和多路復(fù)用
channel
channel實(shí)際上是一個(gè)并發(fā)安全的隊(duì)列,它可以用來連接協(xié)程,實(shí)現(xiàn)不同協(xié)程的通信。 當(dāng)channel隊(duì)列中的緩沖區(qū)(默認(rèn)大小0)滿了,如果沒有人調(diào)用 receive 取走值,send 函數(shù)就會(huì)掛起,直到有人調(diào)用 receive,才會(huì)繼續(xù) send。
fun main() = runBlocking
val channel = Channel
val producer = GlobalScope.launch {
var i = 0
while (true) {
delay(1000)
channel.send(++i)
println("send:$i")
}
}
val consumer = GlobalScope.launch {
while (true) {
// 消費(fèi)效率比生產(chǎn)效率慢,但并沒有出現(xiàn)生產(chǎn)幾個(gè)后才消費(fèi)的情況
delay(2000)
val ele = channel.receive()
println("receive:$ele")
}
}
joinAll(producer, consumer)
}
channel是可以迭代的,可以獲取它的 iterator 進(jìn)行迭代。迭代channel一般用于生產(chǎn)效率高于消費(fèi)效率時(shí)。
produce與actor
Produce和actor是構(gòu)建生產(chǎn)者和消費(fèi)者的便捷方法。
通過 produce 方法可以啟動(dòng)一個(gè)生產(chǎn)者協(xié)程,并返回一個(gè) ReceiveChannel,其他協(xié)程可以用此channel來接收數(shù)據(jù)。使用 actor 方法則可以啟動(dòng)一個(gè)消費(fèi)者協(xié)程,并返回一個(gè) SendChannel,其他協(xié)程可以用詞channel來發(fā)送數(shù)據(jù)。
channel的關(guān)閉
produce 和 actor 返回的channel都會(huì)隨著對應(yīng)協(xié)程的執(zhí)行完畢而關(guān)閉,因此 channel 被稱為 熱數(shù)據(jù)流。
對于一個(gè)channel,如果調(diào)用它的 close 方法,它會(huì)立刻停止接收新元素(此時(shí)它的 isCloseForSend 會(huì)立刻返回true)。但由于channel存在緩沖區(qū),可能還有一些元素在緩沖區(qū)中沒有處理完,因此要等所有的元素都被讀取后 isCloseForReceive 才會(huì)返回true。
channel的生命周期最后有主導(dǎo)方來維護(hù),建議由主導(dǎo)方實(shí)現(xiàn)關(guān)閉。
BroadcastChannel
發(fā)送端和接收端在Channel中可以存在一對多的情況,但就算有多個(gè)接收端存在,同一個(gè)數(shù)據(jù)只會(huì)被一個(gè)接收端接收。廣播則可以讓多個(gè)接收端接收到同一個(gè)數(shù)據(jù)。
注意:此 API 自 1.5.0 起已過時(shí),自 1.7.0 起已棄用以刪除 它被替換為 SharedFlow 和 StateFlow。
多路復(fù)用
多路復(fù)用(Multiplexing)是通信技術(shù)中的一個(gè)基本概念,它指的是在同一傳輸介質(zhì)上同時(shí)傳輸多個(gè)不同信號源發(fā)出的信號,并且這些信號之間互不影響。這種技術(shù)的主要目的是提高介質(zhì)的利用率,從而達(dá)到節(jié)省信道資源、降低傳輸成本和提高傳輸效率的目的。
在 Kotlin 協(xié)程中,多路復(fù)用通常指的是同時(shí)執(zhí)行多個(gè)協(xié)程,并有效地管理和調(diào)度這些協(xié)程的執(zhí)行。在協(xié)程中,可以通過復(fù)用多個(gè) await 的方式實(shí)現(xiàn)多路復(fù)用。這意味著可以在一個(gè)協(xié)程中等待多個(gè)其他協(xié)程的完成,并根據(jù)需要選擇性地處理它們的結(jié)果。
一個(gè)常見的應(yīng)用場景是從不同的數(shù)據(jù)源(如網(wǎng)絡(luò)和本地緩存)同時(shí)獲取數(shù)據(jù)。例如,協(xié)程 A 可以從網(wǎng)絡(luò)獲取數(shù)據(jù),而協(xié)程 B 可以從本地緩存獲取數(shù)據(jù)。通過使用 select 代碼塊,可以同時(shí)執(zhí)行這兩個(gè)協(xié)程,并根據(jù)哪個(gè)協(xié)程先返回結(jié)果來選擇性地使用它的數(shù)據(jù)。這種方式有效地利用了系統(tǒng)資源,提高了程序的響應(yīng)速度和效率。
uspend fun CoroutineScope.getFromLocal() = async(Dispatchers.IO) {
delay(1000)
"本地文件讀取成功"
}
suspend fun CoroutineScope.getFromRemote() = async(Dispatchers.IO) {
delay(1200)
"網(wǎng)絡(luò)文件讀取成功"
}
fun main() = runBlocking
GlobalScope.launch {
val localJob = getFromLocal()
val remoteJob = getFromRemote()
// 只會(huì)返回一個(gè)結(jié)果,返回最快的那個(gè)
val result = select {
localJob.onAwait {it}
remoteJob.onAwait {it}
}
println(result)
}.join()
}
// 這是await多路復(fù)用,還有channel多路復(fù)用
// Flow的多路復(fù)用:使用Flow的merge方法
SelectClause
可以被 select 的事件返回值都是 SelectClauseN 類型的,包括:
SelectClause0:對應(yīng)事件沒有返回值,例如 join --> onJoinSelectClause1:對應(yīng)事件有返回值,例如 onAwait 和 onReceiveSelectClause2:對應(yīng)事件有返回值,并需要額外參數(shù),如 onSend
如果我們想要確認(rèn)掛起函數(shù)是否支持select,只需要查看其是否存在返回值類型為 SelectClauseN 的函數(shù)即可。
并發(fā)安全
使用線程時(shí)會(huì)存在并發(fā)問題,如對某值的累加(因?yàn)樗皇窃硬僮鳎?。kotlin協(xié)程中同樣會(huì)出現(xiàn)并發(fā)問題。
fun main() = runBlocking
var count = 0
List(1000) {
GlobalScope.launch { count++ }
}.joinAll()
println(count) // 值不為1000
val count2 = AtomicInteger(0)
List(1000) {
GlobalScope.launch { count2.incrementAndGet() }
}.joinAll()
println(count2.get()) // 值為1000
}
除了我們在線程中常用的解決并發(fā)問題的手段之外(例如java提供的原子類),協(xié)程框架也提供了一下并發(fā)安全的工具,包括:
Channel:并發(fā)安全的消息通道Mutex:輕量級鎖,它的 lock 和 unlock 從語義上與協(xié)程鎖比較類似,之所以輕量是因?yàn)樗讷@取不到鎖時(shí)不會(huì)堵塞線程,而是掛起等待鎖的釋放Semaphore:輕量級信號量,信號量可以有多個(gè),協(xié)程獲取到信號量后即可執(zhí)行并發(fā)操作。但Semaphore的參數(shù)為1時(shí),效果等價(jià)于Mutex
fun main() = runBlocking
var count = 0
val mutex = Mutex()
List(1000) {
GlobalScope.launch {
mutex.withLock { count++ }
}
}.joinAll()
println(count) // 值為1000
count = 0
val semaphore = Semaphore(1)
List(1000) {
GlobalScope.launch {
semaphore.withPermit { count++ }
}
}.joinAll()
println(count) // 值為1000
}
避免訪問外部可變狀態(tài)
編寫函數(shù)時(shí)要求它不得訪問外部狀態(tài),只能基于參數(shù)做運(yùn)算,通過返回值提供運(yùn)算結(jié)果。
fun main() = runBlocking
val count = 0
val result = count + List(1000) {
GlobalScope.async { 1 }
}.map { it.await() }.sum()
println(r) // 值為1000
}
柚子快報(bào)邀請碼778899分享:十、kotlin的協(xié)程
文章來源
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。