欧美free性护士vide0shd,老熟女,一区二区三区,久久久久夜夜夜精品国产,久久久久久综合网天天,欧美成人护士h版

首頁綜合 正文
目錄

柚子快報邀請碼778899分享:關于rxjava理解

柚子快報邀請碼778899分享:關于rxjava理解

http://yzkb.51969.com/

簡單介紹

rxjava中有Flowable、Maybe、Observable、Single、Completable 5種使用途徑,其中Flowable實現(xiàn)reactivestreams接口,是jdk9標準接口,支持背壓、并發(fā)模式。 反應式編程特點就是在訂閱時才執(zhí)行具體業(yè)務代碼。

接口介紹

Publisher消息提供發(fā)布者,提供訂閱方法,發(fā)布者定義好后,消費者通過訂閱通知發(fā)布者可以準備消息了。

public interface Publisher {

public void subscribe(Subscriber s);

}

Subscriber消息訂閱者,提供4個方法

public interface Subscriber {

/**

* 發(fā)布者可通過該方法,通知下游訂閱者收到訂閱了,可以通過Subscription 取消或者請求消息。

*/

public void onSubscribe(Subscription s);

/**

* 發(fā)布者被訂閱或被Subscription請求時,觸發(fā)訂閱者的onNext方法,發(fā)布消息。

*/

public void onNext(T t);

/**

* 執(zhí)行業(yè)務過程中發(fā)生異常,通知消費者,一般發(fā)生異常會中斷后續(xù)onNext流程

*/

public void onError(Throwable t);

/**

* 通知下游業(yè)務執(zhí)行完成

*/

public void onComplete();

}

Subscription提供reqeust和cancel 2個方法

public interface Subscription {

/**

* 讓上游發(fā)布消息

*/

public void request(long n);

/**

* 取消消息推送

*/

public void cancel();

}

Processor繼承Subscriber、Publisher。

Flowable遞歸調用

便于鏈式調用,所有Flowable相關的類繼承Flowable,實現(xiàn)subscribeActual,在Flowable中融合所有類的調度方法。

protected abstract void subscribeActual(@NonNull Subscriber<@NonNull ? super T> subscriber);

鏈式調用結構

左邊先調用的發(fā)布者在最里面,右邊訂閱者先調用的在最外面,因為先執(zhí)行構造定義,在通過訂閱串聯(lián)所有發(fā)布者。 Rx多數(shù)代碼的發(fā)布者和訂閱者在同一個類中實現(xiàn),發(fā)布者被訂閱(subscribe)時,通過包裹訂閱者接收上游的onSubscribe,可實現(xiàn)并發(fā)、緩存、重試、條件判斷等功能。這樣就構成,先鏈式調用的發(fā)布者,后被訂閱,先收到onSubscribe。

部分代碼理解

假設都是鏈式調用,先被調用的方法(發(fā)布者方法)代表前面,后被調用的方法代表后面。發(fā)布者稱為上游,訂閱者稱為下游。

線程切換

需要區(qū)分Flowable和ParallelFlowable,前者用于單線程,后者用于多線程,多線程和單線程有方法可以動態(tài)切換。

Flowable的subscribeOn和observeOn

subscribeOn 表示,首次被調用起到最上游的request請求和所有訂閱者的onNext方法都由該線程執(zhí)行。 構造函數(shù)的入?yún)ource為先鏈式調用的上游。 切換線程后執(zhí)行上游的訂閱,觸發(fā)上游調用onSubscribe、onNext等方法。如果上游不自動推送消息下來,在上游調用onSubscribe時,觸發(fā)上游的reqeuest,使上游推送消息到該類。 onNext直接轉發(fā)數(shù)據(jù)。

observeOn 表示,從被調用起后續(xù)所有的onNext方法都由該線程執(zhí)行。

總結 從上面可以看出,subscribeOn不論在哪里調用,可以作用于所有訂閱者,observeOn只能作用于后面的訂閱者。

ParallelFlowable的runOn 和subscribeOn類似,只是訂閱者變成多條。

多線程

Flowable使用方法parallel()切換到ParallelFlowable,而ParallelFlowable使用方法serialize()切換到Flowable。 其中parallel可以指定并行度,在runOn后可并發(fā)調用parallel的subscribe,serialize收集多個訂閱者的數(shù)據(jù)單線程推送給下游。

parallel()

serialize()

flatmap和map區(qū)別

flatmap用于獲取一堆數(shù)據(jù),類似一對多的關系,map用于數(shù)據(jù)轉換,類似從一對一的關系。

示例代碼

代碼中FlowableBufferedCreate根據(jù)FlowableCreate改寫的,把queue替換成blockingqueue,用于緩存滿的時候堵塞上游數(shù)據(jù)推送,避免堆積到內存里面。 ParallelListenableFuture用于當ListenableFuture調用完成后推送給下游。

// 1. 多線程查詢

ParallelFlowable queryParallelFlowable = Flowable.range(0, queryThreadSize)

.parallel(2)

// 切換到查詢線程

.runOn(Schedulers.from(msgpushQueryTaskExecutor, true))

.flatMap((index) -> new FlowableBufferedCreate(emitter -> {

Integer querySize = queryThreadSize;

Integer queryIndex = index;

if (queryThreadSize == 1) {

querySize = null;

queryIndex = null;

}

// 調用查詢

msgPushBO.query(shardId, null, queryIndex, querySize, (target) -> {

emitter.onNext(target);

redissionThreadLockTask.access();

}, metricRegistry);

// 觸發(fā)完成

emitter.onComplete();

}, 256)

);

Flowable queryFlowable = queryParallelFlowable

// 并行轉單行且發(fā)生異常后,延遲發(fā)送錯誤信息,一個分支錯誤,不影響其他分支繼續(xù)查詢

.sequentialDelayError();

// 2.多線程推送

ParallelFlowable handleParallelFlowable = queryFlowable

// 推送是nio,這里不設置并發(fā)數(shù)量了

.parallel()

// 切換到推送線程

.runOn(Schedulers.from(msgpushHandleTaskExecutor, false))

.map((target) -> {

redissionThreadLockTask.access();

// 推送

ListenableFuture listenableFuture = msgPushBO.handleNioExecute(shardId, null, target, metricRegistry);

return new MsgListenableFutureMap(listenableFuture, target);

});

// future數(shù)據(jù)提取,推送線程future異常屏蔽

ParallelListenableFuture>

parallelListenableFuture = new ParallelListenableFuture(handleParallelFlowable);

// 推送結果組裝成list

Flowable>> handleFlowable = parallelListenableFuture

.sequential()

.serialize()

.buffer(100, () -> new ArrayList<>());

final ListenableFutureDisposable listenableFutureDisposable = new ListenableFutureDisposable();

// 3. 數(shù)據(jù)更新

Disposable disposable = handleFlowable

// 允許多線程更新

.parallel(4, 1)

.runOn(Schedulers.from(msgpushUpdateTaskExecutor, false))

.sequential()

.subscribe((result) -> {

redissionThreadLockTask.access();

msgPushBO.handleAfter(result, metricRegistry);

String msg = "分片ID【" + shardId + "】,當前已處理數(shù)據(jù):" + parallelListenableFuture.getFutureDoneSize() + "條!";

listenableFutureCallback.info(msg);

}, (error) -> {

redissionThreadLockTask.realease();

listenableFutureDisposable.setDone();

listenableFutureCallback.info("分片ID【" + shardId + "】執(zhí)行異常:" + ThrowableUtil.stackTraceFullToString(error));

}, () -> {

redissionThreadLockTask.realease();

listenableFutureDisposable.setDone();

String msg = "分片ID【" + shardId + "】執(zhí)行成功,處理數(shù)據(jù):" + parallelListenableFuture.getFutureDoneSize() + "條!"

+ "\r\n執(zhí)行效率:\r\n" + report(metricRegistry);

listenableFutureCallback.info(msg);

});

listenableFutureDisposable.setDisposable(disposable);

return listenableFutureDisposable;

總結

rxjava解決的是業(yè)務存在耗時操作導致整個請求堵塞,可以通過線程切分,耗時操作放入單獨的線程中執(zhí)行,對于webflux結合nio請求可以節(jié)約線程數(shù)量,提供系統(tǒng)吞吐率。對于后端業(yè)務,為耗時操作提供異步解決方法,框架封裝很多常用操作,提高開發(fā)效率。rxjava其實在Android中用得廣泛,可以讓頁面刷新和耗時業(yè)務分開,避免界面假死。 存在問題 不能根據(jù)下游吞吐量自動調整推送線程數(shù)量,只能根據(jù)經(jīng)驗或測試后,根據(jù)結果手動調整。下游如果能在不降低單線程效率情況下,提高推送線程數(shù)量提供吞吐量,上游可以嘗試自動感知嘗試。

柚子快報邀請碼778899分享:關于rxjava理解

http://yzkb.51969.com/

相關閱讀

評論可見,查看隱藏內容

本文內容根據(jù)網(wǎng)絡資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。

轉載請注明,如有侵權,聯(lián)系刪除。

本文鏈接:http://gantiao.com.cn/post/18921547.html

發(fā)布評論

您暫未設置收款碼

請在主題配置——文章設置里上傳

掃描二維碼手機訪問

文章目錄