- Learning RxJava
- Thomas Nield
- 2021-07-02 22:22:55
Handling a Disposable within an Observer
Earlier, I shied away from talking about the onSubscribe() method in the Observer, but now we will address it. You may have noticed that a Disposable is passed to an Observer implementation through its onSubscribe() method. This method was added in RxJava 2.0, and it lets the Observer dispose of the subscription at any time.
For instance, you can implement your own Observer and hold on to the Disposable so that onNext(), onComplete(), and onError() all have access to it. Any of these three events can then call dispose() if, for whatever reason, the Observer does not want any more emissions:
Observer<Integer> myObserver = new Observer<Integer>() {
    private Disposable disposable;

    @Override
    public void onSubscribe(Disposable disposable) {
        this.disposable = disposable;
    }

    @Override
    public void onNext(Integer value) {
        // has access to Disposable
    }

    @Override
    public void onError(Throwable e) {
        // has access to Disposable
    }

    @Override
    public void onComplete() {
        // has access to Disposable
    }
};
The Disposable is sent from the source all the way down the chain to the Observer, so each step in the Observable chain has access to the Disposable.
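As a concrete case of an Observer cancelling its own subscription, here is a minimal sketch that keeps the Disposable from onSubscribe() and calls dispose() from onNext() once it has seen enough values. The class name SelfDisposingObserverDemo, the helper takeThenDispose(), and the choice of Observable.range(1, 10) as the source are assumptions made for illustration only:

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;

public class SelfDisposingObserverDemo {

    // Hypothetical helper: subscribes to the integers 1..10 and disposes
    // the subscription once `limit` values have been received.
    public static List<Integer> takeThenDispose(int limit) {
        List<Integer> received = new ArrayList<>();

        Observer<Integer> observer = new Observer<Integer>() {
            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable disposable) {
                // keep the Disposable so the event methods can use it
                this.disposable = disposable;
            }

            @Override
            public void onNext(Integer value) {
                received.add(value);
                if (received.size() >= limit) {
                    // the Observer wants no more emissions
                    disposable.dispose();
                }
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                // only reached if the source finishes before we dispose
                System.out.println("Done!");
            }
        };

        // range() emits synchronously, so the list is filled when subscribe() returns
        Observable.range(1, 10).subscribe(observer);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeThenDispose(3)); // prints [1, 2, 3]
    }
}
```

Because the subscription is disposed after the third value, the remaining seven emissions and the onComplete() call never reach the Observer, so "Done!" is not printed in this run.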
Note that the subscribe() overload that accepts an Observer returns void rather than a Disposable, since it is assumed that the Observer will handle it. If you do not want to handle the Disposable explicitly and would rather let RxJava handle it for you (which is probably a good idea until you have reason to take control), you can extend ResourceObserver as your Observer, which provides default Disposable handling. Pass this to subscribeWith() instead of subscribe(), and you will get the default Disposable returned:
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.ResourceObserver;

import java.util.concurrent.TimeUnit;

public class Launcher {
    public static void main(String[] args) throws InterruptedException {
        Observable<Long> source =
                Observable.interval(1, TimeUnit.SECONDS);

        ResourceObserver<Long> myObserver = new ResourceObserver<Long>() {
            @Override
            public void onNext(Long value) {
                System.out.println(value);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Done!");
            }
        };

        // capture Disposable
        Disposable disposable = source.subscribeWith(myObserver);

        // interval() emits on a background thread, so keep main() alive
        // long enough to see some emissions, then stop them
        Thread.sleep(5000);
        disposable.dispose();
    }
}
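To check that the Disposable returned by subscribeWith() really stops emissions, the following sketch counts values before and after dispose() is called. subscribeWith() returns the very Observer it was given, and ResourceObserver itself implements Disposable, which is why the result can be assigned to a Disposable. The class name, the helper methods, and the 100 ms interval are assumptions chosen to keep the run short, and the pauses are a rough timing allowance rather than a guarantee:

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.ResourceObserver;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DisposeResourceObserverDemo {

    // Hypothetical helper: sleeps without forcing callers to handle
    // the checked InterruptedException.
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns { count shortly after dispose(), count some time later }.
    public static int[] countAroundDispose() {
        AtomicInteger count = new AtomicInteger();

        ResourceObserver<Long> observer = new ResourceObserver<Long>() {
            @Override
            public void onNext(Long value) {
                count.incrementAndGet();
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Done!");
            }
        };

        // capture Disposable (this is the ResourceObserver itself)
        Disposable disposable =
                Observable.interval(100, TimeUnit.MILLISECONDS)
                        .subscribeWith(observer);

        pause(350);              // let a few emissions arrive
        disposable.dispose();    // stop the subscription
        pause(50);               // allow any in-flight emission to settle
        int afterDispose = count.get();

        pause(300);              // further emissions would show up here
        int later = count.get();

        return new int[] {afterDispose, later};
    }

    public static void main(String[] args) {
        int[] counts = countAroundDispose();
        System.out.println("after dispose: " + counts[0] + ", later: " + counts[1]);
    }
}
```

If disposal works as described, the two counts are equal, since no emissions reach the Observer once dispose() has been called.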