柚子快報(bào)邀請(qǐng)碼778899分享:RxJava響應(yīng)式編程
柚子快報(bào)邀請(qǐng)碼778899分享:RxJava響應(yīng)式編程
文章目錄
響應(yīng)式編程什么是響應(yīng)式編程TheReactiveManifesto(響應(yīng)式宣言)百度百科維基百科SpringFramework5ReactiveX
小結(jié)
RxJava什么是RxJava基本概念原理數(shù)據(jù)流上游、下游背壓觀察者模式RxJ觀察者模式圖解原理
使用場(chǎng)景如何使用依賴pom.xml使用步驟
操作符介紹創(chuàng)建類操作符轉(zhuǎn)換類操作符過(guò)濾類操作符組合類操作符功能操作符
線程調(diào)度相關(guān)同步、異步訂閱調(diào)度器背壓策略BackpressureStrategy.ERRORBackpressureStrategy.MISSINGBackpressureStrategy.BUFFERBackpressureStrategy.DROPBackpressureStrategy.LATEST
并行處理
小結(jié)
參考鏈接
響應(yīng)式編程
什么是響應(yīng)式編程
互聯(lián)網(wǎng)上對(duì)響應(yīng)式編程的解釋其實(shí)是多種多樣的。下面從幾個(gè)權(quán)威的解釋來(lái)認(rèn)識(shí)什么是響應(yīng)式編程。
TheReactiveManifesto(響應(yīng)式宣言)
Reactive Systems are: Responsive, Resilient, Elastic and Message Driven.
The Reactive Manifesto對(duì)響應(yīng)式的歸納為:響應(yīng)式系統(tǒng)應(yīng)當(dāng)是響應(yīng)的、適應(yīng)性強(qiáng)的、彈性的和消息驅(qū)動(dòng)的。
百度百科
響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程范式。
百度給出了例子來(lái)說(shuō)明何為響應(yīng)式編程:Excel的單元格可以包含類似"=B1+C1"的公式,或進(jìn)行求和計(jì)算,而包含公式的單元格的值會(huì)依據(jù)其他單元格的值的變化而變化,這就是典型的響應(yīng)式編程。
維基百科
Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. With this paradigm it is possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease, and also communicate that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the changed data flow.
維基百科作為全世界的權(quán)威知識(shí)庫(kù),其給出的定義為:響應(yīng)式編程是一種聲明式的編程范式,其核心要素是數(shù)據(jù)流與其傳播變化,前者是關(guān)于數(shù)據(jù)結(jié)構(gòu)的描述,包括靜態(tài)的數(shù)組和動(dòng)態(tài)的事件發(fā)射器。
SpringFramework5
The term “reactive” refers to programming models that are built around reacting to change?—?network component reacting to I/O events, UI controller reacting to mouse events, etc. In that sense non-blocking is reactive because instead of being blocked we are now in the mode of reacting to notifications as operations complete or data becomes available.
術(shù)語(yǔ)“reactive”指的是圍繞響應(yīng)更改構(gòu)建的編程模型——網(wǎng)絡(luò)組件響應(yīng)I/O事件,UI控制器響應(yīng)鼠標(biāo)事件,等等。從這個(gè)意義上說(shuō),非阻塞是反應(yīng)性的,因?yàn)楫?dāng)操作完成或數(shù)據(jù)可用時(shí),我們現(xiàn)在處于響應(yīng)通知的模式,而不是被阻塞。
ReactiveX
Reactive Extensions for Async Programming.
ReactiveX是實(shí)現(xiàn)異步編程的響應(yīng)式擴(kuò)展。ReactiveX的官網(wǎng)并沒(méi)有對(duì)Reactive作出明確的解釋,而是通過(guò)一系列的技術(shù)實(shí)現(xiàn)去詮釋什么是響應(yīng)式編程。RxPY、RxJs、RxCpp、RxAndroid、RxGroovy,以及本文的核心編程庫(kù)RxJava,ReactiveX實(shí)現(xiàn)了主流編程語(yǔ)言的響應(yīng)式編程API,其核心便是異步編程。
小結(jié)
響應(yīng)式編程其實(shí)是個(gè)比較抽象的概念,以上解釋初看時(shí)并不會(huì)有任何直觀感受,需要真正走近使用才能有明顯的體會(huì)。
RxJava
什么是RxJava
歷史:ReactiveX(后文簡(jiǎn)稱Rx)是Reactive Extensions的縮寫(xiě),由微軟的架構(gòu)師Erik Meijer領(lǐng)導(dǎo)團(tuán)隊(duì)開(kāi)發(fā),于2012年11月開(kāi)源,Rx庫(kù)支持 .NET 、JavaScript 、Java 、Android、C++ 等幾乎全部的流行編程語(yǔ)言,Rx的大部分語(yǔ)言庫(kù)由ReactiveX這個(gè)組織負(fù)責(zé)維護(hù),比較流行的有RxJava/RxJS/Rx.NET,社區(qū)網(wǎng)站 reactivex.io,源碼托管于github.com/ReactiveX。其中RxJava(后文簡(jiǎn)稱RxJ)是Java語(yǔ)言的實(shí)現(xiàn)。 概念:RxJava 是對(duì)Java語(yǔ)言的響應(yīng)式編程實(shí)現(xiàn):一個(gè)使用可觀察序列組成異步和基于事件的程序的庫(kù)。它擴(kuò)展了觀察者模式以支持?jǐn)?shù)據(jù)/事件序列,并添加了允許您以聲明方式將序列組合在一起的運(yùn)算符,同時(shí)抽象出對(duì)低級(jí)線程、同步、線程安全和并發(fā)數(shù)據(jù)結(jié)構(gòu)等事物的關(guān)注。 總結(jié):基于事件流、實(shí)現(xiàn)異步操作的庫(kù)。
基本概念原理
數(shù)據(jù)流
數(shù)據(jù)流是響應(yīng)式編程的核心。數(shù)據(jù)流是一個(gè)按時(shí)間排序的即將發(fā)生的事件序列。拿點(diǎn)外賣來(lái)講,總會(huì)有這樣一些流程,選擇店鋪,進(jìn)入店鋪,兌換優(yōu)惠券,選擇主食,加個(gè)飲料,下單,支付。這一系列事件組成的序列就是數(shù)據(jù)流。數(shù)據(jù)流可以被觀測(cè)(下單完成庫(kù)存減少),被過(guò)濾(錢不夠了無(wú)法下單),被操作,或者為新的消費(fèi)者與另外一條流合并為一條新的流。
上游、下游
source.operator1().operator2().operator3().subscribe(consumer);
RxJ中的數(shù)據(jù)流由一個(gè)源、零個(gè)或多個(gè)中間步驟組成,后跟一個(gè)數(shù)據(jù)消費(fèi)者;我們可以把operator2()左看向源的方向稱為上游,向右看向消費(fèi)者被稱為下游。如果把RxJ的數(shù)據(jù)流向比作一條河流,那么就很容易理解上下游對(duì)應(yīng)的意義了。
背壓
當(dāng)數(shù)據(jù)流通過(guò)異步步驟時(shí),每個(gè)步驟可能以不同的速度執(zhí)行不同的事情,這時(shí)如果上游的數(shù)據(jù)產(chǎn)生過(guò)快,下游處理數(shù)據(jù)過(guò)慢,未及時(shí)處理的數(shù)據(jù)造成積壓,這些數(shù)據(jù)被放到緩沖池中,如果長(zhǎng)時(shí)間不處理這些緩沖數(shù)據(jù),最后一定會(huì)造成無(wú)法預(yù)料的結(jié)果。比如,在發(fā)洪水期間,下游沒(méi)辦法一下子消耗那么多水,大壩此時(shí)的作用就是攔截洪水,并根據(jù)下游的消耗情況酌情排放。
觀察者模式
觀察者模式是常用的軟件設(shè)計(jì)模式之一。觀察者模式定義對(duì)象間的一種一對(duì)多的依賴關(guān)系,當(dāng)一個(gè)對(duì)象(被觀察者)的狀態(tài)發(fā)生改變時(shí),所有依賴于它的對(duì)象(觀察者)都得到通知并被自動(dòng)更新。以點(diǎn)外賣為例,下單(被觀察者)這個(gè)動(dòng)作完成,外賣平臺(tái)可能會(huì)觸發(fā)以下動(dòng)作,實(shí)時(shí)修改訂單狀態(tài)(觀察者1)、通知用戶下單完成(觀察者2)、給客戶送積分(觀察者3)、通知商家接單(觀察者4)…,這些動(dòng)作之間并沒(méi)有強(qiáng)耦合,非常適合觀察者模式,即如果下單后需要觸發(fā)更多的操作,只需要增加觀察者即可,完美實(shí)現(xiàn)軟件設(shè)計(jì)的開(kāi)閉原則。
RxJ觀察者模式
RxJ其實(shí)就是對(duì)觀察者模式的擴(kuò)展。 被觀察者(Observable):通過(guò)訂閱行為subscribe()把事件按順序發(fā)送到觀察者(Observer)。 觀察者(Observer):按順序接收到事件&做出響應(yīng)反饋。
圖解原理
RxJ的入門(mén)有一個(gè)經(jīng)典案例,即顧客點(diǎn)餐,下面以此例對(duì)RxJ產(chǎn)生一個(gè)初步的認(rèn)識(shí)。 顧客點(diǎn)餐:顧客點(diǎn)餐,廚師按訂單依次準(zhǔn)備(薯?xiàng)l、漢堡、雞塊、肉卷、奶茶、魚(yú)排),服務(wù)員依次按做好的菜品上菜(薯?xiàng)l、漢堡、雞塊、肉卷、奶茶、魚(yú)排),最后廚師告知服務(wù)菜齊了,服務(wù)員通知顧客菜已上齊。 其中RxJ中觀察者、被觀察者、訂閱、事件與上圖對(duì)應(yīng)的關(guān)系可以表示為
點(diǎn)餐RxJava說(shuō)明廚師被觀察者(Observable)產(chǎn)生數(shù)據(jù)流,產(chǎn)生事件——即做雞塊、做薯?xiàng)l…,菜做完告知服務(wù)員服務(wù)員訂閱(subscribe)連接被觀察者和觀察者——即將菜上給顧客,最后告知顧客做完了顧客觀察者(Observer)消費(fèi)數(shù)據(jù)流,監(jiān)聽(tīng)事件——即上一個(gè)菜吃一個(gè),上完接著吃每份菜、做完菜的通知事件(Event)數(shù)據(jù)流——即雞塊、薯?xiàng)l…這些菜品,以及菜已做齊這件事
以上過(guò)程分別可以用RxJ3可以表示為
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.operators.observable.ObservableObserveOn;
import org.junit.Test;
/**
* 顧客點(diǎn)餐案例解釋RxJava
*
* @author yaoji
* @date 2022/7/20 0:52
*/
public class CustomerOrderObservableTest {
@Test
public void orderTest() {
/*
* Observable被觀察者,數(shù)據(jù)流/事件的生產(chǎn)者——RxJava中稱為上游-upstream
* 本例中被觀察者為廚師,負(fù)責(zé)準(zhǔn)備薯?xiàng)l、漢堡、雞塊、肉卷、奶茶、魚(yú)排,并告知服務(wù)員菜都準(zhǔn)備齊了
*/
Observable
@Override
public void subscribe(@NonNull ObservableEmitter
// emitter是發(fā)射器,可以理解為服務(wù)員
// RxJava通過(guò)onNext事件傳遞數(shù)據(jù)流
System.out.println("薯?xiàng)l準(zhǔn)備完畢,服務(wù)員上菜...");
emitter.onNext("薯?xiàng)l");
System.out.println("漢堡準(zhǔn)備完畢,服務(wù)員上菜...");
emitter.onNext("漢堡");
System.out.println("雞塊準(zhǔn)備完畢,服務(wù)員上菜...");
emitter.onNext("雞塊");
System.out.println("肉卷準(zhǔn)備完畢,服務(wù)員上菜...");
emitter.onNext("肉卷");
System.out.println("奶茶準(zhǔn)備完畢,服務(wù)員上菜...");
emitter.onNext("奶茶");
System.out.println("魚(yú)排準(zhǔn)備完畢,服務(wù)員上菜...");
emitter.onNext("魚(yú)排");
System.out.println("全部菜品準(zhǔn)備完成");
// onComplete事件表示所有數(shù)據(jù)傳遞完畢 即全部菜準(zhǔn)備齊全
emitter.onComplete();
}
});
/*
* Observer觀察者,數(shù)據(jù)流/事件的接收者——RxJava中稱為下游-downstream
* 本例中觀察者為客戶,負(fù)責(zé)接收服務(wù)員送到的菜,以及菜是否上齊等信息
*/
Observer
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("顧客點(diǎn)單:薯?xiàng)l、漢堡、雞塊、肉卷、奶茶、魚(yú)排");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("顧客接受上菜:" + s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("上菜出錯(cuò)了" + e);
}
@Override
public void onComplete() {
System.out.println("菜上齊了?。?!");
}
};
// 通過(guò)subscribe建立聯(lián)系,subscribe()=訂閱,即服務(wù)員,連接廚師和顧客
cookerObservable.subscribe(customerObserver);
/*輸出結(jié)果:
顧客點(diǎn)單:薯?xiàng)l、漢堡、雞塊、肉卷、奶茶、魚(yú)排
薯?xiàng)l準(zhǔn)備完畢,服務(wù)員上菜...
顧客接受上菜:薯?xiàng)l
漢堡準(zhǔn)備完畢,服務(wù)員上菜...
顧客接受上菜:漢堡
雞塊準(zhǔn)備完畢,服務(wù)員上菜...
顧客接受上菜:雞塊
肉卷準(zhǔn)備完畢,服務(wù)員上菜...
顧客接受上菜:肉卷
奶茶準(zhǔn)備完畢,服務(wù)員上菜...
顧客接受上菜:奶茶
魚(yú)排準(zhǔn)備完畢,服務(wù)員上菜...
顧客接受上菜:魚(yú)排
全部菜品準(zhǔn)備完成
菜上齊了!??!
*/
}
}
使用場(chǎng)景
所有異步操作均可使用RxJ。其實(shí)RxJ完全可以被自定義線程替代,但RxJ的鏈?zhǔn)秸{(diào)用和操作符能大量簡(jiǎn)化代碼的復(fù)雜度,使得異步調(diào)用、線程切換變得十分簡(jiǎn)單。
P.S RxJ在Android領(lǐng)域能發(fā)揮出更強(qiáng)大的功效(可試試RxAndroid和RxKotlin),因?yàn)榇蟛糠值腢I開(kāi)發(fā)都有異步數(shù)據(jù)流的處理。RxJ在Java服務(wù)端并沒(méi)有很熱門(mén)的反響,其根本原因是服務(wù)端絕大部分處理以同步為主,加之RxJ的入門(mén)成本是比較高的。目前使用RxJ完成的項(xiàng)目中,比較出名的類庫(kù)是Hystrix(處理分布式系統(tǒng)的延遲和容錯(cuò)的庫(kù))。
如何使用
早在2019年,ReactiveX就發(fā)布了RxJ3,RxJ主要包括RxJ1.x,RxJ2.x,RxJ3.x,需要注意的是,RxJ2.x是對(duì)RxJ1.x的增強(qiáng),而RxJ3.x除了增強(qiáng)外,其包結(jié)構(gòu)是做了不兼容調(diào)整的,升級(jí)時(shí)需要注意。本文的使用案例均基于RxJ3.x。 Github RxJava官網(wǎng)使用說(shuō)明
依賴pom.xml
maven管理
使用步驟
創(chuàng)建被觀察者 Observable:產(chǎn)生數(shù)據(jù)流創(chuàng)建觀察者 Observer:消費(fèi)數(shù)據(jù)訂閱 Subscribe:連接被觀察者和觀察者
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import org.junit.Test;
/**
* RxJava使用步驟
* @author yaoji
* @date 2022/7/24 21:06
*/
public class UseRxJavaTest {
/**
* 基本步驟
* 1. 創(chuàng)建被觀察者Observable:產(chǎn)生數(shù)據(jù)流
* 2. 創(chuàng)建觀察者Observer:消費(fèi)數(shù)據(jù)
* 3. 訂閱Subscribe:連接被觀察者和觀察者
*/
@Test
public void baseUseTest() {
// 1.創(chuàng)建observable
@NonNull final Observable
@Override
public void subscribe(@NonNull ObservableEmitter
System.out.println("發(fā)送數(shù)據(jù):薯?xiàng)l");
emitter.onNext("薯?xiàng)l");
emitter.onComplete();
}
});
// 2.創(chuàng)建observer
final Observer
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("訂閱");
}
@Override
public void onNext(String o) {
System.out.println("接收數(shù)據(jù):" + o);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
System.out.println("數(shù)據(jù)接收完畢");
}
};
// 3.訂閱subscribe
observable.subscribe(observer);
}
/**
* Observable鏈?zhǔn)秸{(diào)用
* 可以通過(guò)鏈?zhǔn)秸{(diào)用一步完成上述編碼
*/
@Test
public void chainCallTest() {
Observable.create((ObservableOnSubscribe
System.out.println("發(fā)送數(shù)據(jù):薯?xiàng)l");
emitter.onNext("薯?xiàng)l");
emitter.onComplete();
})
// 訂閱時(shí)響應(yīng)
.doOnSubscribe(d -> System.out.println("訂閱"))
// next事件響應(yīng)
.doOnNext(o -> System.out.println("接收數(shù)據(jù):" + o))
// complete事件響應(yīng)
.doOnComplete(() -> System.out.println("數(shù)據(jù)接收完畢"))
// 訂閱
.subscribe();
}
/**
* 使用支持背壓的{@link Flowable}替代{@link Observable}
* 注:{@link Flowable}的使用幾乎和{@link Observable}相差無(wú)幾
* 只是{@link Flowable}支持了背壓{@link BackpressureStrategy}
*/
@Test
public void flowableTest() {
Flowable.create((FlowableOnSubscribe
// 創(chuàng)建被觀察者
System.out.println("發(fā)送數(shù)據(jù):薯?xiàng)l");
emitter.onNext("薯?xiàng)l");
emitter.onComplete();
// 背壓的模式
}, BackpressureStrategy.BUFFER
)
// 訂閱時(shí)響應(yīng)
.doOnSubscribe(d -> System.out.println("訂閱"))
// next事件響應(yīng)
.doOnNext(o -> System.out.println("接收數(shù)據(jù):" + o))
// complete事件響應(yīng)
.doOnComplete(() -> System.out.println("數(shù)據(jù)接收完畢"))
// 訂閱
.subscribe();
}
}
以上通過(guò)三種方式對(duì)RxJ的基本步驟作了編碼,后文所有例子將使用支持背壓、鏈?zhǔn)秸{(diào)用的Flowable進(jìn)行編碼。
操作符介紹
RxJ的大部分運(yùn)算符對(duì) Observable 進(jìn)行操作并返回一個(gè) Observable,這使得RxJ可以進(jìn)行鏈?zhǔn)秸{(diào)用,每次運(yùn)算符都會(huì)修改前一個(gè)運(yùn)算符的運(yùn)算產(chǎn)生的 Observable。 下面將分類介紹RxJ的核心操作符——操作符比較多、且重載很多。
創(chuàng)建類操作符
產(chǎn)生新 Observable 的操作符
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* RxJava創(chuàng)建的操作符
* 作用:產(chǎn)生新 Observable 的操作符
* {@see https://github.com/ReactiveX/RxJava/blob/3.x/docs/Creating-Observables.md}
*
* @author yaoji
* @date 2022/7/26 0:12
*/
@FixMethodOrder(MethodSorters.JVM)
public class CreateOperatorTest {
/**
* create操作符 最基本的創(chuàng)建 {@link io.reactivex.rxjava3.core.Observable} 的方式
*/
@Test
public void createTest() {
System.out.println("\ncreate()操作符測(cè)試結(jié)果:");
Flowable.create(emitter -> {
emitter.onNext("薯?xiàng)l");
emitter.onNext("雞翅");
}, BackpressureStrategy.BUFFER).subscribe(System.out::println).dispose();
// 輸出 薯?xiàng)l 雞翅
}
/**
* just操作符
*/
@Test
public void justTest() {
System.out.println("\njust()操作符測(cè)試結(jié)果:");
// 指定發(fā)射數(shù)據(jù)
Flowable.just("薯?xiàng)l", "雞翅").subscribe(System.out::println).dispose();
// 輸出 薯?xiàng)l 雞翅
}
/**
* fromArray()操作符 指定發(fā)射數(shù)據(jù)為數(shù)組
*/
@Test
public void fromArrayTest() {
System.out.println("\nfromArray()操作符測(cè)試結(jié)果:");
Flowable.fromArray(new String[]{"薯?xiàng)l", "雞翅"}).subscribe(System.out::println).dispose();
// 輸出 薯?xiàng)l 雞翅
}
/**
* fromIterable()操作符 指定發(fā)射數(shù)據(jù)為迭代器
*/
@Test
public void fromIterableTest() {
System.out.println("\nfromIterable()操作符測(cè)試結(jié)果:");
List
foods.add("薯?xiàng)l");
foods.add("雞翅");
Flowable.fromIterable(foods).subscribe(System.out::println).dispose();
// 輸出 薯?xiàng)l 雞翅
}
/**
* interval()操作符 按一定時(shí)間間隔發(fā)射數(shù)據(jù)(異步)
* 該操作為
* 異步
*/
@Test
public void intervalTest() throws InterruptedException {
System.out.println("\ninterval()操作符測(cè)試結(jié)果:");
// 每間隔1s發(fā)射一條數(shù)據(jù)(數(shù)據(jù)從0開(kāi)始,每次遞增1)
Flowable.interval(1, TimeUnit.SECONDS).subscribe(data ->
System.out.println("timestamp:" + System.currentTimeMillis() + " data: " + data));
// 異步操作 需要阻塞線程
Thread.sleep(3 * 1000);
// 輸出
// timestamp:1658942322488 data: 0
// timestamp:1658942323482 data: 1
// timestamp:1658942324487 data: 2
}
/**
* timer()操作符 延遲一定時(shí)間后發(fā)射數(shù)據(jù)(0L)
* 該操作為異步
*/
@Test
public void timerTest() throws InterruptedException {
System.out.println("\ntimer()操作符測(cè)試結(jié)果:");
final long start = System.currentTimeMillis();
// 延遲1800ms發(fā)射數(shù)據(jù)(數(shù)據(jù)為0L)
Flowable.timer(1800, TimeUnit.MILLISECONDS).subscribe(data -> {
System.out.println("延遲: " + (System.currentTimeMillis() - start) + "ms");
System.out.println("接收數(shù)據(jù):" + data);
});
Thread.sleep(3 * 1000);
// 輸出 延遲: 1981ms 接收數(shù)據(jù):0
}
/**
* range()操作符 指定發(fā)射序列整數(shù)數(shù)據(jù)
*/
@Test
public void rangeTest() {
System.out.println("\nrange()操作符測(cè)試結(jié)果:");
// 發(fā)射數(shù)據(jù)從20開(kāi)始,共10個(gè)
Flowable.range(20, 3).subscribe(System.out::println).dispose();
// 輸出 20 21 22
}
}
轉(zhuǎn)換類操作符
轉(zhuǎn)換 Observable 發(fā)出的事件(數(shù)據(jù))的操作符
import io.reactivex.=rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Function;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.reactivestreams.Publisher;
/**
* RxJava轉(zhuǎn)換的操作符
* 作用:轉(zhuǎn)換 Observable 發(fā)出的事件(數(shù)據(jù))的操作符
* {@see https://github.com/ReactiveX/RxJava/blob/3.x/docs/Transforming-Observables.md}
*
* @author yaoji
* @date 2022/7/26 0:42
*/
@FixMethodOrder(MethodSorters.JVM)
public class TransformOperatorTest {
/**
* map()操作符 對(duì)發(fā)射的數(shù)據(jù)進(jìn)行函數(shù)式轉(zhuǎn)換 產(chǎn)生新的 Observable
*/
@Test
public void mapTest() {
System.out.println("\nmap()操作符測(cè)試結(jié)果:");
Flowable.just("薯?xiàng)l", "雞翅")
// 每條數(shù)據(jù)單獨(dú)轉(zhuǎn)換 (各種食物調(diào)味)
.map(eachFood -> {
// 通過(guò)map對(duì)數(shù)據(jù)進(jìn)行函數(shù)轉(zhuǎn)換(雞翅要特辣的)
return "雞翅".equals(eachFood) ? eachFood + "(特辣)" : eachFood + "(常規(guī))";
})
.subscribe(System.out::println).dispose();
// 輸出 薯?xiàng)l(常規(guī)) 雞翅(特辣)
}
/**
* flatMap()操作符 對(duì)每條數(shù)據(jù)都轉(zhuǎn)換為 Observable
* flatMap() 和 map() 的區(qū)別是:
* map() 只對(duì)數(shù)據(jù)轉(zhuǎn)換,最終每條數(shù)據(jù)對(duì)應(yīng)的 Observable 只有數(shù)據(jù)變化
* flatMap() 對(duì) Observable 改變,數(shù)據(jù)可能被去除或增加
*/
@Test
public void flatMapTest() {
System.out.println("\nflatMap()操作符測(cè)試結(jié)果:");
Flowable.just("薯?xiàng)l", "雞翅")
// 每條數(shù)據(jù)單獨(dú)轉(zhuǎn)換為 Observable (點(diǎn)了雞翅可以贈(zèng)送飲料——觀察者接收的食物量改變)
.flatMap(
(Function
// 產(chǎn)生新的 Observable (雞翅送冰紅茶)
return "雞翅".equals(s) ? Flowable.just(s, "冰紅茶") : Flowable.just(s);
}
)
// 觀察者接收到的數(shù)據(jù)為"薯?xiàng)l", "雞翅", "冰紅茶"
.subscribe(System.out::println).dispose();
// 輸出 薯?xiàng)l 雞翅 冰紅茶
}
/**
* buffer()操作符 將發(fā)射的數(shù)據(jù)放入集合,每次發(fā)射集合替代發(fā)射單條數(shù)據(jù)
*/
@Test
public void bufferTest() {
System.out.println("\nbuffer()操作符測(cè)試結(jié)果:");
Flowable.just("奶茶", "鹵蛋", "漢堡", "炸雞", "雞湯", "牛排")
// 兩條數(shù)據(jù)分到一起發(fā)射 (每次上菜 左右手都端 節(jié)省時(shí)間)
.buffer(2)
.subscribe(System.out::println).dispose();
// 輸出 [奶茶, 鹵蛋] [漢堡, 炸雞] [雞湯, 牛排]
}
/**
* window()操作符 按 Observable 集合發(fā)射
* window()操作符和buffer()的區(qū)別在于:
* buffer()是將數(shù)據(jù)作為集合發(fā)射
* window()是將 Observable 集合一次發(fā)射
*/
@Test
public void windowTest() {
System.out.println("\nwindow()操作符測(cè)試結(jié)果:");
Flowable.range(1, 4)
.window(2)
.subscribe(c -> {
// 接收到的是 Observable
System.out.println(c);
// "子訂閱" 輸出
c.subscribe(System.out::println);
}
).dispose();
// 輸出
// io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowSubscribeIntercept@212bf671
// 1
// 2
// io.reactivex.rxjava3.internal.operators.flowable.FlowableWindowSubscribeIntercept@16aa0a0a
// 3
// 4
}
/**
* groupBy()操作符 對(duì) Observable 發(fā)射的數(shù)據(jù)按Key分組,每個(gè) Observable 發(fā)射一組數(shù)據(jù)
*/
@Test
public void groupByTest() {
System.out.println("\ngroupBy()操作符測(cè)試結(jié)果:");
Flowable.just("雞頭", "雞你太美", "漢堡", "奶茶")
.groupBy((Function
if (food.contains("雞")) {
return "雞";
}
return "其它";
})
.subscribe(ob -> {
Object key = ob.getKey();
System.out.println("分組:" + key);
// "子訂閱"
ob.subscribe(System.out::println);
}).dispose();
// 輸出
// 分組:雞
// 雞頭
// 雞你太美
// 分組:其它
// 漢堡
// 奶茶
}
}
過(guò)濾類操作符
對(duì) Observable 發(fā)射的數(shù)據(jù)進(jìn)行過(guò)濾和選擇的操作符
import io.reactivex.rxjava3.core.Flowable;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import java.util.concurrent.TimeUnit;
/**
* RxJava過(guò)濾的操作符
* 作用:對(duì) Observable 發(fā)射的數(shù)據(jù)進(jìn)行過(guò)濾和選擇的操作符
* {@see https://github.com/ReactiveX/RxJava/blob/3.x/docs/Filtering-Observables.md}
* 注:這里的過(guò)濾和Java 8的{@link java.util.stream.Stream}有類似效果,都是將發(fā)射的數(shù)據(jù)按一定條件過(guò)濾
* 過(guò)濾操作符很多,大部分的實(shí)際用處可能不大,且使用比較簡(jiǎn)單,作個(gè)簡(jiǎn)單了解即可
*
* @author yaoji
* @date 2022/7/27 1:22
*/
@FixMethodOrder(MethodSorters.JVM)
public class FilterOperatorTest {
/**
* filter()操作符 對(duì) Observable 發(fā)射的數(shù)據(jù)進(jìn)行過(guò)濾,滿足條件的才被 Observer 接收
*/
@Test
public void filterTest() {
System.out.println("\nfilter()操作符測(cè)試結(jié)果:");
Flowable.range(1, 6)
// 過(guò)濾 剩下偶數(shù)
.filter(integer -> integer % 2 == 0)
.subscribe(System.out::println).dispose();
// 輸出 2 4 6
}
/**
* distinct()操作符 對(duì) Observable 發(fā)射的數(shù)據(jù)去重
*/
@Test
public void distinctTest() {
System.out.println("\ndistinct()操作符測(cè)試結(jié)果:");
Flowable.just(1, 2, 3, 1, 2, 3, 4, 5)
// 去重 1 2 3重復(fù) 剩下一組數(shù)據(jù)
.distinct()
.subscribe(System.out::println).dispose();
// 輸出1 2 3 4 5
}
/**
* first()操作符 僅發(fā)射 Observable 的第一條數(shù)據(jù)
*/
@Test
public void firstTest() {
System.out.println("\nfirst()操作符測(cè)試結(jié)果:");
Flowable.range(1, 10)
// 取第一條數(shù)據(jù) 沒(méi)有發(fā)射數(shù)據(jù)則默認(rèn)發(fā)射-1
.first(-1)
// 不想要默認(rèn)發(fā)射數(shù)據(jù) 則使用firstElement替代
//.firstElement()
.subscribe(System.out::println).dispose();
// 輸出 1
}
/**
* last()操作符 僅發(fā)射 Observable 的最后一條數(shù)據(jù)
*/
@Test
public void lastTest() {
System.out.println("\nlast()操作符測(cè)試結(jié)果:");
Flowable.range(1, 10)
// 取最后一條數(shù)據(jù) 沒(méi)有發(fā)射數(shù)據(jù)則默認(rèn)發(fā)射-1
.last(-1)
// 不想要默認(rèn)發(fā)射數(shù)據(jù) 則使用lastElement替代
//.lastElement()
.subscribe(System.out::println).dispose();
// 輸出 10
}
/**
* ofType()操作符 按數(shù)據(jù)的類型過(guò)濾
*/
@Test
public void ofTypeTest() {
System.out.println("\nofType()操作符測(cè)試結(jié)果:");
Flowable.just(1L, "薯?xiàng)l", Math.PI, 0.618f)
// 按Double過(guò)濾 只有π會(huì)被發(fā)射出去
.ofType(Double.class)
.subscribe(System.out::println).dispose();
// 輸出 3.141592653589793
}
/**
* sample()操作符 在周期性的時(shí)間間隔內(nèi),發(fā)射最后該段間隔的最后一條數(shù)據(jù)
*/
@Test
public void sampleTest() {
System.out.println("\nsample()操作符測(cè)試結(jié)果:");
Flowable.range(0, 99999999)
// 共發(fā)射 99999999 個(gè)數(shù)據(jù),每200ms間隔內(nèi)的數(shù)據(jù)取最后一條發(fā)射
.sample(200, TimeUnit.MILLISECONDS)
.subscribe(System.out::println).dispose();
// 輸出 47181190 94194001 輸出結(jié)果和運(yùn)行時(shí)有關(guān)
}
/**
* skip()操作符 過(guò)濾掉前n項(xiàng)數(shù)據(jù)后發(fā)射
*/
@Test
public void skipTest() {
System.out.println("\nskip()操作符測(cè)試結(jié)果:");
Flowable.range(1, 10)
// 1-10 過(guò)濾掉前5個(gè)
.skip(5)
.subscribe(System.out::println).dispose();
// 輸出 6 7 8 9 10
}
/**
* take()操作符 只發(fā)射前n項(xiàng)數(shù)據(jù)
*/
@Test
public void takeTest() {
System.out.println("\ntake()操作符測(cè)試結(jié)果:");
Flowable.range(1, 10)
// 1-10 只發(fā)射前5個(gè)
.take(5)
.subscribe(System.out::println).dispose();
// 輸出 1 2 3 4 5
}
}
組合類操作符
組合多個(gè) Observable 為一個(gè) Observable 的操作符
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.BiFunction;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
/**
* RxJava組合操作符
* 作用:組合多個(gè) Observable 為一個(gè) Observable
* {@see https://github.com/ReactiveX/RxJava/blob/3.x/docs/Combining-Observables.md}
*
* @author yaoji
* @date 2022/7/27 22:32
*/
@FixMethodOrder(MethodSorters.JVM)
public class CombineOperatorTest {
/**
* merge()操作符 組合多個(gè) Observable 到一個(gè)中
*/
@Test
public void mergeTest() {
Flowable.just(1, 2)
// 合并第一個(gè) Observable
.mergeWith(Flowable.just(3, 4))
// 合并第二個(gè) Observable
.mergeWith(Flowable.just(5, 6))
.subscribe(System.out::println).dispose();
// 輸出 1, 2, 3, 4, 5, 6
}
/**
* startWith()操作符 Observable 發(fā)射數(shù)據(jù)前先發(fā)射指定的數(shù)據(jù)序列
*/
@Test
public void startWithTest() {
Flowable.just(1, 2)
.startWith(Flowable.just(3, 4))
.startWith(Flowable.just(5, 6))
.subscribe(System.out::println).dispose();
// 輸出 5, 6, 3, 4, 1, 2
}
/**
* count()操作符 統(tǒng)計(jì) Observable 發(fā)射的數(shù)據(jù)數(shù)量,并將統(tǒng)計(jì)結(jié)果發(fā)射給觀察者
*/
@Test
public void countTest() {
// 發(fā)射從1開(kāi)始的10個(gè)數(shù)
Flowable.range(1, 10)
.count()
.subscribe(System.out::println).dispose();
// 輸出 10
}
/**
* zip()操作符 通過(guò)函數(shù)將多個(gè) Observable 發(fā)射的數(shù)據(jù)作函數(shù)處理,并將函數(shù)處理的結(jié)果發(fā)射給觀察者
*/
@Test
public void zipTest() {
Flowable.just(1.1, 2.2, 3.3, 4.4)
// 第一次zip 對(duì)4組發(fā)射數(shù)據(jù)排列作相乘處理,得 2.2, 8.8, 6.6(4.4的數(shù)據(jù)自動(dòng)丟棄)
.zipWith(Flowable.just(2, 4, 2), (last, current) -> last * current)
// 第二次zip 對(duì)三組發(fā)射數(shù)據(jù)排列作相加處理,得 3.2, 10.8(6.6的數(shù)據(jù)自動(dòng)丟棄)
.zipWith(Flowable.just(1, 2), (BiFunction
.subscribe(System.out::println).dispose();
// 輸出 3.2, 10.8
}
/**
* combineLatest()操作符 多個(gè) Observable 進(jìn)行合并,最后一個(gè) Observable 發(fā)射的每一條數(shù)據(jù)與 前面所有的 Observable 發(fā)射的最后一條數(shù)據(jù)進(jìn)行函數(shù)轉(zhuǎn)換,最后將結(jié)果發(fā)射給觀察者
*/
@Test
public void combineLatestTest() {
Flowable.combineLatest(
// 發(fā)射 1-10 該 Observable 只有最后一條數(shù)據(jù)10參與組合
Flowable.range(1, 10),
// 發(fā)射 1-5 該 Observable 的每一條數(shù)據(jù)參與組合
Flowable.range(1, 5),
(first, second) -> "first " + first + ", second " + second
).subscribe(System.out::println).dispose();
// 輸出
// first 10, second 1
// first 10, second 2
// first 10, second 3
// first 10, second 4
// first 10, second 5
Flowable.combineLatest(
// 發(fā)射 1-10 該 Observable 只有最后一條數(shù)據(jù)10參與組合
Flowable.range(1, 10),
// 發(fā)射 1-5 該 Observable 只有最后一條數(shù)據(jù)5參與組合
Flowable.range(1, 5),
// 發(fā)射 1-5 該 Observable 的每一條數(shù)據(jù)參與組合
Flowable.range(1, 5),
(first, second, third) -> "first " + first + ", second " + second + ", third" + third
).subscribe(System.out::println).dispose();
// 輸出
// first 10, second 5, third1
// first 10, second 5, third2
// first 10, second 5, third3
// first 10, second 5, third4
// first 10, second 5, third5
// 上述組合的 Observable 如果都換成 interval()-異步創(chuàng)建,則產(chǎn)生的效果會(huì)更能體現(xiàn)
}
}
功能操作符
實(shí)用性比較強(qiáng)的操作符
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableOnSubscribe;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import java.util.concurrent.TimeUnit;
/**
* RxJava效能的操作符
* {@see https://github.com/ReactiveX/RxJava/blob/3.x/docs/Observable-Utility-Operators.md}
*
* @author yaoji
* @date 2022/7/29 0:19
*/
@FixMethodOrder(MethodSorters.JVM)
public class UtilityOperatorTest {
/**
* subscribe()操作符 訂閱:連接被觀察者和觀察者
* subscribe()前為被觀察者,后為觀察者
*/
@Test
public void subscribeTest() {
Flowable.just(1, 2).subscribe(System.out::println).dispose();
// 輸出 1, 2
}
/**
* timestamp()操作符 將時(shí)間戳附加到 Observable 發(fā)出的每一項(xiàng)上
*/
@Test
public void timestampTest() throws InterruptedException {
Flowable.interval(100, TimeUnit.MILLISECONDS)
// 發(fā)射的數(shù)據(jù)會(huì)帶上時(shí)間戳信息
.timestamp()
.subscribe(System.out::println);
Thread.sleep(500);
// 輸出
/*
Timed[time=1659025449727, unit=MILLISECONDS, value=0]
Timed[time=1659025449822, unit=MILLISECONDS, value=1]
Timed[time=1659025449916, unit=MILLISECONDS, value=2]
Timed[time=1659025450027, unit=MILLISECONDS, value=3]
Timed[time=1659025450121, unit=MILLISECONDS, value=4]
*/
}
/**
* doOnEach()操作符 Observable 每次發(fā)射一次事件前都會(huì)調(diào)用(包括onNext()、onError()、onComplete())
*/
@Test
public void doOnEachTest() {
Flowable.just(1, 2)
.doOnEach(c -> {
if (c.isOnNext()) {
// Observable 發(fā)射 onNext()事件,共兩個(gè)數(shù)據(jù) 所以會(huì)調(diào)用兩次
System.out.println("Observable發(fā)射事件onNext(),發(fā)射數(shù)據(jù):" + c.getValue());
}
if (c.isOnComplete()) {
// Observable 發(fā)射 onComplete()事件,數(shù)據(jù)發(fā)射完后 Observable 會(huì)默認(rèn)調(diào)用onComplete(), 所以會(huì)執(zhí)行一次
System.out.println("Observable發(fā)射事件onComplete()");
}
})
.subscribe(onNext -> System.out.println("Observer接收數(shù)據(jù):" + onNext))
.dispose();
// 輸出
/*
Observable發(fā)射事件onNext(),發(fā)射數(shù)據(jù):1
Observer接收數(shù)據(jù):1
Observable發(fā)射事件onNext(),發(fā)射數(shù)據(jù):2
Observer接收數(shù)據(jù):2
Observable發(fā)射事件onComplete()
*/
}
/**
* doOnNext()操作符 在觀察者 Observer 執(zhí)行onNext()前執(zhí)行
* doAfterNext()操作符 在觀察者 Observer 執(zhí)行onNext()后執(zhí)行
*/
@Test
public void doOnNextTest() {
Flowable.just("onNext-1", "onNext-2")
.doOnNext(c -> System.out.print("onNext()前執(zhí)行\(zhòng)t"))
.doAfterNext(c -> System.out.println("\tonNext()后執(zhí)行"))
// 注:subscribe()的內(nèi)執(zhí)行的就是onNext(),或者說(shuō)監(jiān)聽(tīng)到被觀察者發(fā)射onNext()事件
.subscribe(System.out::print)
.dispose();
// 輸出
// onNext()前執(zhí)行 onNext-1 onNext()后執(zhí)行
// onNext()前執(zhí)行 onNext-2 onNext()后執(zhí)行
}
/**
* doOnComplete()操作符 在所有事件發(fā)射完成后執(zhí)行
*/
@Test
public void doOnCompleteTest() {
Flowable.just(1, 2)
// Observable 將所有數(shù)據(jù)發(fā)射完畢后執(zhí)行
.doOnComplete(() -> System.out.println("onComplete()后執(zhí)行"))
.subscribe(System.out::println)
.dispose();
}
/**
* doFinally()操作符 在 Observable 終止或釋放時(shí)執(zhí)行
*/
@Test
public void doFinallyTest() {
/*
* 每隔1秒發(fā)射一條數(shù)據(jù)
* 由于interval是異步執(zhí)行,所以 Observable 根本來(lái)不及發(fā)射出數(shù)據(jù),主線程就會(huì)推出,導(dǎo)致無(wú)法輸出信息
* 但是doFinally()可以保證在釋放時(shí)執(zhí)行
* */
Flowable.interval(1, TimeUnit.SECONDS)
// Observable 終止或釋放時(shí)執(zhí)行
.doFinally(() -> System.out.println("Observable釋放時(shí)執(zhí)行"))
// 這里觀察者是無(wú)法接收到數(shù)據(jù)的
.subscribe(onNext -> System.out.println("觀察者接收到數(shù)據(jù)"))
// 手動(dòng)直接釋放,Observable來(lái)不及發(fā)射數(shù)據(jù)就會(huì)被釋放
.dispose();
// 輸出 Observable釋放時(shí)執(zhí)行
}
/**
* doOnError()操作符 在觀察者 Observer 執(zhí)行onError()前執(zhí)行
*/
@Test
public void doOnError() {
// Observable 發(fā)射onError()事件
Flowable.error(new RuntimeException("發(fā)射onError()事件"))
// 在 Observer 接收onError()前執(zhí)行
.doOnError(error -> System.out.println(error.getClass().getName() + error.getMessage()))
.subscribe(System.out::println)
.dispose();
// 輸出 java.lang.RuntimeException發(fā)射onError()事件
}
/**
* doOnTerminate()操作符 Observable 執(zhí)行終止時(shí)執(zhí)行(可能發(fā)射完成、可能發(fā)射異常)
*/
@Test
public void doOnTerminatingTest() {
Flowable.just(1, 2)
// 合并onNext()事件并發(fā)射
.mergeWith(Flowable.just(3))
// 合并onError()事件并發(fā)射——模擬發(fā)射異常的場(chǎng)景
.mergeWith(Flowable.error(new RuntimeException("error")))
// 合并onNext()事件并發(fā)射
.mergeWith(Flowable.just(4))
.doOnTerminate(() -> System.out.println("Observable終止時(shí)執(zhí)行"))
.subscribe(System.out::println)
.dispose();
// 輸出 從輸出結(jié)果看出 在發(fā)射異常后直接執(zhí)行了doOnTerminate()
/*
1
2
3
Observable終止時(shí)執(zhí)行
*/
}
/**
* observeOn()操作符 指定觀察者接收事件的調(diào)度器/線程
*/
@Test
public void observeOnTest() {
Flowable.just(1, 2)
// 指定觀察者接收事件的線程
.observeOn(Schedulers.newThread())
.subscribe(onNext -> System.out.println("observe線程:" + Thread.currentThread().getName() + ", 接收數(shù)據(jù):" + onNext));
// 輸出
// observe線程:RxNewThreadScheduler-1, 接收數(shù)據(jù):1
// observe線程:RxNewThreadScheduler-1, 接收數(shù)據(jù):2
}
/**
* subscribeOn()操作符 指定被觀察者發(fā)射事件的調(diào)度器/線程
*/
@Test
public void subscribeOnTest() throws InterruptedException {
Flowable.create((FlowableOnSubscribe
System.out.println("observable線程:" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
}, BackpressureStrategy.BUFFER)
// 指定觀察者接收事件的線程
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println);
Thread.sleep(100);
// 輸出 observable線程:RxNewThreadScheduler-1
}
/**
* retry()操作符 出現(xiàn)錯(cuò)誤時(shí)被觀察者重新發(fā)射數(shù)據(jù)
*/
@Test
public void retryTest() {
Flowable.just(1, 2)
// 手動(dòng)發(fā)射error模擬異常
.mergeWith(Flowable.error(new RuntimeException()))
.mergeWith(Flowable.just(3, 4))
.retry(1)
.subscribe(System.out::println)
.dispose();
// 輸出 1 2 1 2,經(jīng)過(guò)合并后,發(fā)射的事件應(yīng)該是1,2,3,4,但是失敗后重試了一次
}
}
線程調(diào)度相關(guān)
同步、異步訂閱
剛接觸RxJ的使用,鑒于RxJ定義就是處理異步的庫(kù),所以很容易讓人產(chǎn)生這樣一種錯(cuò)覺(jué):RxJ就是異步的。
其實(shí)并不是:RxJ的訂閱可以分為同步訂閱和異步訂閱。
同步訂閱是指被觀察者與觀察者在同一個(gè)線程運(yùn)行,這里被觀察者發(fā)射的數(shù)據(jù)只有被觀察者接收處理后,才能繼續(xù)發(fā)射下一條數(shù)據(jù)。異步訂閱是指被觀察者與觀察者在不同線程運(yùn)行,這里被觀察者發(fā)射的數(shù)據(jù)全部存到緩存區(qū),觀察者則只在緩存區(qū)拿數(shù)據(jù)。
異步訂閱需要使用RxJ的調(diào)度器,以實(shí)現(xiàn)異步、線程切換。
調(diào)度器
RxJ的 Observable 和 Observer 分別處理了發(fā)射事件和接收事件的職責(zé),其中通過(guò)操作符subscribeOn()、observeOn()可分別定義各自處理程序的線程,但RxJ操作符不直接與Thread或ExecutorService一起使用,而是提供了幾個(gè)通過(guò)Schedulers類訪問(wèn)的標(biāo)準(zhǔn)調(diào)度器。
Schedulers.io():創(chuàng)建一個(gè)可復(fù)用的工作線程,線程不夠新建。主要用于I/O類型的操作,如文件處理、網(wǎng)絡(luò)請(qǐng)求等耗時(shí)的操作。Schedulers.computation():創(chuàng)建固定數(shù)量(cpu核數(shù))的專用線程運(yùn)行計(jì)算密集型工作,是大多數(shù)異步操作符(如interval(),timer())的默認(rèn)調(diào)度器。因線程數(shù)有限,盡量不用I/O耗時(shí)操作,常用于快速計(jì)算類型的操作。Schedulers.single():以順序和FIFO的方式在單個(gè)線程上運(yùn)行。Schedulers.newThread():每次新建一個(gè)線程運(yùn)行。Schedulers.trampoline():將當(dāng)前線程執(zhí)行加入主線程。主要用于多個(gè) Observable 運(yùn)行時(shí),阻塞并按Observable 順序調(diào)用。
背壓策略
當(dāng)上下游的流操作處于不同的線程(上文的調(diào)度器)——即異步訂閱時(shí),如果上游發(fā)射數(shù)據(jù)的速度大于下游處理數(shù)據(jù)的速度,那么就會(huì)造成積壓,這些數(shù)據(jù)存放在一個(gè)異步緩存池(RxJ默認(rèn)緩沖區(qū)大小為128)中,如果緩存池中的數(shù)據(jù)不能及時(shí)處理,最后就會(huì)造成內(nèi)存溢出。
處理上述問(wèn)題的原理是:控制數(shù)據(jù)發(fā)射、數(shù)據(jù)接收的速度。
RxJ在 BackpressureStrategy 類中提供了多種背壓策略,用于處理上述問(wèn)題
背壓策略處理方式BackpressureStrategy.ERROR拋出異常 MissingBackpressureException ,通過(guò) onError() 事件發(fā)射異常事件BackpressureStrategy.MISSING給出友好提示,通過(guò) onError 事件發(fā)射,和ERROR策略似乎無(wú)明顯區(qū)別BackpressureStrategy.BUFFER將緩存區(qū)大小設(shè)置成無(wú)限大(容易造成內(nèi)存溢出、系統(tǒng)崩潰)BackpressureStrategy.DROP超過(guò)緩沖區(qū)大小的數(shù)據(jù)全部丟棄BackpressureStrategy.LATEST超出緩存區(qū)數(shù)據(jù)只保留最新的
BackpressureStrategy.ERROR
策略:拋出異常 MissingBackpressureException ,通過(guò) onError() 事件發(fā)射異常事件
@Test
public void errorTest() {
Flowable.create((FlowableOnSubscribe
int count = 150;
// 發(fā)射 150 個(gè)數(shù)據(jù) 緩沖池接收數(shù)據(jù)大小為 128
for (int i = 1; i <= count; i++) {
emitter.onNext(i);
}
emitter.onComplete();
// 指定背壓策略為 ERROR
}, BackpressureStrategy.ERROR)
// Observer 在io線程運(yùn)行
.observeOn(Schedulers.io())
// Observable 在主線程運(yùn)行 為了阻塞主線程方便查看輸出
.subscribeOn(Schedulers.trampoline())
// 在發(fā)射 onError()事件執(zhí)行
.doOnError(e -> System.out.println("發(fā)生錯(cuò)誤:" + e.getMessage()))
.subscribe(System.out::println)
.dispose();
// 輸出 可以看出 此時(shí)數(shù)據(jù)發(fā)射量大于緩沖時(shí) Observable 會(huì)拋出異常并發(fā)射onError()事件
/*
1
2
...
發(fā)生錯(cuò)誤:create: could not emit value due to lack of requests
*/
}
BackpressureStrategy.MISSING
策略:給出友好提示,通過(guò) onError 事件發(fā)射,和ERROR策略似乎無(wú)明顯區(qū)別
/**
* BackpressureStrategy.MISSING
* 給出友好提示,通過(guò) `onError` 事件發(fā)射,和ERROR策略似乎無(wú)明顯區(qū)別
*/
@Test
public void missingTest() {
Flowable.create((FlowableOnSubscribe
int count = 150;
// 發(fā)射 150 個(gè)數(shù)據(jù) 緩沖池接收數(shù)據(jù)大小為 128
for (int i = 1; i <= count; i++) {
emitter.onNext(i);
}
emitter.onComplete();
// 指定背壓策略為錯(cuò)誤
}, BackpressureStrategy.MISSING)
// Observer 在io線程運(yùn)行
.observeOn(Schedulers.io())
// Observable 在主線程運(yùn)行 為了阻塞主線程方便查看輸出
.subscribeOn(Schedulers.trampoline())
// 在發(fā)射 onError()事件執(zhí)行
.doOnError(e -> System.out.println("發(fā)生錯(cuò)誤:" + e.getMessage()))
.subscribe(System.out::println)
.dispose();
// 輸出
/*
1
2
...
發(fā)生錯(cuò)誤:Queue is full?!
*/
}
BackpressureStrategy.BUFFER
策略:將緩存區(qū)大小設(shè)置成無(wú)限大
@Test
public void bufferTest() {
Flowable.create((FlowableOnSubscribe
int count = 150;
// 發(fā)射 150 個(gè)數(shù)據(jù) 緩沖池接收數(shù)據(jù)大小為 128
for (int i = 1; i <= count; i++) {
emitter.onNext(i);
}
emitter.onComplete();
// 指定背壓策略為 BUFFER
}, BackpressureStrategy.BUFFER)
// Observer 在io線程運(yùn)行
.observeOn(Schedulers.io())
// Observable 在主線程運(yùn)行 為了阻塞主線程方便查看輸出
.subscribeOn(Schedulers.trampoline())
// 在發(fā)射 onError()事件執(zhí)行
.doOnError(e -> System.out.println("發(fā)生錯(cuò)誤:" + e.getMessage()))
.subscribe(System.out::println)
.dispose();
// 輸出 1-150 無(wú)異常 注:有興趣可以將發(fā)射的數(shù)據(jù)改成引用對(duì)象,一直發(fā)射觀察內(nèi)存情況,看看會(huì)不會(huì)內(nèi)存溢出
}
BackpressureStrategy.DROP
策略:超過(guò)緩存區(qū)大小的數(shù)據(jù)全部丟棄
@Test
public void dropTest() {
Flowable.create((FlowableOnSubscribe
int count = 150;
// 發(fā)射 150 個(gè)數(shù)據(jù) 緩沖池接收數(shù)據(jù)大小為 128
for (int i = 1; i <= count; i++) {
emitter.onNext(i);
}
emitter.onComplete();
// 指定背壓策略為 DROP
}, BackpressureStrategy.DROP)
// Observer 在io線程運(yùn)行
.observeOn(Schedulers.io())
// Observable 在主線程運(yùn)行 為了阻塞主線程方便查看輸出
.subscribeOn(Schedulers.trampoline())
// 在發(fā)射 onError()事件執(zhí)行
.doOnError(e -> System.out.println("發(fā)生錯(cuò)誤:" + e.getMessage()))
//.onBackpressureDrop()
.subscribe(System.out::println);
// 輸出 1-128 丟棄了129-150
}
BackpressureStrategy.LATEST
策略:超出緩存區(qū)數(shù)據(jù)只保留最新的
@Test
public void latestTest() {
Flowable.create((FlowableOnSubscribe
int count = 150;
// 發(fā)射 150 個(gè)數(shù)據(jù) 緩沖池接收數(shù)據(jù)大小為 128
for (int i = 1; i <= count; i++) {
emitter.onNext(i);
}
emitter.onComplete();
// 指定背壓策略為 LATEST
}, BackpressureStrategy.LATEST)
// Observer 在io線程運(yùn)行
.observeOn(Schedulers.io())
// Observable 在主線程運(yùn)行 為了阻塞主線程方便查看輸出
.subscribeOn(Schedulers.trampoline())
// 在發(fā)射 onError()事件執(zhí)行
.doOnError(e -> System.out.println("發(fā)生錯(cuò)誤:" + e.getMessage()))
.subscribe(System.out::println);
// 輸出 1-128, 150 注:丟棄超過(guò)緩沖池大小的數(shù)據(jù),但保留最新的一個(gè)
}
并行處理
@Test
public void schedulerParallelTest() throws InterruptedException {
Flowable.fromArray(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map(c -> {
System.out.println(Thread.currentThread().getName() + "發(fā)射: " + c);
// 作平方計(jì)算后發(fā)射
return c * c;
})
// 指定被觀察者發(fā)射數(shù)據(jù)的線程 線程的數(shù)量為cpu核數(shù)
.subscribeOn(Schedulers.computation())
.subscribe(next -> System.out.println(Thread.currentThread().getName() + "接收: " + next));
Thread.sleep(500);
/* 輸出如下
注:本測(cè)試計(jì)算機(jī)為8核cpu,被觀察發(fā)射數(shù)據(jù)的線程一直為同一個(gè),并不是并行
RxComputationThreadPool-1發(fā)射: 1
RxComputationThreadPool-1接收: 1
RxComputationThreadPool-1發(fā)射: 2
RxComputationThreadPool-1接收: 4
RxComputationThreadPool-1發(fā)射: 3
RxComputationThreadPool-1接收: 9
RxComputationThreadPool-1發(fā)射: 4
RxComputationThreadPool-1接收: 16
RxComputationThreadPool-1發(fā)射: 5
RxComputationThreadPool-1接收: 25
RxComputationThreadPool-1發(fā)射: 6
RxComputationThreadPool-1接收: 36
RxComputationThreadPool-1發(fā)射: 7
RxComputationThreadPool-1接收: 49
RxComputationThreadPool-1發(fā)射: 8
RxComputationThreadPool-1接收: 64
RxComputationThreadPool-1發(fā)射: 9
RxComputationThreadPool-1接收: 81
RxComputationThreadPool-1發(fā)射: 10
RxComputationThreadPool-1接收: 100
*/
}
初學(xué)時(shí)很容易將上述編碼理解為:只要指定了調(diào)度器(甚至不用指定調(diào)度器,只要發(fā)射了多條數(shù)據(jù)),RxJ就會(huì)將每條數(shù)據(jù)交給不同的線程處理。從輸出可以很明顯的看出,被觀察者和觀察者都在 RxComputationThreadPool-1 這個(gè)線程運(yùn)行,整個(gè)發(fā)射、接收數(shù)據(jù)過(guò)程確實(shí)是異步的,不過(guò)只是相對(duì)于主線程而言,其發(fā)射的一組數(shù)據(jù)流本質(zhì)還是串行執(zhí)行的。
@Test
public void flatMapParallel() throws InterruptedException {
// 發(fā)射1-10
Flowable.range(1, 10)
// flatMap合并單獨(dú)的數(shù)據(jù)流到“數(shù)據(jù)流集合”
.flatMap(v ->
// 每條數(shù)據(jù)都轉(zhuǎn)換為單獨(dú)的數(shù)據(jù)流
Flowable.just(v)
// 指定每條數(shù)據(jù)流的運(yùn)行調(diào)度器
.subscribeOn(Schedulers.computation())
.map(c -> {
System.out.println(Thread.currentThread().getName() + "發(fā)射: " + c);
return c * c;
})
)
.blockingSubscribe(next -> System.out.println(Thread.currentThread().getName() + "接收: " + next));
Thread.sleep(500);
/*輸出如下:可以看到 發(fā)射數(shù)據(jù)的線程已經(jīng)是多線程并行了(8核cpu)
RxComputationThreadPool-1發(fā)射: 1
RxComputationThreadPool-2發(fā)射: 2
RxComputationThreadPool-3發(fā)射: 3
RxComputationThreadPool-5發(fā)射: 5
RxComputationThreadPool-6發(fā)射: 6
RxComputationThreadPool-4發(fā)射: 4
RxComputationThreadPool-2發(fā)射: 8
RxComputationThreadPool-1發(fā)射: 7
RxComputationThreadPool-4發(fā)射: 10
main接收: 1
main接收: 9
main接收: 25
main接收: 36
main接收: 4
main接收: 64
main接收: 49
main接收: 16
RxComputationThreadPool-3發(fā)射: 9
main接收: 100
main接收: 81
*/
}
RxJ中的并行性意味著運(yùn)行獨(dú)立的流,并將它們的結(jié)果合并回單個(gè)流中。操作符 flatMap 將從1到10的每個(gè)數(shù)字映射到它自己的獨(dú)立的 Flowable 并合并流,每次發(fā)射的數(shù)據(jù)轉(zhuǎn)換為每次發(fā)射 Flowable,實(shí)現(xiàn)并行計(jì)算。
小結(jié)
RxJ通過(guò)各種操作符讓異步調(diào)用和線程切換變得比較簡(jiǎn)潔,但是付出的代價(jià)一旦引入RxJ那么所有看這份代碼的人都要去學(xué)習(xí),是否值得引入是值得商榷的。
參考鏈接
RxJava3.x Github官方文檔 Reactivex官方文檔 Reactive Programming 一種技術(shù),各自表述 RxJava2最全面、最詳細(xì)的講解(一) RxJava2最全面、最詳細(xì)的講解(二) 圖解RxJava2(一) 圖解RxJava2(二) 圖解RxJava2(三) RxJava現(xiàn)學(xué)現(xiàn)用下(應(yīng)用場(chǎng)景) RxJava操作符匯總 RxJava操作符(03-變換操作) RxJava3.x——背壓策略
柚子快報(bào)邀請(qǐng)碼778899分享:RxJava響應(yīng)式編程
參考閱讀
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點(diǎn)和立場(chǎng)。
轉(zhuǎn)載請(qǐng)注明,如有侵權(quán),聯(lián)系刪除。