柚子快報邀請碼778899分享:關于rxjava理解
柚子快報邀請碼778899分享:關于rxjava理解
簡單介紹
rxjava中有Flowable、Maybe、Observable、Single、Completable 5種使用途徑,其中Flowable實現(xiàn)reactivestreams接口,是jdk9標準接口,支持背壓、并發(fā)模式。 反應式編程特點就是在訂閱時才執(zhí)行具體業(yè)務代碼。
接口介紹
Publisher消息提供發(fā)布者,提供訂閱方法,發(fā)布者定義好后,消費者通過訂閱通知發(fā)布者可以準備消息了。
public interface Publisher
public void subscribe(Subscriber super T> s);
}
Subscriber消息訂閱者,提供4個方法
public interface Subscriber
/**
* 發(fā)布者可通過該方法,通知下游訂閱者收到訂閱了,可以通過Subscription 取消或者請求消息。
*/
public void onSubscribe(Subscription s);
/**
* 發(fā)布者被訂閱或被Subscription請求時,觸發(fā)訂閱者的onNext方法,發(fā)布消息。
*/
public void onNext(T t);
/**
* 執(zhí)行業(yè)務過程中發(fā)生異常,通知消費者,一般發(fā)生異常會中斷后續(xù)onNext流程
*/
public void onError(Throwable t);
/**
* 通知下游業(yè)務執(zhí)行完成
*/
public void onComplete();
}
Subscription提供reqeust和cancel 2個方法
public interface Subscription {
/**
* 讓上游發(fā)布消息
*/
public void request(long n);
/**
* 取消消息推送
*/
public void cancel();
}
Processor繼承Subscriber、Publisher。
Flowable遞歸調用
便于鏈式調用,所有Flowable相關的類繼承Flowable,實現(xiàn)subscribeActual,在Flowable中融合所有類的調度方法。
protected abstract void subscribeActual(@NonNull Subscriber<@NonNull ? super T> subscriber);
鏈式調用結構
左邊先調用的發(fā)布者在最里面,右邊訂閱者先調用的在最外面,因為先執(zhí)行構造定義,在通過訂閱串聯(lián)所有發(fā)布者。 Rx多數(shù)代碼的發(fā)布者和訂閱者在同一個類中實現(xiàn),發(fā)布者被訂閱(subscribe)時,通過包裹訂閱者接收上游的onSubscribe,可實現(xiàn)并發(fā)、緩存、重試、條件判斷等功能。這樣就構成,先鏈式調用的發(fā)布者,后被訂閱,先收到onSubscribe。
部分代碼理解
假設都是鏈式調用,先被調用的方法(發(fā)布者方法)代表前面,后被調用的方法代表后面。發(fā)布者稱為上游,訂閱者稱為下游。
線程切換
需要區(qū)分Flowable和ParallelFlowable,前者用于單線程,后者用于多線程,多線程和單線程有方法可以動態(tài)切換。
Flowable的subscribeOn和observeOn
subscribeOn 表示,首次被調用起到最上游的request請求和所有訂閱者的onNext方法都由該線程執(zhí)行。 構造函數(shù)的入?yún)ource為先鏈式調用的上游。 切換線程后執(zhí)行上游的訂閱,觸發(fā)上游調用onSubscribe、onNext等方法。如果上游不自動推送消息下來,在上游調用onSubscribe時,觸發(fā)上游的reqeuest,使上游推送消息到該類。 onNext直接轉發(fā)數(shù)據(jù)。
observeOn 表示,從被調用起后續(xù)所有的onNext方法都由該線程執(zhí)行。
總結 從上面可以看出,subscribeOn不論在哪里調用,可以作用于所有訂閱者,observeOn只能作用于后面的訂閱者。
ParallelFlowable的runOn 和subscribeOn類似,只是訂閱者變成多條。
多線程
Flowable使用方法parallel()切換到ParallelFlowable,而ParallelFlowable使用方法serialize()切換到Flowable。 其中parallel可以指定并行度,在runOn后可并發(fā)調用parallel的subscribe,serialize收集多個訂閱者的數(shù)據(jù)單線程推送給下游。
parallel()
serialize()
flatmap和map區(qū)別
flatmap用于獲取一堆數(shù)據(jù),類似一對多的關系,map用于數(shù)據(jù)轉換,類似從一對一的關系。
示例代碼
代碼中FlowableBufferedCreate根據(jù)FlowableCreate改寫的,把queue替換成blockingqueue,用于緩存滿的時候堵塞上游數(shù)據(jù)推送,避免堆積到內存里面。 ParallelListenableFuture用于當ListenableFuture調用完成后推送給下游。
// 1. 多線程查詢
ParallelFlowable
.parallel(2)
// 切換到查詢線程
.runOn(Schedulers.from(msgpushQueryTaskExecutor, true))
.flatMap((index) -> new FlowableBufferedCreate(emitter -> {
Integer querySize = queryThreadSize;
Integer queryIndex = index;
if (queryThreadSize == 1) {
querySize = null;
queryIndex = null;
}
// 調用查詢
msgPushBO.query(shardId, null, queryIndex, querySize, (target) -> {
emitter.onNext(target);
redissionThreadLockTask.access();
}, metricRegistry);
// 觸發(fā)完成
emitter.onComplete();
}, 256)
);
Flowable
// 并行轉單行且發(fā)生異常后,延遲發(fā)送錯誤信息,一個分支錯誤,不影響其他分支繼續(xù)查詢
.sequentialDelayError();
// 2.多線程推送
ParallelFlowable
// 推送是nio,這里不設置并發(fā)數(shù)量了
.parallel()
// 切換到推送線程
.runOn(Schedulers.from(msgpushHandleTaskExecutor, false))
.map((target) -> {
redissionThreadLockTask.access();
// 推送
ListenableFuture
return new MsgListenableFutureMap(listenableFuture, target);
});
// future數(shù)據(jù)提取,推送線程future異常屏蔽
ParallelListenableFuture
parallelListenableFuture = new ParallelListenableFuture(handleParallelFlowable);
// 推送結果組裝成list
Flowable>> handleFlowable = parallelListenableFuture
.sequential()
.serialize()
.buffer(100, () -> new ArrayList<>());
final ListenableFutureDisposable listenableFutureDisposable = new ListenableFutureDisposable();
// 3. 數(shù)據(jù)更新
Disposable disposable = handleFlowable
// 允許多線程更新
.parallel(4, 1)
.runOn(Schedulers.from(msgpushUpdateTaskExecutor, false))
.sequential()
.subscribe((result) -> {
redissionThreadLockTask.access();
msgPushBO.handleAfter(result, metricRegistry);
String msg = "分片ID【" + shardId + "】,當前已處理數(shù)據(jù):" + parallelListenableFuture.getFutureDoneSize() + "條!";
listenableFutureCallback.info(msg);
}, (error) -> {
redissionThreadLockTask.realease();
listenableFutureDisposable.setDone();
listenableFutureCallback.info("分片ID【" + shardId + "】執(zhí)行異常:" + ThrowableUtil.stackTraceFullToString(error));
}, () -> {
redissionThreadLockTask.realease();
listenableFutureDisposable.setDone();
String msg = "分片ID【" + shardId + "】執(zhí)行成功,處理數(shù)據(jù):" + parallelListenableFuture.getFutureDoneSize() + "條!"
+ "\r\n執(zhí)行效率:\r\n" + report(metricRegistry);
listenableFutureCallback.info(msg);
});
listenableFutureDisposable.setDisposable(disposable);
return listenableFutureDisposable;
總結
rxjava解決的是業(yè)務存在耗時操作導致整個請求堵塞,可以通過線程切分,耗時操作放入單獨的線程中執(zhí)行,對于webflux結合nio請求可以節(jié)約線程數(shù)量,提供系統(tǒng)吞吐率。對于后端業(yè)務,為耗時操作提供異步解決方法,框架封裝很多常用操作,提高開發(fā)效率。rxjava其實在Android中用得廣泛,可以讓頁面刷新和耗時業(yè)務分開,避免界面假死。 存在問題 不能根據(jù)下游吞吐量自動調整推送線程數(shù)量,只能根據(jù)經(jīng)驗或測試后,根據(jù)結果手動調整。下游如果能在不降低單線程效率情況下,提高推送線程數(shù)量提供吞吐量,上游可以嘗試自動感知嘗試。
柚子快報邀請碼778899分享:關于rxjava理解
相關閱讀
本文內容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉載請注明,如有侵權,聯(lián)系刪除。