- Learning RxJava
- Thomas Nield
- 2021-07-02 22:22:55
Handling Disposal with Observable.create()
If your Observable.create() returns a long-running or infinite Observable, you should check the isDisposed() method of ObservableEmitter regularly to see whether you should keep sending emissions. This avoids doing unnecessary work once the subscription is no longer active.
In practice you would use Observable.range() for this, but for the sake of the example, let's say we are emitting integers in a for loop in Observable.create(). Before emitting each integer, you should check that ObservableEmitter has not been disposed:
import io.reactivex.Observable;

public class Launcher {
    public static void main(String[] args) {
        Observable<Integer> source =
            Observable.create(observableEmitter -> {
                try {
                    for (int i = 0; i < 1000; i++) {
                        //stop the loop as soon as a disposal has occurred
                        if (observableEmitter.isDisposed())
                            return;
                        observableEmitter.onNext(i);
                    }
                    observableEmitter.onComplete();
                } catch (Throwable e) {
                    observableEmitter.onError(e);
                }
            });
    }
}
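To see the effect of the check, the following is a minimal sketch, assuming RxJava 2 (the io.reactivex package) is on the classpath. The class name DisposalCheckDemo, the source() helper, and the emitted counter are hypothetical names introduced only for this illustration. The take(3) operator disposes the upstream after the third emission, so the loop should exit early instead of running all 1000 iterations.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class DisposalCheckDemo {
    // Hypothetical counter: how many times the loop body reached onNext()
    static final AtomicInteger emitted = new AtomicInteger();

    // Same loop as in the text, with a counter added for illustration
    static Observable<Integer> source() {
        return Observable.create(emitter -> {
            for (int i = 0; i < 1000; i++) {
                // leave the loop once the downstream has disposed
                if (emitter.isDisposed())
                    return;
                emitted.incrementAndGet();
                emitter.onNext(i);
            }
            emitter.onComplete();
        });
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        // take(3) disposes the upstream after the third item
        source().take(3).subscribe(received::add);
        System.out.println("received = " + received);            // received = [0, 1, 2]
        System.out.println("loop emissions = " + emitted.get()); // loop emissions = 3
    }
}
```

Without the isDisposed() check, the subscriber would still see only three items, but the loop itself would keep running through all 1000 iterations.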
If your Observable.create() wraps some resource, you should also handle the disposal of that resource to prevent leaks. ObservableEmitter has the setCancellable() and setDisposable() methods for that. In our earlier JavaFX example, we should remove the ChangeListener from our JavaFX ObservableValue when a disposal occurs. We can provide a lambda to setCancellable(), and it will be executed when dispose() is called:
//requires io.reactivex.Observable, javafx.beans.value.ChangeListener
//and javafx.beans.value.ObservableValue
private static <T> Observable<T> valuesOf(final ObservableValue<T> fxObservable) {
    return Observable.create(observableEmitter -> {
        //emit initial state
        observableEmitter.onNext(fxObservable.getValue());
        //emit value changes using a listener
        final ChangeListener<T> listener =
            (observableValue, prev, current) ->
                observableEmitter.onNext(current);
        //add listener to ObservableValue
        fxObservable.addListener(listener);
        //handle disposing by specifying a cancellable
        observableEmitter.setCancellable(() ->
            fxObservable.removeListener(listener));
    });
}
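The same pattern can be tried without JavaFX. The following is a rough sketch, assuming RxJava 2 on the classpath. The Ticker class is a hypothetical stand-in for any listener-based resource, and CancellableDemo and eventsOf() are likewise names made up for this illustration. It shows that the lambda passed to setCancellable() removes the listener once dispose() is called.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class CancellableDemo {
    // Hypothetical event source standing in for a JavaFX ObservableValue
    static class Ticker {
        final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
        void addListener(Consumer<String> l) { listeners.add(l); }
        void removeListener(Consumer<String> l) { listeners.remove(l); }
        void fire(String value) { listeners.forEach(l -> l.accept(value)); }
    }

    static Observable<String> eventsOf(Ticker ticker) {
        return Observable.create(emitter -> {
            // forward every event from the resource to the emitter
            Consumer<String> listener = emitter::onNext;
            ticker.addListener(listener);
            // release the resource when the subscription is disposed
            emitter.setCancellable(() -> ticker.removeListener(listener));
        });
    }

    public static void main(String[] args) {
        Ticker ticker = new Ticker();
        Disposable d = eventsOf(ticker)
            .subscribe(v -> System.out.println("got " + v));
        ticker.fire("a"); // got a
        System.out.println("listeners before dispose: " + ticker.listeners.size()); // 1
        d.dispose();
        ticker.fire("b"); // nothing is printed, the listener is gone
        System.out.println("listeners after dispose: " + ticker.listeners.size());  // 0
    }
}
```

If the cleanup is already available as a Disposable, it can be passed to setDisposable() instead of wrapping it in a lambda for setCancellable().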