RxJava如何結合觀察者與鏈式處理

RxJava如何結合觀察者與鏈式處理

Author: Dorae
Date: 2018年12月3日17:10:31
轉載請註明出處


一、概述

首先問自己幾個問題,如果非常清楚這幾個問題的目的與答案,那麼恭喜你,不用繼續往下看了-_-。

  1. RxJava是幹什麼的;
  2. 鏈式調用中當存在數個Observable.subscribeOn()時的處理方式;
  3. 鏈式調用中當存在數個Observable.observeOn()時的處理方式;
  4. 數據是如何經過操作符進行的處理。

回顧

觀察者模式

如圖1-1所示

圖 1-1

Java8的stream

參見這裏

二、RxJava是什麼

一款為了簡化異步調用,且功能比較全面的框架。

三、RxJava如何結合了觀察者模式與鏈式處理

參見Java8中的sink鏈,在RxJava中同樣實現了鏈式處理。如代碼片段code1-1所示,我們對其結構進行分析:

code1-1

Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("Dorae");
        }
    })
    .filter(e -> e.contains("o"))
    .map(e -> "AfterMap: " + e)
    .filter(e -> e.contains("D"))
    .subscribe(new Observer<String>() {

                @Override
                public void onNext(@NonNull String o) {
                    System.out.println("觀察者 onNext: " + o);
                }

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("觀察者onSubscribe: " + d + "### " + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {

                }
            });
            

1、首先看下其輸出結果:

觀察者onSubscribe: io.reactive[email protected]52d455b8### main

觀察者 onNext: AfterMap: Dorae

2、現在來對如何輸出這段結果進行分析

首先了解下RxJava中的幾種基本角色:

  • Observable,是RxJava描述的事件流,可以説其與Observer構成了RxJava的基礎。在鏈式調用中,事件從創建到加工到最後被Observer接受,其實就是由一條Observerable鏈串起來的。
  • Observer,RxJava中的訂閲者,也就是需要對事件進行響應的一個角色。其實除了我們通常自己實現的一個Observer,在鏈中的每一步都會產生一個Observer,然後這些Observer構成一條鏈,最終完成整個鏈的計算。
  • ObservableOnSubscribe,整個事件流的源頭,通常需要我們自己實現,其依賴一個Emitter。
  • Emitter,可以將其理解為觸發器,推動整個流程的運轉。
  • Scheduler,這個其實不用太過關心,RxJava用其封裝了Thread,用於完成線程切換等任務。

是不是感覺上邊的一堆廢話非常枯燥?先上一張RxJava的核心結構,如圖3-1所示。

圖 3-1

現在我們再來看看code1-1,其最終形成的Observable鏈如圖3-2所示,每次調用map、filter等操作,都會生成一個新對象,並且保持了一個對上游的引用(用於生成Observer鏈)。

圖 3-2

Observer鏈如圖3-3所示,整個事件流程由CreateEmitter觸發,最終交由我們的實現Observer$1處理。

圖 3-3

看了上邊幾張圖之後,是不是感覺清晰了很多?那麼讓我們進一步看下Rxjava如何完成了一鍵線程切換。

四、RxJava如何實現線程切換

通常我們使用RxJava的線程切換功能時,只需要在調用鏈中加上一句subscribeOn()或observeOn(),其中Scheduler如上所述,其實就是一個包裝了ThreadPool的調度器。那麼我們先來看下相關源碼。

1、subscribeOn

如代碼code4-1所示,為subscribeOn的核心代碼。很明顯,其中在新線程中只是簡單的直接調用了source,也就是説這裏之後的所有操作均在一個新線程中進行,和單線程並沒有什麼區別。

code 4-1

public final Observable<T> subscribeOn(Scheduler scheduler) {
    return new ObservableSubscribeOn<T>() {
        @Override
        public void subscribeActual(final Observer<? super T> observer) {
            scheduler.createWorker().schedule(new SubscribeTask() {
                @Override
                public void run() {
                    source.subscribe(e);
                }
            });
        }
    };
}

2、observeOn

如代碼段code4-2所示,為observeOn的核心邏輯,可以看出其在訂閲階段(生成Observer鏈的階段)還是在當前線程執行,只有觸發之後,到了ObserverOn的Observer的節點時才會真正的切換到新線程。

code 4-2

public final Observable<T> observeOn(Scheduler scheduler) {
    return new ObservableOnSubscribe<T>() {
        @Override
        public void subscribeActual(@NonNull Observer<Object> e) { 
            source.subscribe(new Observer<T>() {

                @Override
                public void onNext(T var1) {
                    scheduler.createWorker().schedule(new Runnable() {
                        @Override
                        public void run() {
                            e.onNext(var1);
                        }
                    });
                }
            });
        }
    };
}

多次Observable.subscribeOn()、多次Observable.observeOn()會發生什麼

通過上述code4-1、code4-2的分析,是不是可以推斷出當多次subscribeOn時會發生什麼?沒錯,雖然每次subscribeOn都會產生一次線程切換,但是真正起作用的只有最開始的一次subscribeOn,也就相當於只在最初的位置調用了subscribeOn;對於observeOn也是類似,每次都會產生新線程,但是每次都會產生一定的影響,也就是每個線程都承擔了一部分工作。

小結

通過本文,我們可以簡要了解到RxJava的基本原理,但是對於其豐富的api還需要在實踐中進行磨合。但是,RxJava既然作為一個異步框架,其必然有一定的侷限,比如其切換線程時無法阻塞當前線程(這種對於Android等需要渲染或者網絡IO的需求來説非常適用),但是對於常見的服務端業務來説,還需要額外引入阻塞當前線程的操作(因為大部分的server代碼還是單線程模型),倘若完全不用線程切換在服務端強行引入,可能會得不償失。個人更推薦Java8的CompletableFuture。

參考

理解RxJava(一)基本流程源碼分析

RxJava基本原理分析

關鍵詞:rxjava observer public 觀察 處理 override subscribeon void observable 如何

相關推薦:

RxJava2源碼解析(二)

八個層面比較 Java 8, RxJava, Reactor

rxjava2.x源碼學習隨筆

5 reasons to use RxJava in your projects

RxJava 2.x 源碼分析

深入理解 RxJava2:揭祕 subscribeOn(3)

rxjava2——線程切換

5 Reasons to Use RxJava in Your Projects

Android異步框架RxJava 1.x系列(三) - 線程調度器Scheduler

詳解 RxJava 的消息訂閲和線程切換原理