
  • Learning RxJava
  • Thomas Nield
  • 2021-07-02 22:22:52

Shorthand Observers with lambdas

Implementing an Observer is a bit verbose and cumbersome. Thankfully, the subscribe() method is overloaded to accept lambda arguments for our three events. This is likely what we will want to use for most cases, and we can specify three lambda parameters separated by commas: the onNext lambda, the onError lambda, and the onComplete lambda. For our previous example, we can consolidate our three method implementations using these three lambdas:

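    import io.reactivex.functions.Action;
    import io.reactivex.functions.Consumer;

    // RxJava's own functional types, not java.util.function.Consumer
    Consumer<Integer> onNext = i -> System.out.println("RECEIVED: " + i);
    Action onComplete = () -> System.out.println("Done!");
    Consumer<Throwable> onError = Throwable::printStackTrace;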

We can pass these three lambdas as arguments to the subscribe() method, and it will use them to implement an Observer for us. This is much more concise and requires far less boilerplate code:

    import io.reactivex.Observable;

    public class Launcher {

        public static void main(String[] args) {

            Observable<String> source =
                    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");

            source.map(String::length)
                    .filter(i -> i >= 5)
                    .subscribe(i -> System.out.println("RECEIVED: " + i),
                            Throwable::printStackTrace,
                            () -> System.out.println("Done!"));
        }
    }

The output is as follows:

    RECEIVED: 5
    RECEIVED: 5
    RECEIVED: 5
    RECEIVED: 7
    Done!
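To make the idea of subscribe() "implementing an Observer for us" more concrete, here is a minimal sketch in plain Java of how three lambdas could be wrapped into a single observer object. It is not RxJava's actual implementation: SimpleObserver, fromLambdas(), and demo() are hypothetical names invented for this illustration, and java.util.function.Consumer and Runnable stand in for RxJava's Consumer and Action so that the sketch runs without the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LambdaObserverSketch {

    // Hypothetical, simplified stand-in for RxJava's Observer interface.
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    // Builds an observer whose three methods simply delegate to the supplied lambdas.
    static <T> SimpleObserver<T> fromLambdas(Consumer<T> onNext,
                                             Consumer<Throwable> onError,
                                             Runnable onComplete) {
        return new SimpleObserver<T>() {
            @Override public void onNext(T value) { onNext.accept(value); }
            @Override public void onError(Throwable e) { onError.accept(e); }
            @Override public void onComplete() { onComplete.run(); }
        };
    }

    // Pushes the lengths of the five strings (keeping those >= 5) to the observer
    // by hand, then signals completion, and returns everything the observer recorded.
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        SimpleObserver<Integer> observer = fromLambdas(
                i -> log.add("RECEIVED: " + i),
                e -> log.add("ERROR: " + e),
                () -> log.add("Done!"));

        for (String s : new String[] {"Alpha", "Beta", "Gamma", "Delta", "Epsilon"}) {
            int length = s.length();
            if (length >= 5) {
                observer.onNext(length);
            }
        }
        observer.onComplete();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running main() prints the same five lines as the RxJava example. The point of the sketch is only that each lambda ends up as the body of one Observer method, which is why the lambda form can replace a full Observer implementation.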

Note that there are other overloads for subscribe(). You can omit onComplete() and only implement onNext() and onError(). This will no longer perform any action for onComplete(), but there will likely be cases where you do not need one:

    import io.reactivex.Observable;

    public class Launcher {

        public static void main(String[] args) {

            Observable<String> source =
                    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");

            source.map(String::length)
                    .filter(i -> i >= 5)
                    .subscribe(i -> System.out.println("RECEIVED: " + i),
                            Throwable::printStackTrace);
        }
    }

The output is as follows:

    RECEIVED: 5
    RECEIVED: 5
    RECEIVED: 5
    RECEIVED: 7

As you have seen in earlier examples, you can even omit onError and just specify onNext:

    import io.reactivex.Observable;

    public class Launcher {

        public static void main(String[] args) {

            Observable<String> source =
                    Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon");

            source.map(String::length)
                    .filter(i -> i >= 5)
                    .subscribe(i -> System.out.println("RECEIVED: " + i));
        }
    }

However, not implementing onError() is something you want to avoid doing in production. An error that occurs anywhere in the Observable chain is propagated to onError() to be handled, and the Observable then terminates with no further emissions. If you do not specify an action for onError(), the error will go unhandled.
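The terminal nature of onError() can be illustrated with a small plain-Java model. This is a sketch under stated assumptions rather than RxJava code: emit() and demo() are hypothetical helpers written for this illustration, and they mimic roughly what a chain such as Observable.just(5, 2, 0, 4).map(i -> 10 / i) would do when the division by zero fails partway through.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ErrorTerminationSketch {

    // Hypothetical helper: maps each item and hands the result to onNext.
    // The first failure is routed to onError and ends the sequence, so
    // neither later items nor onComplete are delivered after an error.
    static void emit(List<Integer> items,
                     Function<Integer, Integer> mapper,
                     Consumer<Integer> onNext,
                     Consumer<Throwable> onError,
                     Runnable onComplete) {
        for (Integer item : items) {
            try {
                onNext.accept(mapper.apply(item));
            } catch (Throwable e) {
                onError.accept(e);
                return; // an error is a terminal event
            }
        }
        onComplete.run();
    }

    // Divides 10 by each item; the third item (0) triggers an ArithmeticException.
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        emit(Arrays.asList(5, 2, 0, 4),
                i -> 10 / i,
                i -> log.add("RECEIVED: " + i),
                e -> log.add("ERROR: " + e.getClass().getSimpleName()),
                () -> log.add("Done!"));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the log ends with the error entry: the final item (4) is never processed and "Done!" is never recorded. Supplying an onError action is what gives you a place to react to that failure instead of leaving it unhandled.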

You can use retry() operators to attempt recovery and resubscribe to an Observable if an error occurs. We will cover how to do that in the next chapter.

It is important to note that most of the subscribe() overloads (including the shorthand lambda ones we just covered) return a Disposable, which we have not done anything with so far. Disposables allow us to disconnect an Observable from an Observer so that emissions are terminated early, which is critical for infinite or long-running Observables. We will cover disposables at the end of this chapter.
