- Learning RxJava
- Thomas Nield
- 392字
- 2021-07-02 22:22:52
Implementing and subscribing to an Observer
When you call the subscribe() method on an Observable, an Observer is used to consume these three events by implementing its methods. Instead of specifying lambda arguments like we were doing earlier, we can implement an Observer and pass an instance of it to the subscribe() method. Do not bother yourself about onSubscribe() at the moment. Just leave its implementation empty until we discuss it at the end of this chapter:
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class Launcher {
public static void main(String[] args) {
Observable<String> source =
Observable.just("Alpha", "Beta", "Gamma", "Delta",
"Epsilon");
Observer<Integer> myObserver = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
//do nothing with Disposable, disregard for now
}
@Override
public void onNext(Integer value) {
System.out.println("RECEIVED: " + value);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
};
source.map(String::length).filter(i -> i >= 5)
.subscribe(myObserver);
}
}
The output is as follows:
RECEIVED: 5
RECEIVED: 5
RECEIVED: 5
RECEIVED: 7
Done!
We quickly create an Observer<Integer> that serves as our Observer, and it will receive integer length emissions. Our Observer receives emissions at the end of an Observable chain and serves as the endpoint where the emissions are consumed. By consumed, this means they reach the end of the process where they are written to a database, text file, a server response, displayed in a UI, or (in this case) just printed to the console.
To further explain this example in detail, we start with string emissions at our source. We declare our Observer in advance and pass it to the subscribe() method at the end of our Observable chain. Note that each string is transformed to its length. The onNext() method receives each integer length emission and prints it using System.out.println("RECEIVED: " + value). We will not get any errors running this simple process, but if one did occur anywhere in our Observable chain, it will be pushed to our onError() implementation on Observer, where the stack trace of Throwable will be printed. Finally, when the source has no more emissions (after pushing "Epsilon"), it will call onComplete() up the chain all the way to the Observer, where its onComplete() method will be called and print Done! to the console.
- C語(yǔ)言程序設(shè)計(jì)案例教程(第2版)
- Web Scraping with Python
- JavaFX Essentials
- 數(shù)據(jù)結(jié)構(gòu)簡(jiǎn)明教程(第2版)微課版
- Java性能權(quán)威指南(第2版)
- GameMaker Programming By Example
- 劍指MySQL:架構(gòu)、調(diào)優(yōu)與運(yùn)維
- Mastering Drupal 8 Views
- Keras深度學(xué)習(xí)實(shí)戰(zhàn)
- 軟件體系結(jié)構(gòu)
- 計(jì)算機(jī)應(yīng)用技能實(shí)訓(xùn)教程
- scikit-learn Cookbook(Second Edition)
- Oracle Database XE 11gR2 Jump Start Guide
- Clojure Web Development Essentials
- Internet of Things with Arduino Cookbook