官术网_书友最值得收藏!

  • Learning RxJava
  • Thomas Nield
  • 364字
  • 2021-07-02 22:22:55

Disposing

When you subscribe() to an Observable to receive emissions, a stream is created to process these emissions through the Observable chain. Of course, this uses resources. When we are done, we want to dispose of these resources so that they can be garbage-collected. Thankfully, the finite Observables that call onComplete() will typically dispose of themselves safely when they are done. But if you are working with infinite or long-running Observables, you likely will run into situations where you want to explicitly stop the emissions and dispose of everything associated with that subscription. As a matter of fact, you cannot trust the garbage collector to take care of active subscriptions that you no longer need, and explicit disposal is necessary in order to prevent memory leaks.

The Disposable is a link between an Observable and an active Observer, and you can call its dispose() method to stop emissions and dispose of all resources used for that Observer. It also has an isDisposed() method, indicating whether it has been disposed of already:

    package io.reactivex.disposables;

public interface Disposable {
void dispose();
boolean isDisposed();
}

When you provide onNext(), onComplete(), and/or onError() lambdas as arguments to the subscribe() method,  it will actually return a Disposable. You can use this to stop emissions at any time by calling its dispose() method. For instance, we can stop receiving emissions from an Observable.interval() after five seconds:

    import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.TimeUnit;

public class Launcher {

public static void main(String[] args) {

Observable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS);

Disposable disposable =
seconds.subscribe(l -> System.out.println("Received: " + l));

//sleep 5 seconds
sleep(5000);

//dispose and stop emissions
disposable.dispose();

//sleep 5 seconds to prove
//there are no more emissions
sleep(5000);

}

public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Here, we let Observable.interval() run for five seconds with an Observer, but we save the Disposable returned from the subscribe() method. Then we call the Disposable's dispose() method to stop the process and free any resources that were being used. Then, we sleep for another five seconds just to prove that no more emissions are happening.

主站蜘蛛池模板: 广宗县| 安平县| 河西区| 宝兴县| 施甸县| 简阳市| 龙川县| 腾冲县| 邵阳县| 温州市| 兴隆县| 嘉义县| 赫章县| 合肥市| 涿州市| 南乐县| 共和县| 衡阳市| 东城区| 海口市| 西和县| 彭阳县| 曲阜市| 满城县| 江陵县| 酒泉市| 舒兰市| 临沭县| 监利县| 湘阴县| 汝州市| 河南省| 六枝特区| 黎城县| 巴里| 鄢陵县| 梁平县| 新丰县| 宁远县| 调兵山市| 尚义县|