官术网_书友最值得收藏!

  • Learning RxJava
  • Thomas Nield
  • 159字
  • 2021-07-02 22:22:55

Handling Disposal with Observable.create()

If your Observable.create() is returning a long-running or infinite Observable, you should ideally check the isDisposed() method of ObservableEmitter regularly, to see whether you should keep sending emissions. This prevents unnecessary work from being done if the subscription is no longer active.

In this case, you should use Observable.range(), but for the sake of the example, let's say we are emitting integers in a for loop in Observable.create(). Before emitting each integer, you should make sure that ObservableEmitter does not indicate that a disposal was called:

    import io.reactivex.Observable;

public class Launcher {
public static void main(String[] args) {
Observable<Integer> source =
Observable.create(observableEmitter -> {
try {
for (int i = 0; i < 1000; i++) {
while (!observableEmitter.isDisposed()) {
observableEmitter.onNext(i);
}
if (observableEmitter.isDisposed())
return;
}
observableEmitter.onComplete();
} catch (Throwable e) {
observableEmitter.onError(e);
}
});
}
}

If your Observable.create() is wrapped around some resource, you should also handle the disposal of that resource to prevent leaks. ObservableEmitter has the setCancellable() and setDisposable() methods for that. In our earlier JavaFX example, we should remove the  ChangeListener from our JavaFX ObservableValue when a disposal occurs. We can provide a lambda to setCancellable(), which will execute the following action for us, which will occur when dispose() is called:

    private static <T> Observable<T> valuesOf(final ObservableValue<T> 
fxObservable) {
return Observable.create(observableEmitter -> {

//emit initial state
observableEmitter.onNext(fxObservable.getValue());

//emit value changes uses a listener
final ChangeListener<T> listener =
(observableValue, prev, current) ->
observableEmitter.onNext(current);

//add listener to ObservableValue
fxObservable.addListener(listener);

//Handle disposing by specifying cancellable
observableEmitter.setCancellable(() ->
fxObservable.removeListener(listener));
});
}
主站蜘蛛池模板: 威信县| 收藏| 叙永县| 莫力| 乐业县| 通州区| 景宁| 门源| 益阳市| 光山县| 保山市| 禄丰县| 萝北县| 宜兰市| 广丰县| 浦北县| 三台县| 尤溪县| 富民县| 青铜峡市| 蓝田县| 新巴尔虎右旗| 新巴尔虎左旗| 茶陵县| 丹东市| 精河县| 东方市| 琼海市| 施甸县| 古浪县| 溧阳市| 美姑县| 东宁县| 双桥区| 托克逊县| 岳阳县| 贵州省| 广平县| 中方县| 凌源市| 阳山县|