欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報(bào)邀請碼778899分享:十、kotlin的協(xié)程

柚子快報(bào)邀請碼778899分享:十、kotlin的協(xié)程

http://yzkb.51969.com/

協(xié)程

基本概念定義組成掛起和恢復(fù)結(jié)構(gòu)化并發(fā)協(xié)程構(gòu)建器作用域構(gòu)建器掛起函數(shù)阻塞與非阻塞runBlocking全局協(xié)程像守護(hù)線程

Job的生命周期

常用函數(shù)延時(shí)和等待啟動(dòng)和取消啟動(dòng)取消

暫停

協(xié)程啟動(dòng)調(diào)度器啟動(dòng)方式啟動(dòng)模式線程上下文繼承的定義繼承的公式

協(xié)程取消與超時(shí)取消掛起點(diǎn)取消失敗可以取消

釋放資源finally塊不能取消超時(shí)withTimeoutwithTimeoutOrNull

協(xié)程的異常處理異常的傳播異常的傳播特性superviseJob異常的捕獲全局異常處理取消與異常異常聚合

異步流Flow作用概念冷流流的連續(xù)性流構(gòu)建器流上下文

流的取消取消檢測

背壓操作符轉(zhuǎn)換操作符末端操作符組合操作符展平操作符

異常處理流的完成

通道和多路復(fù)用channelproduce與actorchannel的關(guān)閉BroadcastChannel

多路復(fù)用SelectClause

并發(fā)安全避免訪問外部可變狀態(tài)

基本概念

定義

協(xié)程基于線程,是輕量級的線程。

我們用GlobalScope啟動(dòng)了一個(gè)新的協(xié)程,這意味著新協(xié)程的生命周期只受整個(gè)應(yīng)用程序的生命周期限制。

import kotlinx.coroutines.GlobalScope

import kotlinx.coroutines.delay

import kotlinx.coroutines.launch

fun main() {

GlobalScope.launch { // 在后臺(tái)啟動(dòng)一個(gè)新的協(xié)程并繼續(xù)

delay(300) // 等待300毫秒

"rustfisher.com".forEach {

print(it)

delay(200) // 每次打印都等待一下

}

}

println("RustFisher")

Thread.sleep(3000) // 阻塞主線程防止過快退出

}

// 輸出結(jié)果:

//RustFisher

//rustfisher.com

協(xié)程不一定在同一個(gè)線程中,它們有在同一個(gè)線程的可能性。

import kotlinx.coroutines.GlobalScope

import kotlinx.coroutines.launch

import java.lang.Thread.sleep

fun main() {

println("main線程信息 ${Thread.currentThread().id}")

for (i in 1..20) { // 多啟動(dòng)幾次協(xié)程

GlobalScope.launch {

println("協(xié)程啟動(dòng)#$i 所在線程id: ${Thread.currentThread().id}")

}

}

sleep(5000) // 阻塞主線程防止過快退出

println("RustFisher 示例結(jié)束")

}

輸出結(jié)果:

協(xié)程啟動(dòng)#13 所在線程id: 34

協(xié)程啟動(dòng)#2 所在線程id: 22 ---

協(xié)程啟動(dòng)#9 所在線程id: 30

......

協(xié)程啟動(dòng)#20 所在線程id: 27

協(xié)程啟動(dòng)#19 所在線程id: 22 ---

RustFisher 示例結(jié)束

組成

kotlin的協(xié)程實(shí)現(xiàn)分為兩個(gè)層次:

基礎(chǔ)設(shè)施層:標(biāo)準(zhǔn)庫的協(xié)程API,主要對協(xié)程提供了概念和語義上最基本的支持(如:kotlin.contracts.*)業(yè)務(wù)框架層:協(xié)程的上層框架支持(如:kotlinx.contracts.*)

掛起和恢復(fù)

常規(guī)函數(shù)的基本操作包括:invoke(或call)和 return,協(xié)程新增了 suspend 和 resume。

suspend:掛起/暫停,用于暫停當(dāng)前協(xié)程,并保存所有基本變量resume:用于讓已暫停的協(xié)程從暫停處繼續(xù)執(zhí)行

結(jié)構(gòu)化并發(fā)

結(jié)構(gòu)化并發(fā)(Structured Concurrency)是一種編程范式,用于編寫易讀、易維護(hù)的并發(fā)程序。在Kotlin協(xié)程中,結(jié)構(gòu)化并發(fā)特別指的是協(xié)程之間的協(xié)作是有組織、有紀(jì)律的。這種并發(fā)模式允許開發(fā)者明確地定義協(xié)程的入口和出口,并管理協(xié)程之間的依賴關(guān)系和生命周期。

結(jié)構(gòu)化并發(fā)的核心思想是,當(dāng)一個(gè)協(xié)程內(nèi)部創(chuàng)建了其他協(xié)程(子協(xié)程)時(shí),這些子協(xié)程的生命周期應(yīng)與父協(xié)程同步。具體來說,如果所有子協(xié)程在父協(xié)程的作用域結(jié)束前都已完成執(zhí)行,則認(rèn)為當(dāng)前協(xié)程具備結(jié)構(gòu)化并發(fā)。當(dāng)父協(xié)程結(jié)束時(shí),如果其子協(xié)程仍在運(yùn)行,則父協(xié)程會(huì)阻塞自己,等待子協(xié)程運(yùn)行完成后才退出。這種機(jī)制確保了即使在并發(fā)環(huán)境中,也能保持代碼的清晰性和可維護(hù)性。

在Kotlin協(xié)程中,結(jié)構(gòu)化并發(fā)主要依賴于CoroutineScope來實(shí)現(xiàn)。

當(dāng)我們使用結(jié)構(gòu)化并發(fā)后,可以做到:

取消任務(wù):當(dāng)某項(xiàng)任務(wù)不再需要時(shí)可以取消它追蹤任務(wù):當(dāng)任務(wù)正在執(zhí)行時(shí),可以追蹤它發(fā)出錯(cuò)誤信號:當(dāng)協(xié)程失敗時(shí),會(huì)發(fā)出錯(cuò)誤信號表面有錯(cuò)誤發(fā)生

定義協(xié)程時(shí)必須指定其 CoroutineScope,它是一個(gè)用于管理協(xié)程生命周期的接口,提供了創(chuàng)建和取消協(xié)程的方式。

常用的一些CoroutineScope:

GlobalScope:進(jìn)程級別的CoroutineScope,與應(yīng)用進(jìn)程同級;在GlobalScope中啟動(dòng)的協(xié)程不受任何特定生命周期的限制,即使Activity被銷毀,協(xié)程任務(wù)也可以繼續(xù)執(zhí)行。通常用于后臺(tái)任務(wù),如網(wǎng)絡(luò)請求、定時(shí)器等。MainScope:它與Activity的生命周期綁定,在MainScope中啟動(dòng)的協(xié)程會(huì)在Activity的onDestroy生命周期函數(shù)中取消。通常用于在UI線程上執(zhí)行協(xié)程,如更新UIViewModelScopeLifecycleScope

注意:使用協(xié)程時(shí),雖然它很輕量,并且不使用主線程,但仍會(huì)消耗一些內(nèi)存資源。如果忘記保持對新啟動(dòng)的協(xié)程的引用,它還會(huì)繼續(xù)運(yùn)行,導(dǎo)致內(nèi)存泄漏、資源泄露等問題。

協(xié)程構(gòu)建器

launch:它會(huì)立即返回一個(gè)Job對象,并在后臺(tái)執(zhí)行協(xié)程任務(wù)async:這個(gè)函數(shù)用于啟動(dòng)一個(gè)異步協(xié)程,并返回一個(gè)Deferred(繼承Job)對象,你可以通過調(diào)用await()方法來獲取異步協(xié)程的結(jié)果?!?/p>

作用域構(gòu)建器

以下三種作用域構(gòu)建器可以直接使用,它們都會(huì)繼承父協(xié)程的 coroutineScope。自己創(chuàng)建的協(xié)程作用域?qū)ο螅瑒t是使用自己的作用域。

使用自己的作用域的例如:

CoroutineScope實(shí)例對象Global.launch

可以記?。盒懽帜搁_頭會(huì)繼承,大寫的不會(huì)

以下三種作用域構(gòu)建器的異同,在 這里 有詳細(xì)說明。

runBlockingcoroutineScopesupervisorScope

import kotlinx.coroutines.*

fun main() = runBlocking {

val scope = CoroutineScope(Dispatchers.Default)

scope.launch {

delay(1000)

println("runBlocking 會(huì)等待CoroutineScope下協(xié)程的執(zhí)行嗎")

}

val job = GlobalScope.launch {

delay(1000)

println("runBlocking 會(huì)等待GlobalScope下協(xié)程的執(zhí)行嗎")

}

}

// 控制臺(tái)無打印結(jié)果,說明這兩種方法的作用域沒有繼承父協(xié)程的

fun main() = runBlocking {

coroutineScope {

delay(1000)

println("runBlocking 會(huì)等待coroutineScope下協(xié)程的執(zhí)行嗎")

}

supervisorScope {

delay(1000)

println("runBlocking 會(huì)等待supervisorScope下協(xié)程的執(zhí)行嗎")

}

}

// 控制臺(tái)把兩句話都打印了,明這兩種方法的作用域會(huì)繼承父協(xié)程的

掛起函數(shù)

掛起函數(shù)(Suspend Function)是一個(gè)特殊類型的函數(shù),它被標(biāo)記為suspend,并且只能在協(xié)程中調(diào)用。掛起函數(shù)的主要特點(diǎn)是它們能夠在執(zhí)行過程中掛起(暫停)和恢復(fù)(繼續(xù)執(zhí)行),而不會(huì)阻塞當(dāng)前線程。這使得掛起函數(shù)能夠以一種非阻塞的方式執(zhí)行異步操作,同時(shí)保持代碼的清晰性和可讀性。

掛起函數(shù)只能在協(xié)程體內(nèi)或其他掛起函數(shù)內(nèi)調(diào)用。

例如:將 launch { …… } 內(nèi)部的代碼塊提取到獨(dú)立的函數(shù)中。提取出來的函數(shù)需要 suspend 修飾符,它是掛起函數(shù)。

阻塞與非阻塞

runBlocking

delay 是非阻塞的, Thread.sleep 是阻塞的。顯式使用 runBlocking 協(xié)程構(gòu)建器來阻塞。

import kotlinx.coroutines.*

fun main() {

GlobalScope.launch { // 在后臺(tái)啟動(dòng)一個(gè)新的協(xié)程并繼續(xù)

delay(200)

"rustfisher.com".forEach {

print(it)

delay(280)

}

}

println("主線程中的代碼會(huì)立即執(zhí)行")

runBlocking { // 這個(gè)表達(dá)式阻塞了主線程

delay(3000L) //阻塞主線程防止過快退出

}

println("\n示例結(jié)束")

}

可以看到, runBlocking 里使用了 delay 來延遲。用了 runBlocking 的線程會(huì)一直阻塞直到 runBlocking 內(nèi)部的協(xié)程執(zhí)行完畢。 也就是 runBlocking{ delay } 實(shí)現(xiàn)了阻塞的效果。

我們也可以用 runBlocking 來包裝主函數(shù),runBlocking 中的Unit目前可以省略,并且runBlocking 也可用在測試中。

import kotlinx.coroutines.*

fun main() = runBlocking {

delay(100) // 在這里可以用delay了

GlobalScope.launch {

delay(100)

println("Fisher")

}

print("Rust ")

delay(3000)

}

全局協(xié)程像守護(hù)線程

我們在線程介紹中知道,如果進(jìn)程中只剩下了守護(hù)線程,那么虛擬機(jī)會(huì)退出。

前文那個(gè)打印 rustfisher.com 的例子,其實(shí)也能看到,字符沒打印完程序就結(jié)束了。 在 GlobalScope 中啟動(dòng)的活動(dòng)協(xié)程并不會(huì)使進(jìn)程?;睢K鼈兙拖袷刈o(hù)線程。

Job的生命周期

對于每一個(gè)創(chuàng)建的協(xié)程,會(huì)返回一個(gè)Job實(shí)例,該實(shí)例是協(xié)程的唯一標(biāo)識(shí),并且負(fù)責(zé)管理協(xié)程的生命周期。

New(新創(chuàng)建):協(xié)程對象剛剛通過launch或async等函數(shù)創(chuàng)建,但尚未開始執(zhí)行。Active(活躍):協(xié)程已經(jīng)開始執(zhí)行,但尚未完成。在這個(gè)階段,協(xié)程可能會(huì)執(zhí)行自己的任務(wù),也可能會(huì)啟動(dòng)子協(xié)程。Completing(完成中):協(xié)程已經(jīng)完成了自己的任務(wù),但可能還在等待其子協(xié)程完成。這個(gè)階段是短暫的,通常很快會(huì)過渡到下一個(gè)狀態(tài)。Completed(已完成):這是協(xié)程的最終狀態(tài),表示協(xié)程已經(jīng)成功執(zhí)行完畢或已被取消。Cancelling(取消中):協(xié)程正在等待取消操作完成。這通常發(fā)生在調(diào)用Job.cancel()方法后,但取消操作可能需要一些時(shí)間才能完成。Cancelled(已取消):協(xié)程已經(jīng)被取消。這可能是因?yàn)閰f(xié)程運(yùn)行出錯(cuò),或者顯式調(diào)用了Job.cancel()方法。在已取消狀態(tài)下,協(xié)程的執(zhí)行將被終止,并且不會(huì)再次啟動(dòng)。

需要注意的是,雖然上述狀態(tài)描述了協(xié)程的生命周期,但并非所有狀態(tài)都是直接可訪問的。相反,我們可以通過訪問Job對象的屬性(如isActive、isCancelled 和 isCompleted)來了解協(xié)程的當(dāng)前狀態(tài)。例如,如果 isActive 為 false 且 isCancelled 為 true,則表示協(xié)程處于取消中(Cancelling)狀態(tài);

注意:如果 isCompleted 為 true,則表示協(xié)程已完成(Completed)或已取消(Cancelled)。

常用函數(shù)

延時(shí)和等待

delay:可以達(dá)到延時(shí)的效果,是一個(gè)特殊的掛起函數(shù),它不會(huì)造成線程阻塞,但是會(huì)掛起協(xié)程,并且只能在協(xié)程中使用job.join():該方法會(huì)掛起當(dāng)前協(xié)程,等待job協(xié)程執(zhí)行完成,可以用于協(xié)程之間的順序執(zhí)行joinAll(job...):可以同時(shí)讓多個(gè)Job調(diào)用join方法job.await():該方法被join類似,但可以獲取協(xié)程完成的結(jié)果

啟動(dòng)和取消

啟動(dòng)

launch 和 async 協(xié)程構(gòu)建器都用于啟動(dòng)新協(xié)程。

lauch:返回一個(gè) Job 并且不附帶任何結(jié)果值。async:返回一個(gè) Deferred,它也是 Job,但可以使用 await() 來獲取協(xié)程執(zhí)行完后的返回值。

取消

Job.cancel():用于取消協(xié)程。Job.cancelAndJoin():同時(shí)具有 cancel() 和 join() 的作用。

暫停

yield():用于讓當(dāng)前協(xié)程暫停執(zhí)行,并將執(zhí)行權(quán)交還給協(xié)程調(diào)度器,以便讓其他協(xié)程有機(jī)會(huì)運(yùn)行。

當(dāng)你在協(xié)程中使用 yield() 時(shí),當(dāng)前協(xié)程會(huì)進(jìn)入掛起狀態(tài),但不會(huì)釋放其占用的資源(如內(nèi)存棧)。當(dāng)協(xié)程調(diào)度器決定再次執(zhí)行該協(xié)程時(shí),它會(huì)從 yield() 調(diào)用點(diǎn)恢復(fù)執(zhí)行。

這個(gè)函數(shù)通常用于實(shí)現(xiàn)非阻塞的并發(fā)編程,特別是在處理密集計(jì)算的場景時(shí),通過 yield() 可以避免長時(shí)間占用線程,從而提高應(yīng)用的響應(yīng)性和性能。

注意:yield() 并不保證一定會(huì)導(dǎo)致協(xié)程切換。它的行為取決于當(dāng)前的調(diào)度策略和協(xié)程調(diào)度器的實(shí)現(xiàn)。

import kotlinx.coroutines.*

fun main() = runBlocking {

launch {

delay(1000L) // 等待1秒

println("World!")

}

repeat(1000) { i ->

println("Hello $i")

//在每次打印500個(gè)Hello后,暫停當(dāng)前協(xié)程,讓hello有機(jī)會(huì)輸出

if (i % 500 == 0) {

yield()

}

}

}

協(xié)程啟動(dòng)

調(diào)度器

所有協(xié)程必須在調(diào)度器中運(yùn)行。

Dispatchers.Main:這是主線程調(diào)度器,用于處理UI交互和一些輕量級任務(wù)(調(diào)用掛起函數(shù)、UI函數(shù),更新LiveData)Dispatchers.Default:這是默認(rèn)調(diào)度器,通常用于CPU密集型任務(wù),如大量計(jì)算或數(shù)據(jù)處理。它會(huì)使用共享后臺(tái)線程的公共池來執(zhí)行協(xié)程。Dispatchers.IO:這是用于IO密集型任務(wù)的調(diào)度器,如文件讀寫、網(wǎng)絡(luò)請求等。它同樣使用共享線程池,但專注于IO操作。Dispatchers.Unconfined:這是一個(gè)不限制協(xié)程執(zhí)行線程的調(diào)度器。它不會(huì)將協(xié)程綁定到特定的線程或線程池,而是允許協(xié)程在任意線程中執(zhí)行。通常,這個(gè)調(diào)度器在某些特殊場景下使用,比如你需要完全控制協(xié)程的執(zhí)行線程。

啟動(dòng)方式

launch:這是最常用的啟動(dòng)協(xié)程的方式。它會(huì)立即返回一個(gè)Job對象,并在后臺(tái)執(zhí)行協(xié)程任務(wù)。如果在啟動(dòng)協(xié)程時(shí)使用了try-catch塊,異常會(huì)被捕獲;否則,異常會(huì)傳遞給未捕獲異常處理器進(jìn)行處理。async:這個(gè)函數(shù)用于啟動(dòng)一個(gè)異步協(xié)程,并返回一個(gè)Deferred對象,你可以通過調(diào)用await()方法來獲取異步協(xié)程的結(jié)果。runBlocking:這是一個(gè)阻塞式函數(shù),通常用于測試和調(diào)試協(xié)程代碼。它會(huì)啟動(dòng)一個(gè)新的協(xié)程,并等待其協(xié)程體以及所有子協(xié)程結(jié)束執(zhí)行完畢后才會(huì)返回。在調(diào)用runBlocking時(shí),當(dāng)前線程會(huì)被阻塞,直到協(xié)程執(zhí)行完畢。coroutineScope:它會(huì)創(chuàng)建一個(gè)協(xié)程作用域,并等待其協(xié)程體以及所有子協(xié)程結(jié)束。如果一個(gè)子協(xié)程失敗了,當(dāng)前域和域內(nèi)的協(xié)程都會(huì)被取消。(子協(xié)程的異常會(huì)傳播到父協(xié)程)supervisorScope:與 coroutineScope 一樣,但它在子協(xié)程失敗時(shí),不會(huì)影響域內(nèi)其他協(xié)程的執(zhí)行(子協(xié)程的異常不會(huì)傳播到父協(xié)程)

注: runBlocking 是常規(guī)函數(shù),會(huì)堵塞住當(dāng)前線程;coroutineScope 和 supervisorScope 是掛起函數(shù),不會(huì)堵塞住當(dāng)前線程。

import kotlinx.coroutines.*

import java.lang.RuntimeException

suspend fun main() {

// 換成supervisorScope后,使用log都會(huì)被輸出

coroutineScope {

val job1 = launch {

delay(5000)

// 以下輸出無法執(zhí)行

println("Job1 完成")

}

launch {

println("job2 執(zhí)行了")

throw RuntimeException()

}

delay(1000)

// 以下輸出無法執(zhí)行

println("會(huì)執(zhí)行我嗎?")

}

}

啟動(dòng)模式

DEFAULT:這是協(xié)程的默認(rèn)啟動(dòng)模式。當(dāng)協(xié)程創(chuàng)建后,它會(huì)立即開始調(diào)度。如果在調(diào)度前協(xié)程被取消,它會(huì)直接進(jìn)入取消狀態(tài)。ATOMIC:協(xié)程創(chuàng)建后會(huì)立即開始調(diào)度,協(xié)程在執(zhí)行到第一個(gè)掛起點(diǎn)之前不響應(yīng)取消。LAZY:協(xié)程只有在被需要時(shí)才會(huì)開始調(diào)度,如主動(dòng)調(diào)用協(xié)程的start、join或await等函數(shù)時(shí)。如果協(xié)程在調(diào)度前被取消,它將直接進(jìn)入異常結(jié)束狀態(tài)。UNDISPATCHED:協(xié)程創(chuàng)建后會(huì)在當(dāng)前函數(shù)調(diào)用棧中立即執(zhí)行,直到遇到第一個(gè)真正掛起的點(diǎn)。

如果想要你的協(xié)程立刻執(zhí)行,而不是等待調(diào)度,可以使用最后一個(gè)模式。

注:調(diào)度不等于執(zhí)行。調(diào)度(scheduling)是指決定協(xié)程何時(shí)在哪個(gè)線程上開始或恢復(fù)執(zhí)行的過程,而執(zhí)行(execution)是指協(xié)程代碼的實(shí)際運(yùn)行。協(xié)程什么時(shí)候執(zhí)行取決于調(diào)度器的當(dāng)前狀態(tài)和其他協(xié)程的優(yōu)先級。

import kotlinx.coroutines.*

fun main() = runBlocking {

// 默認(rèn)模式下程序立刻就結(jié)束了,沒有打印出log

//ATOMIC模式下,log被打印

val job1 = launch(start = CoroutineStart.ATOMIC) {

Thread.sleep(5000)

println("Job 完成")

}

// job2立刻就執(zhí)行了,使用其他模式需要等待job1睡眠結(jié)束

// 這是使用runBlocking啟動(dòng)方式的情況,使用coroutineScope則會(huì)立刻執(zhí)行job2

val job2 = launch(start = CoroutineStart.UNDISPATCHED) {

println("立刻執(zhí)行了")

}

job1.cancel()

}

注:使用UNDISPATCHED模式可以讓你的協(xié)程調(diào)度器即使為Dispatchers.IO類型(使用后臺(tái)線程),仍在主線程中執(zhí)行。

線程上下文

CoroutineContext 是一個(gè)接口,用于描述協(xié)程的運(yùn)行環(huán)境,包含了與協(xié)程執(zhí)行相關(guān)的各種參數(shù)和配置信息。

CoroutineContext 主要包含以下幾個(gè)方面的元素:

Job:代表協(xié)程的生命周期。通過 Job,你可以控制協(xié)程的啟動(dòng)、取消和等待其完成。 CoroutineDispatcher:協(xié)程調(diào)度器,用于向合適的線程分發(fā)任務(wù)。它決定了協(xié)程在哪個(gè)線程上執(zhí)行。 CoroutineName:協(xié)程的名稱,主要用于調(diào)試目的。 CoroutineExceptionHandler:處理協(xié)程中發(fā)生的(未被捕捉)異常。

有時(shí)我們需要在協(xié)程上下文中定義多個(gè)元素,可以用 + 操作符來實(shí)現(xiàn)。

例如:為一個(gè)協(xié)程指定一個(gè)調(diào)度器和名稱

fun main() = runBlocking {

launch(Dispatchers.Default + CoroutineName("test")) {

println("我工作在:${Thread.currentThread().name}")

}

}

繼承的定義

新創(chuàng)建的協(xié)程,它的 CoroutineContext 會(huì)包含一個(gè)全新的Job,并返回,用于控制新協(xié)程的生命周期。而它上下文中剩下的元素會(huì)從 創(chuàng)建該協(xié)程的CoroutineScope 或 父協(xié)程的 CoroutineContext 繼承。

import kotlinx.coroutines.*

fun main() = runBlocking {

val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("test"))

// 這里調(diào)用launch方法時(shí)繼承了scope的上下文

val job = scope.launch {

println("job-launch:${coroutineContext[CoroutineName]} ${Thread.currentThread().name}")

// 這里調(diào)用launch方法時(shí)繼承了父協(xié)程的上下文

val childJob = async {

println("childJob-launch:${coroutineContext[CoroutineName]} ${Thread.currentThread().name}")

coroutineContext[Job]

}

println("新協(xié)程的上下文中的Job對象 等于 返回的Job對象嗎:" + (childJob == childJob.await())) // 為true

}

job.join()

}

繼承的公式

協(xié)程的上下文 = 默認(rèn)值 + 繼承的CoroutineScope + 參數(shù)

默認(rèn)值:如 CoruoutineDispatchers 的默認(rèn)值為 Dispatchers.Default繼承的繼承的CoroutineScope是CoroutineScope或父協(xié)程的CoroutineContext傳入?yún)f(xié)程構(gòu)建器的參數(shù),其優(yōu)先級高于繼承的上下文參數(shù),會(huì)覆蓋對應(yīng)的參數(shù)值

協(xié)程取消與超時(shí)

取消

我們可以在協(xié)程尚未結(jié)束時(shí)主動(dòng)取消協(xié)程,協(xié)程在處于掛起點(diǎn)的時(shí)候就會(huì)被取消。

取消作用域時(shí),會(huì)把它的子協(xié)程都取消被取消的子協(xié)程不會(huì)影響其余兄弟協(xié)程協(xié)程通過拋出一個(gè)異常 CancellationException 來處理取消操作kotlinx.coroutines 中的掛起函數(shù)都是可被取消的取消協(xié)程時(shí),拋出的異常會(huì)被靜默處理,當(dāng)作正常完成

掛起點(diǎn)

當(dāng)掛起函數(shù)被調(diào)用時(shí),它們會(huì)暫停當(dāng)前協(xié)程的執(zhí)行,直到掛起函數(shù)的操作完成或需要等待某個(gè)條件滿足。這些暫停點(diǎn)被稱為掛起點(diǎn)(Suspension Points)。

協(xié)程只有在掛起點(diǎn)(即協(xié)程暫停執(zhí)行并等待某些條件滿足的點(diǎn))才會(huì)檢查其取消狀態(tài)。這些掛起點(diǎn)通常是由掛起函數(shù)(如 delay、withContext 等)產(chǎn)生的。如果協(xié)程在掛起點(diǎn)發(fā)現(xiàn)它已經(jīng)被取消,那么它通常會(huì)立即停止執(zhí)行并拋出 CancellationException。

需要注意的是,掛起點(diǎn)不僅僅是掛起函數(shù)本身產(chǎn)生的,還包括了掛起函數(shù)內(nèi)部可能調(diào)用的其他掛起函數(shù)。一個(gè)協(xié)程可能會(huì)在多個(gè)掛起點(diǎn)之間來回切換,直到最終完成。

另外,不是所有標(biāo)記為 suspend 的函數(shù)都會(huì)產(chǎn)生掛起點(diǎn)。有些掛起函數(shù)可能會(huì)立即返回結(jié)果,而不會(huì)導(dǎo)致協(xié)程掛起。這取決于函數(shù)內(nèi)部的實(shí)現(xiàn)和調(diào)用時(shí)的上下文。

取消失敗

如果協(xié)程在執(zhí)行計(jì)算(cpu密集型)任務(wù),并且沒檢查取消的話,那我們的取消嘗試會(huì)失敗。

import kotlinx.coroutines.*

fun main() = runBlocking {

val startTime = System.currentTimeMillis()

val job = launch(Dispatchers.Default) {

var nextPrintTime = startTime

var i = 0 // 模擬的控制循環(huán)數(shù)量

while (i < 5) { // 模擬耗時(shí)計(jì)算

if (System.currentTimeMillis() >= nextPrintTime) {

println("[job] 模擬耗時(shí)計(jì)算中 ${i++} ...")

nextPrintTime += 500L

}

}

}

delay(800) // 等待一會(huì)

println("[rustfisher] 嘗試取消協(xié)程")

job.cancelAndJoin()

println("程序退出 bye~")

}

[job] 模擬耗時(shí)計(jì)算中 0 ...

[job] 模擬耗時(shí)計(jì)算中 1 ...

[rustfisher] 嘗試取消協(xié)程

[job] 模擬耗時(shí)計(jì)算中 2 ...

[job] 模擬耗時(shí)計(jì)算中 3 ...

[job] 模擬耗時(shí)計(jì)算中 4 ...

程序退出 bye~

可以看到,模擬耗時(shí)計(jì)算直到4,整個(gè)程序退出。而調(diào)用 cancelAndJoin() 并沒有成功取消掉協(xié)程。

可以取消

讓協(xié)程可被取消的方法

顯式的檢查取消狀態(tài),例如檢查 isActive 變量使用 ensureActive 方法,如果 Job 處于非活躍狀態(tài),這個(gè)方法就會(huì)立即拋出異常 CancellationException使用 yield 方法,它會(huì)檢查協(xié)程的狀態(tài),如果狀態(tài)為 已取消 ,則拋出異常 CancellationException;它還會(huì)讓出線程的執(zhí)行權(quán)補(bǔ):使用 delay(值>0),它會(huì)讓協(xié)程處于掛起點(diǎn)

注:實(shí)際調(diào)用取消方法后,如果協(xié)程在掛起點(diǎn)則會(huì)拋出異常進(jìn)行取消。原因是 Job 對象的 isCancelled 變?yōu)?true后,調(diào)用會(huì)使協(xié)程掛起的函數(shù)時(shí)都會(huì)拋出異常而成功取消掉協(xié)程。

對上面的代碼進(jìn)行一些改進(jìn)。把 while (i < 5) 循環(huán)中的條件改成 while (isActive) 。修改后的代 碼如下:

import kotlinx.coroutines.*

fun main() = runBlocking {

val startTime = System.currentTimeMillis()

val job = launch(Dispatchers.Default) {

var nextPrintTime = startTime

var i = 0

while (i < 5 && isActive) { // 模擬耗時(shí)計(jì)算

// 或者在這調(diào)用ensureActive、yield、delay(1)都會(huì)拋出異常取消掉任務(wù),其他會(huì)使當(dāng)前協(xié)程處于掛起點(diǎn)的函數(shù)

if (System.currentTimeMillis() >= nextPrintTime) {

println("[job] 模擬耗時(shí)計(jì)算中 ${i++} ...")

nextPrintTime += 500L

}

}

}

delay(800) // 等待一會(huì)

println("[rustfisher] 嘗試取消協(xié)程")

job.cancelAndJoin()

println("程序退出 bye~")

}

[job] 模擬耗時(shí)計(jì)算中 0 ...

[job] 模擬耗時(shí)計(jì)算中 1 ...

[rustfisher] 嘗試取消協(xié)程

程序退出 bye~

釋放資源

finally塊

取消協(xié)程時(shí),掛起函數(shù)(使用suspend修飾的函數(shù))會(huì)拋出異常:CancellationException。我們可以使用try-catch-finally來處理。并且在 finally塊中釋放資源。

import kotlinx.coroutines.*

fun main() = runBlocking {

val job = launch {

try {

repeat(1000) { i ->

println("[job]模擬計(jì)算次數(shù) $i ...")

delay(300L)

}

} catch (e: CancellationException) {

println("[job] CancellationException ${e.message}")

} finally {

println("[job][finally] 釋放資源..")

}

}

delay(800) // 等待一會(huì)

println("[rustfisher] 嘗試取消協(xié)程")

job.cancelAndJoin()

println("[rustfisher] 程序退出 bye~")

}

不能取消

有時(shí)候,我們需要運(yùn)行不能取消的代碼塊。

withCotext(context){}:使用給定的協(xié)程上下文調(diào)用指定的掛起塊,掛起直到完成,然后返回結(jié)果。

withContext(NonCancellable) 可以創(chuàng)建一個(gè)無法取消的協(xié)程作用域,確保在這個(gè)作用域內(nèi)執(zhí)行的掛起函數(shù)不會(huì)被取消。這通常在資源釋放或清理操作的上下文中使用,這些操作可能需要在協(xié)程被取消后仍然執(zhí)行。

實(shí)際上,這里在finally塊中調(diào)用了delay方法,它會(huì)檢查協(xié)程 isCancelled 的值,發(fā)現(xiàn)為true就會(huì)拋出異常,導(dǎo)致執(zhí)行完delay方法后面的代碼無法執(zhí)行。若把delay方法換成Thread.sleep方法,或在finally塊中再捕捉一次一次,即使沒使用withContext(NonCancellable) 也能保證finally塊中的代碼都被執(zhí)行。

import kotlinx.coroutines.*

fun main() = runBlocking {

val job = launch {

try {

repeat(1000) { i ->

println("[job]模擬計(jì)算次數(shù) $i ...")

delay(300L)

}

} catch (e: CancellationException) {

println("[job] CancellationException ${e.message}")

} finally {

withContext(NonCancellable) {

println("[job][finally] 進(jìn)入NonCancellable")

delay(1000) // 假設(shè)這里還有一些耗時(shí)操作

println("[job][finally] NonCancellable完畢")

}

println("[job][finally] 結(jié)束")

}

}

delay(800) // 等待一會(huì)

println("[rustfisher] 嘗試取消協(xié)程")

job.cancelAndJoin()

println("[rustfisher] 程序退出 bye~")

}

運(yùn)行結(jié)果如下:

[job]模擬計(jì)算次數(shù) 0 ...

[job]模擬計(jì)算次數(shù) 1 ...

[job]模擬計(jì)算次數(shù) 2 ...

[rustfisher] 嘗試取消協(xié)程

[job] CancellationException StandaloneCoroutine was cancelled

[job][finally] 進(jìn)入NonCancellable

# 如果沒有使用withContext(NonCancellable),則無法輸出下面兩行

[job][finally] NonCancellable完畢

[job][finally] 結(jié)束

[rustfisher] 程序退出 bye~

超時(shí)

withTimeout

我們可以用 withTimeout(long) 來指定超時(shí)時(shí)間。

超時(shí)后會(huì)拋出 TimeoutCancellationException,它是CancellationException 的子類。

如果沒有使用try-catch,控制臺(tái)是看不到該異常的,因?yàn)樵诒蝗∠膮f(xié)程中 CancellationException 會(huì)被認(rèn)為是協(xié)程執(zhí)行結(jié)束的正常原因。

import kotlinx.coroutines.*

fun main() = runBlocking {

launch {

try {

withTimeout(400L) {

val startTime = System.currentTimeMillis()

repeat(1000) { i ->

println("[job] 運(yùn)行: $i, 累積運(yùn)行時(shí)間:${System.currentTimeMillis() - startTime}毫秒")

delay(100L)

}

}

} catch (e: Exception) {

println("異常: $e")

}

}

}

withTimeoutOrNull

withTimeoutOrNull 方法會(huì)在超時(shí)后返回null,如果成功執(zhí)行則返回我們指定的值。

import kotlinx.coroutines.*

fun main() = runBlocking {

launch {

val result1 = withTimeoutOrNull(1300L) {

repeat(1000) { i ->

println("[job1] 運(yùn)行 $i ...")

delay(500L)

}

"[1] Done" // 根據(jù)超時(shí)設(shè)置 執(zhí)行不到這里

}

println("Result1: $result1")

val result2 = withTimeoutOrNull(1300L) {

repeat(2) { i ->

println("[job2] 運(yùn)行 $i ...")

delay(500L)

}

"[2] Done" // 成功執(zhí)行完畢后到這里

}

println("Result2: $result2")

}

}

運(yùn)行結(jié)果:

[job1] 運(yùn)行 0 ...

[job1] 運(yùn)行 1 ...

[job1] 運(yùn)行 2 ...

Result1: null

[job2] 運(yùn)行 0 ...

[job2] 運(yùn)行 1 ...

Result2: [2] Done

協(xié)程的異常處理

異常的傳播

協(xié)程構(gòu)建器有兩種形式:自動(dòng)傳播異常(launch于actor),向用戶暴露異常(async和produce)。

(1)當(dāng)這些構(gòu)建器用于創(chuàng)建一個(gè)根協(xié)程(該協(xié)程不是另一個(gè)協(xié)程的子協(xié)程)時(shí):

前者:異常會(huì)在它發(fā)生的第一時(shí)間拋出后者:依賴用戶最終來消費(fèi)異常,例如通過 await 和 receive

fun main() = runBlocking {

val job1 = GlobalScope.launch {

println("job1 --> 拋出異常")

throw RuntimeException()

}

try {

job1.join()

} catch (e: Exception) {

// 最終還是拋出異常,這里捕捉失敗

println("job1 --> 捕捉了異常")

}

val job2 = GlobalScope.async {

println("job2 --> 拋出異常")

throw RuntimeException()

}

try {

job2.await()

} catch (e: Exception) {

// 這里捕捉異常成功

println("job2 --> 捕捉了異常")

}

}

(2)非根協(xié)程總是會(huì)被傳播,拋給父級

異常的傳播特性

當(dāng)一個(gè)協(xié)程由于一個(gè)異常運(yùn)行失敗時(shí),它會(huì)傳播這個(gè)異常并傳遞給的父級。接下來父級會(huì)進(jìn)行幾部操作:

取消它自己的子級取消它自己將異常傳播并傳遞給他的父級

superviseJob

使用superviseJob時(shí),子協(xié)程的失敗不會(huì)影響到其他子協(xié)程。它不會(huì)傳播異常給他的父級,而是讓子協(xié)程自己處理異常。

fun main() = runBlocking {

val scope = CoroutineScope(SupervisorJob())

val job1 = scope.launch {

delay(100)

println("job --> 1")

throw RuntimeException()

}

val job2 = scope.launch {

try {

delay(Long.MAX_VALUE)

} finally {

// 參數(shù)為Job()時(shí)輸出,為Supervisor()不輸出

println("job --> 2")

}

}

joinAll(job1, job2)

}

或者使用 supervisorScope 作用域構(gòu)建器

異常的捕獲

當(dāng)使用CoroutineExceptionHandler對協(xié)程的異常進(jìn)行捕獲時(shí),以下條件被滿足時(shí),異常才會(huì)被捕獲:

協(xié)程的上下文包含CoroutineExceptionHandler對象

根協(xié)程拋出異常:直接作為參數(shù)傳入該協(xié)程的上下文子協(xié)程拋出異常:要傳入根協(xié)程的上下文,只傳入子協(xié)程無法捕捉

如果該協(xié)程有爸爸和爺爺,那么爺爺才是根協(xié)程 異常是被自動(dòng)拋出異常的協(xié)程所拋出的(是launch,非async)

fun main() = runBlocking {

val handler = CoroutineExceptionHandler { _, throwable ->

println("捕捉到:$throwable")

}

val job1 = GlobalScope.launch(handler) {

throw RuntimeException("job --> 1") // 成功打印

}

val job2 = GlobalScope.launch {

// handler不在根協(xié)程中,異常捕捉不到,無打印

launch(handler) {

launch {

throw RuntimeException("job --> 2")

}

}

}

// 協(xié)程時(shí)async構(gòu)建器創(chuàng)建的,異常捕捉不到,無打印

val job3 = GlobalScope.async(handler) {

throw RuntimeException("job --> 3")

}

joinAll(job1, job2, job3)

// job3需要用await方法才會(huì)在控制臺(tái)顯示其異常信息

}

全局異常處理

全局異常處理器可以獲取到所有協(xié)程未處理的未捕獲異常,但它并不會(huì)對異常進(jìn)行捕獲,所以不能阻止程序崩潰。但是它在程序程序調(diào)試和異常上報(bào)等場景中仍然有很大的用處。

添加方法:

在classpath(例如resources目錄)下面新建 META-INF/services 目錄。在該目錄下新建文件名為 kotlinx.coroutines.CoroutineExceptionHandler 的文件,文件內(nèi)容填全局異常處理器的全類名。定義全局異常處理器,名稱隨意,需要繼承 CoroutineExceptionHandler

class GlobalCoroutineExceptionHandler: CoroutineExceptionHandler {

override val key = CoroutineExceptionHandler

override fun handleException(context: CoroutineContext, exception: Throwable) {

println("沒有被捕捉的異常:$exception")

}

}

取消與異常

協(xié)程內(nèi)部使用 CancellationException 來進(jìn)行取消,這個(gè)異常會(huì)被忽略。(沒有手動(dòng)捕捉時(shí)會(huì)自己靜默處理)子協(xié)程被取消時(shí),不會(huì)取消它的父協(xié)程如果一個(gè)協(xié)程遇到了 CancellationExceptino 以外的異常,則它會(huì)使用該異常取消它的父協(xié)程;父協(xié)程處理該異常時(shí),需要先取消所有子協(xié)程,才會(huì)去處理異常

fun main() = runBlocking {

val handler = CoroutineExceptionHandler { _, e ->

println("子協(xié)程都取消完成后,父協(xié)程開始處理異常")

}

val job1 = GlobalScope.launch(handler) {

launch {

try {

delay(Long.MAX_VALUE)

} finally {

withContext(NonCancellable) {

println("1號子協(xié)程 --> 開始被取消了")

delay(100)

println("1號子協(xié)程 --> 取消完畢")

}

}

}

launch {

println("2號子協(xié)程 --> 開始拋出異常")

throw RuntimeException()

}

}

joinAll(job1)

}

結(jié)果:

2號子協(xié)程 --> 開始拋出異常

1號子協(xié)程 --> 開始被取消了

1號子協(xié)程 --> 取消完畢

子協(xié)程都取消完成后,父協(xié)程開始處理異常

異常聚合

當(dāng)協(xié)程的多個(gè)子協(xié)程因?yàn)楫惓6r(shí),一般情況下取第一個(gè)異常進(jìn)行處理。在第一個(gè)異常之后發(fā)生的其他異常,都將會(huì)被綁定到第一個(gè)異常上。

fun main() = runBlocking {

val handler = CoroutineExceptionHandler { _, e ->

println("捕捉到異常:$e,${e.suppressed.contentToString()}")

}

val job1 = GlobalScope.launch(handler) {

launch {

try {

delay(Long.MAX_VALUE)

} finally {

throw RuntimeException("1號子協(xié)程")

}

}

launch {

delay(100)

throw RuntimeException("2號子協(xié)程")

}

}

joinAll(job1)

}

// 輸出 --> 捕捉到異常:java.lang.RuntimeException: 2號子協(xié)程,[java.lang.RuntimeException: 1號子協(xié)程]

異步流Flow

作用

掛起函數(shù)只可以異步的返回單個(gè)值,但如果想要異步返回多個(gè)計(jì)算好的值則需要用到Flow。

flow構(gòu)建器使用后會(huì)返回一個(gè)Flow對象

快速構(gòu)建流可以用 asFlow 方法 flow{…} 塊中的代碼可以掛起(可以自由使用掛起函數(shù))

序列只能使用它自己的掛起函數(shù)(例如無法用delay) 使用了它的函數(shù)不需要被標(biāo)記為 suspendflow流 使用 emit 函數(shù)發(fā)射值,使用 collect 函數(shù)收集值 集合和序列也可以返回多個(gè)值,但它們每個(gè)值的返回不是異步的。

fun simpleFlow() = flow {

for (i in 1..3) {

delay(1000)

emit(i) // 產(chǎn)生一個(gè)元素

}

}

fun simpleSeq() = sequence {

for (i in 1..3) {

Thread.sleep(1000)

yield(i) // 產(chǎn)生一個(gè)元素

}

}

fun main() = runBlocking {

launch {

for (i in 1..3) {

println("我沒有被堵塞")

delay(1500)

}

}

// 快速構(gòu)建流:(1..3).asFlow()

// 使用flow時(shí),主協(xié)程沒有被堵塞,兩個(gè)任務(wù)都在執(zhí)行

simpleFlow().collect(::println)

// 使用序列時(shí),則發(fā)生堵塞,兩個(gè)任務(wù)只能順序執(zhí)行

simpleSeq().forEach(::println)

}

概念

冷流

Flow是一種類似于序列的冷流,flow構(gòu)建器中的代碼直到被收集時(shí)才會(huì)開始執(zhí)行(調(diào)用 collect 方法時(shí)才執(zhí)行,類似懶加載)。

與之相對的是熱流,StateFlow 和 ShareFlow 是熱流,在垃圾回收之前,都是存在于內(nèi)存之中,并且處于活躍狀態(tài)的。

Flow中的收集可以重復(fù)進(jìn)行,即 collect 方法可以多次調(diào)用。

流的連續(xù)性

流的每次單獨(dú)收集都是按順序執(zhí)行的,除非使用特殊操作符。

從上游到下游,每個(gè)過渡操作符都會(huì)處理每個(gè)發(fā)射出的值,然后再交給末端操作符。

流構(gòu)建器

flow:直接使用 flow {...}flowOf:該構(gòu)建器定義了一個(gè)發(fā)射固定值集的流.asFlow:擴(kuò)展函數(shù),可以將各種集合與序列轉(zhuǎn)換為流

fun main() = runBlocking {

flowOf("one", "two", "three")

.onEach { delay(1000) }.collect(::println)

(1..3).asFlow()

.onEach { delay(1000) }.collect(::println)

}

流上下文

流的收集總是在調(diào)用協(xié)程的上下文中發(fā)生,流的該屬性稱為上下文保存flow{} 構(gòu)建器中的代碼必須遵循上下文保存屬性,并且不允許從其他上下文中發(fā)射(emit)flowOn操作符,該函數(shù)用于更改流發(fā)射的上下文launchIn,使用該函數(shù)替換collect,可以更改流收集時(shí)的上下文

即Flow的發(fā)射和收集默認(rèn)會(huì)遵循同樣的上下文,這將導(dǎo)致如果在主協(xié)程調(diào)用了收集方法,則它的發(fā)射也會(huì)在主協(xié)程里執(zhí)行。這會(huì)影響主協(xié)程其他任務(wù)的執(zhí)行。一般發(fā)射部分的代碼是耗時(shí)操作,需要在后臺(tái)協(xié)程中執(zhí)行,就可能需要更改其上下文。

如果在對流進(jìn)行收集時(shí),還需要對流再進(jìn)行一些耗時(shí)的操作(如過濾),則也需要更改其上下文。

fun simpleFlow() = flow {

println("Flow start:${Thread.currentThread().name}")

for (i in 1..2) {

delay(1000)

emit(i) // 產(chǎn)生一個(gè)元素

}

}.flowOn(Dispatchers.IO) //IO型任務(wù)

fun main() = runBlocking {

println("main:${Thread.currentThread().name}")

simpleFlow()

.onEach { println("Flow collect:${Thread.currentThread().name}") }

.launchIn(CoroutineScope(Dispatchers.Default)) //CPU密集型任務(wù)

.join() // 改變了收集的上下文,則該協(xié)程不受父級作用域管理,作用域指定this則不需要調(diào)用該方法

}

運(yùn)行結(jié)果:

main:main

Flow start:DefaultDispatcher-worker-2

Flow collect:DefaultDispatcher-worker-1

Flow collect:DefaultDispatcher-worker-2

流的取消

流采用與協(xié)程同樣的協(xié)作取消。流的收集可以在流所在協(xié)程被掛起時(shí)(處于掛起點(diǎn))取消。

取消檢測

Flow其實(shí)為 emit 方法附加上了 ensureActive 方法,這意味著從 Flow 塊發(fā)出的繁忙循環(huán)是可以取消的。(emit方法進(jìn)行了取消檢測)

處于性能原因,大多數(shù)其他的流操作是不會(huì)進(jìn)行取消檢測的,所有在協(xié)程處于繁忙循環(huán)時(shí),最好顯式的進(jìn)行手動(dòng)檢測。

通過 cancellable 操作符可以進(jìn)行取消檢測,但會(huì)影響性能

fun simpleFlow() = flow {

for (i in 1..5) {

delay(1000)

emit(i) // 調(diào)用了emit方法,可以取消成功

}

}

fun main() = runBlocking {

simpleFlow().collect {

println(it)

if (it == 3) cancel()

}

// 沒有調(diào)用emit方法,取消失敗

(1..5).asFlow().collect {

println(it)

if (it == 3) cancel()

}

// 使用了cancellable操作符,取消成功

(1..5).asFlow().cancellable().collect {

println(it)

if (it == 3) cancel()

}

}

背壓

當(dāng)數(shù)據(jù)生產(chǎn)者的生產(chǎn)速率高于數(shù)據(jù)消費(fèi)者的處理速率時(shí),就會(huì)產(chǎn)生背壓。這意味著生產(chǎn)者產(chǎn)生的數(shù)據(jù)量超過了消費(fèi)者能夠處理的數(shù)據(jù)量,造成了一種“阻塞”現(xiàn)象,數(shù)據(jù)在緩沖區(qū)中積壓,從而產(chǎn)生了壓力。

背壓問題通常有兩種解決方案:

降低數(shù)據(jù)生產(chǎn)者的生產(chǎn)速率,使其與消費(fèi)者的處理速率相匹配。 提高數(shù)據(jù)消費(fèi)者的處理速率,使其能夠更快地處理生產(chǎn)者產(chǎn)生的數(shù)據(jù)。

Flow中有以下方法可以來解決背壓問題:

buffer(int): 創(chuàng)建一個(gè)具有指定容量的緩沖區(qū),當(dāng)數(shù)據(jù)生產(chǎn)者的速度超過消費(fèi)者的速度時(shí),數(shù)據(jù)會(huì)被緩沖。(并發(fā))當(dāng)必須更改上下文時(shí),flowOn操作符使用了相同的緩存機(jī)制,但buffer函數(shù)是顯式的請求緩存而不改變執(zhí)行上下文。(并發(fā))onflate(): 合并發(fā)射項(xiàng),不對每個(gè)值進(jìn)行處理(不懂)collectLatest():會(huì)自動(dòng)丟棄那些還未被消費(fèi)者處理的數(shù)據(jù)項(xiàng),只保留最新的數(shù)據(jù)項(xiàng)。(只拿最新的)

fun simpleFlow() = flow {

for (i in 1..3) {

delay(100)

emit(i)

println("start:${Thread.currentThread().name} --> $i")

}

}

fun main() = runBlocking {

var time = measureTimeMillis {

simpleFlow().collect {

delay(300)

println("collect:${Thread.currentThread().name} --> $it")

}

}

println("花費(fèi)了:$time ms") //1254 ms = (300 + 100) * 3

time = measureTimeMillis {

simpleFlow().buffer().collect {

delay(300)

println("collect:${Thread.currentThread().name} --> $it")

}

}

println("花費(fèi)了:$time ms") //1068 ms = 300 * 3 + 100

time = measureTimeMillis {

simpleFlow().flowOn(Dispatchers.Default).collect {

delay(300)

println("collect:${Thread.currentThread().name} --> $it")

}

}

println("花費(fèi)了:$time ms") //1049 ms = 300 * 3 + 100

time = measureTimeMillis {

simpleFlow().conflate().collect {

delay(300)

println("collect:${Thread.currentThread().name} --> $it")

}

}

// 元素2被丟棄了

println("花費(fèi)了:$time ms") //778 ms

time = measureTimeMillis {

simpleFlow().collectLatest {

delay(300)

println("collect:${Thread.currentThread().name} --> $it")

}

}

// 元素1、2都被丟棄了

println("花費(fèi)了:$time ms") //660 ms

}

操作符

轉(zhuǎn)換操作符

**轉(zhuǎn)換操作符(Transformation Operators)**用于轉(zhuǎn)換流,它應(yīng)用于上游流,并返回下游流。

這些操作符也是冷操作符,運(yùn)行速度快,本身不是掛起函數(shù),返回新的轉(zhuǎn)換流的定義。

map:將流中的每個(gè)元素轉(zhuǎn)換為另一種形式(1對1) transform:可以轉(zhuǎn)換發(fā)射的元素,可以一對多(通過emit方法) take:只接收指定數(shù)量的元素。 filter:只保留滿足特定條件的元素。 drop:跳過指定數(shù)量的元素。 distinctUntilChanged:只發(fā)出與上一個(gè)值不同的元素。

末端操作符

末端操作符是在流中用于啟動(dòng)流收集的掛起函數(shù)。collect 是最基礎(chǔ)的末端操作符。

轉(zhuǎn)換為各種集合:toList 和 toSet獲取第一個(gè)元素:first確保流發(fā)射單個(gè)值:single將流規(guī)約到某值:

reduce:將流中的元素累積(或減少)為單個(gè)值fold:類似reduce,但可以指定初始值和更改累加器的類型

組合操作符

zip:用于將兩個(gè)流中的元素組合成對。(異步的組合)combine:用于將多個(gè)流中的元素組合成一個(gè)結(jié)果。

展平操作符

流表示異步接收的值序列,容易到這樣的情況:值序列1中每一個(gè)值都會(huì)觸發(fā)對另一個(gè)值序列的請求。由于流是異步的,因此就需要不同的展平模式。(組合)

假設(shè)值序列1為:a,b,c ;值序列2為:A,B

flatMapConcat:連接模式 —> aA,aB,bA,bB,cA,cBflatMapMerge:合并模式 —> aA,bA,cA,aB,bB,cBflatMapLatest:最新展平模式 —> aA,bA,cA,cB

異常處理

當(dāng)運(yùn)算符中的發(fā)射器或代碼拋出異常時(shí),有兩種處理異常的方法:

try/catch 塊:更適合捕捉下游異常catch 函數(shù):更適合捕捉上游異常(可以進(jìn)行恢復(fù)處理,如emit值)

Flow的設(shè)計(jì)原則是要保證異常的透明性的,即上游的異常也能傳播到下游

流的完成

當(dāng)流完成后(普通情況或異常情況),它可能需要執(zhí)行一些操作。

命令式finally塊:聲明式onCompletion函數(shù):非正常完成時(shí),可以獲取異常信息,但無法捕獲

通道和多路復(fù)用

channel

channel實(shí)際上是一個(gè)并發(fā)安全的隊(duì)列,它可以用來連接協(xié)程,實(shí)現(xiàn)不同協(xié)程的通信。 當(dāng)channel隊(duì)列中的緩沖區(qū)(默認(rèn)大小0)滿了,如果沒有人調(diào)用 receive 取走值,send 函數(shù)就會(huì)掛起,直到有人調(diào)用 receive,才會(huì)繼續(xù) send。

fun main() = runBlocking {

val channel = Channel()

val producer = GlobalScope.launch {

var i = 0

while (true) {

delay(1000)

channel.send(++i)

println("send:$i")

}

}

val consumer = GlobalScope.launch {

while (true) {

// 消費(fèi)效率比生產(chǎn)效率慢,但并沒有出現(xiàn)生產(chǎn)幾個(gè)后才消費(fèi)的情況

delay(2000)

val ele = channel.receive()

println("receive:$ele")

}

}

joinAll(producer, consumer)

}

channel是可以迭代的,可以獲取它的 iterator 進(jìn)行迭代。迭代channel一般用于生產(chǎn)效率高于消費(fèi)效率時(shí)。

produce與actor

Produce和actor是構(gòu)建生產(chǎn)者和消費(fèi)者的便捷方法。

通過 produce 方法可以啟動(dòng)一個(gè)生產(chǎn)者協(xié)程,并返回一個(gè) ReceiveChannel,其他協(xié)程可以用此channel來接收數(shù)據(jù)。使用 actor 方法則可以啟動(dòng)一個(gè)消費(fèi)者協(xié)程,并返回一個(gè) SendChannel,其他協(xié)程可以用詞channel來發(fā)送數(shù)據(jù)。

channel的關(guān)閉

produce 和 actor 返回的channel都會(huì)隨著對應(yīng)協(xié)程的執(zhí)行完畢而關(guān)閉,因此 channel 被稱為 熱數(shù)據(jù)流。

對于一個(gè)channel,如果調(diào)用它的 close 方法,它會(huì)立刻停止接收新元素(此時(shí)它的 isCloseForSend 會(huì)立刻返回true)。但由于channel存在緩沖區(qū),可能還有一些元素在緩沖區(qū)中沒有處理完,因此要等所有的元素都被讀取后 isCloseForReceive 才會(huì)返回true。

channel的生命周期最后有主導(dǎo)方來維護(hù),建議由主導(dǎo)方實(shí)現(xiàn)關(guān)閉。

BroadcastChannel

發(fā)送端和接收端在Channel中可以存在一對多的情況,但就算有多個(gè)接收端存在,同一個(gè)數(shù)據(jù)只會(huì)被一個(gè)接收端接收。廣播則可以讓多個(gè)接收端接收到同一個(gè)數(shù)據(jù)。

注意:此 API 自 1.5.0 起已過時(shí),自 1.7.0 起已棄用以刪除 它被替換為 SharedFlow 和 StateFlow。

多路復(fù)用

多路復(fù)用(Multiplexing)是通信技術(shù)中的一個(gè)基本概念,它指的是在同一傳輸介質(zhì)上同時(shí)傳輸多個(gè)不同信號源發(fā)出的信號,并且這些信號之間互不影響。這種技術(shù)的主要目的是提高介質(zhì)的利用率,從而達(dá)到節(jié)省信道資源、降低傳輸成本和提高傳輸效率的目的。

在 Kotlin 協(xié)程中,多路復(fù)用通常指的是同時(shí)執(zhí)行多個(gè)協(xié)程,并有效地管理和調(diào)度這些協(xié)程的執(zhí)行。在協(xié)程中,可以通過復(fù)用多個(gè) await 的方式實(shí)現(xiàn)多路復(fù)用。這意味著可以在一個(gè)協(xié)程中等待多個(gè)其他協(xié)程的完成,并根據(jù)需要選擇性地處理它們的結(jié)果。

一個(gè)常見的應(yīng)用場景是從不同的數(shù)據(jù)源(如網(wǎng)絡(luò)和本地緩存)同時(shí)獲取數(shù)據(jù)。例如,協(xié)程 A 可以從網(wǎng)絡(luò)獲取數(shù)據(jù),而協(xié)程 B 可以從本地緩存獲取數(shù)據(jù)。通過使用 select 代碼塊,可以同時(shí)執(zhí)行這兩個(gè)協(xié)程,并根據(jù)哪個(gè)協(xié)程先返回結(jié)果來選擇性地使用它的數(shù)據(jù)。這種方式有效地利用了系統(tǒng)資源,提高了程序的響應(yīng)速度和效率。

uspend fun CoroutineScope.getFromLocal() = async(Dispatchers.IO) {

delay(1000)

"本地文件讀取成功"

}

suspend fun CoroutineScope.getFromRemote() = async(Dispatchers.IO) {

delay(1200)

"網(wǎng)絡(luò)文件讀取成功"

}

fun main() = runBlocking {

GlobalScope.launch {

val localJob = getFromLocal()

val remoteJob = getFromRemote()

// 只會(huì)返回一個(gè)結(jié)果,返回最快的那個(gè)

val result = select {

localJob.onAwait {it}

remoteJob.onAwait {it}

}

println(result)

}.join()

}

// 這是await多路復(fù)用,還有channel多路復(fù)用

// Flow的多路復(fù)用:使用Flow的merge方法

SelectClause

可以被 select 的事件返回值都是 SelectClauseN 類型的,包括:

SelectClause0:對應(yīng)事件沒有返回值,例如 join --> onJoinSelectClause1:對應(yīng)事件有返回值,例如 onAwait 和 onReceiveSelectClause2:對應(yīng)事件有返回值,并需要額外參數(shù),如 onSend

如果我們想要確認(rèn)掛起函數(shù)是否支持select,只需要查看其是否存在返回值類型為 SelectClauseN 的函數(shù)即可。

并發(fā)安全

使用線程時(shí)會(huì)存在并發(fā)問題,如對某值的累加(因?yàn)樗皇窃硬僮鳎?。kotlin協(xié)程中同樣會(huì)出現(xiàn)并發(fā)問題。

fun main() = runBlocking {

var count = 0

List(1000) {

GlobalScope.launch { count++ }

}.joinAll()

println(count) // 值不為1000

val count2 = AtomicInteger(0)

List(1000) {

GlobalScope.launch { count2.incrementAndGet() }

}.joinAll()

println(count2.get()) // 值為1000

}

除了我們在線程中常用的解決并發(fā)問題的手段之外(例如java提供的原子類),協(xié)程框架也提供了一下并發(fā)安全的工具,包括:

Channel:并發(fā)安全的消息通道Mutex:輕量級鎖,它的 lock 和 unlock 從語義上與協(xié)程鎖比較類似,之所以輕量是因?yàn)樗讷@取不到鎖時(shí)不會(huì)堵塞線程,而是掛起等待鎖的釋放Semaphore:輕量級信號量,信號量可以有多個(gè),協(xié)程獲取到信號量后即可執(zhí)行并發(fā)操作。但Semaphore的參數(shù)為1時(shí),效果等價(jià)于Mutex

fun main() = runBlocking {

var count = 0

val mutex = Mutex()

List(1000) {

GlobalScope.launch {

mutex.withLock { count++ }

}

}.joinAll()

println(count) // 值為1000

count = 0

val semaphore = Semaphore(1)

List(1000) {

GlobalScope.launch {

semaphore.withPermit { count++ }

}

}.joinAll()

println(count) // 值為1000

}

避免訪問外部可變狀態(tài)

編寫函數(shù)時(shí)要求它不得訪問外部狀態(tài),只能基于參數(shù)做運(yùn)算,通過返回值提供運(yùn)算結(jié)果。

fun main() = runBlocking {

val count = 0

val result = count + List(1000) {

GlobalScope.async { 1 }

}.map { it.await() }.sum()

println(r) // 值為1000

}

柚子快報(bào)邀請碼778899分享:十、kotlin的協(xié)程

http://yzkb.51969.com/

文章來源

評論可見,查看隱藏內(nèi)容

本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場。

轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/19532458.html

發(fā)布評論

您暫未設(shè)置收款碼

請?jiān)谥黝}配置——文章設(shè)置里上傳

掃描二維碼手機(jī)訪問

文章目錄