
  • Learning RxJava
  • Thomas Nield
  • 698 words
  • 2021-07-02 22:22:53

Observable.interval()

As we have seen, Observables have a concept of emissions over time. Emissions are handed from the source down to the Observer sequentially, but they can be spaced out over time depending on when the source provides them. Our JavaFX example with ToggleButton demonstrated this, as each click resulted in an emission of true or false.

Now let's look at a simple example of a time-based Observable using Observable.interval(). It emits consecutive long values (starting at 0), one at every specified time interval. Here, we have an Observable<Long> that emits every second:

    import io.reactivex.Observable;

    import java.util.concurrent.TimeUnit;

    public class Launcher {
        public static void main(String[] args) {
            // Emit 0, 1, 2, ... once per second
            Observable.interval(1, TimeUnit.SECONDS)
                    .subscribe(s -> System.out.println(s + " Mississippi"));

            // Keep the main thread alive so the emissions can be seen
            sleep(5000);
        }

        public static void sleep(int millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

The output is as follows:

    0 Mississippi
    1 Mississippi
    2 Mississippi
    3 Mississippi
    4 Mississippi

Observable.interval() emits indefinitely at the specified interval (1 second in this case). Because it operates on a timer, it must run on a separate thread, and by default it runs on the computation Scheduler. We will cover concurrency and schedulers in Chapter 6, Concurrency and Parallelization. For now, just note that our main() method kicks off this Observable but does not wait for it to finish; the emissions happen on a separate thread. The computation Scheduler's threads are daemon threads, so they do not keep the JVM alive on their own. To keep main() from returning and the application from exiting before the Observable has a chance to fire, we call sleep() to keep the application alive for five seconds, which gives the Observable five seconds to emit. In production applications you will rarely run into this issue, because non-daemon threads for things such as web services, Android apps, or JavaFX keep the application alive.

Trick question: does Observable.interval() return a hot or a cold Observable? Because it is event-driven (and infinite), you may be tempted to say it is hot. But put one Observer on it, wait five seconds, and then add a second Observer. What happens? Let's take a look:
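To make the threading point concrete, here is a minimal sketch that uses only the JDK. It is an analogy, not how RxJava implements interval(): the class DaemonTimerSketch and the helper collectTicks() are hypothetical names invented for this illustration. A daemon timer thread counts up from 0, and the calling thread blocks on a CountDownLatch until enough ticks have arrived, which is one alternative to sleeping for a fixed time.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class DaemonTimerSketch {

    // Hypothetical helper (not an RxJava API): counts 0, 1, 2, ... on a
    // daemon timer thread and blocks the caller until `count` ticks arrive.
    static List<Long> collectTicks(int count, long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "timer-sketch");
            t.setDaemon(true); // a daemon thread does not keep the JVM alive
            return t;
        });
        List<Long> received = new CopyOnWriteArrayList<>();
        AtomicLong next = new AtomicLong();
        CountDownLatch done = new CountDownLatch(count);

        // Fire at a fixed rate; ignore any tick that arrives after the quota is met
        timer.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                received.add(next.getAndIncrement());
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            done.await(); // the calling thread waits here instead of sleeping
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow(); // stop the timer once we are done waiting
        }
        return received;
    }

    public static void main(String[] args) {
        for (long tick : collectTicks(5, 1000)) {
            System.out.println(tick + " Mississippi");
        }
    }
}
```

Unlike the interval() example, this main() prints its five lines together after the wait rather than one per second, because the values are collected first and printed afterwards. If the await() call were removed, main() could return before any tick had fired, and the daemon timer thread would not stop the JVM from exiting.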

    import io.reactivex.Observable;
    import java.util.concurrent.TimeUnit;

    public class Launcher {

        public static void main(String[] args) {

            Observable<Long> seconds = Observable.interval(1, TimeUnit.SECONDS);

            //Observer 1
            seconds.subscribe(l -> System.out.println("Observer 1: " + l));

            //sleep 5 seconds
            sleep(5000);

            //Observer 2
            seconds.subscribe(l -> System.out.println("Observer 2: " + l));

            //sleep 5 seconds
            sleep(5000);
        }

        public static void sleep(int millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

The output is as follows:

    Observer 1: 0
    Observer 1: 1
    Observer 1: 2
    Observer 1: 3
    Observer 1: 4
    Observer 1: 5
    Observer 2: 0
    Observer 1: 6
    Observer 2: 1
    Observer 1: 7
    Observer 2: 2
    Observer 1: 8
    Observer 2: 3
    Observer 1: 9
    Observer 2: 4

Look at what happened after five seconds elapsed, when Observer 2 came in. It is on its own separate timer and starts at 0! Each Observer gets its own emissions, each starting at 0, so this Observable is actually cold. To put all Observers on the same timer with the same emissions, use a ConnectableObservable to make the emissions hot:

    import io.reactivex.Observable;
    import io.reactivex.observables.ConnectableObservable;
    import java.util.concurrent.TimeUnit;

    public class Launcher {

        public static void main(String[] args) {
            ConnectableObservable<Long> seconds =
                    Observable.interval(1, TimeUnit.SECONDS).publish();

            //observer 1
            seconds.subscribe(l -> System.out.println("Observer 1: " + l));

            //start the shared timer; emissions begin here
            seconds.connect();

            //sleep 5 seconds
            sleep(5000);

            //observer 2
            seconds.subscribe(l -> System.out.println("Observer 2: " + l));

            //sleep 5 seconds
            sleep(5000);
        }

        public static void sleep(int millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

The output is as follows:

    Observer 1: 0
    Observer 1: 1
    Observer 1: 2
    Observer 1: 3
    Observer 1: 4
    Observer 1: 5
    Observer 2: 5
    Observer 1: 6
    Observer 2: 6
    Observer 1: 7
    Observer 2: 7
    Observer 1: 8
    Observer 2: 8
    Observer 1: 9
    Observer 2: 9

Now Observer 2, although 5 seconds late and having missed the previous emissions, will at least be completely in sync with Observer 1 and receive the same emissions.
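The difference between the two behaviors can also be modeled without RxJava or threads. The sketch below is a simplified illustration under stated assumptions, not RxJava's implementation: ColdTicker, HotTicker, and the tick() method are hypothetical names, and time is advanced by hand so the result is deterministic. In the cold model every subscriber gets its own counter starting at 0; in the hot model all subscribers share one counter, so a late subscriber misses earlier values. The sketch has no equivalent of connect(); the shared counter simply advances on each tick() call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

class HotColdSketch {

    // Cold model: each subscriber has a private counter that starts at 0.
    static class ColdTicker {
        private final List<LongConsumer> observers = new ArrayList<>();
        private final List<long[]> counters = new ArrayList<>();

        void subscribe(LongConsumer observer) {
            observers.add(observer);
            counters.add(new long[] {0}); // fresh counter for this subscriber
        }

        void tick() {
            for (int i = 0; i < observers.size(); i++) {
                observers.get(i).accept(counters.get(i)[0]++);
            }
        }
    }

    // Hot model: one shared counter; every current subscriber sees the same value.
    static class HotTicker {
        private final List<LongConsumer> observers = new ArrayList<>();
        private long counter = 0;

        void subscribe(LongConsumer observer) {
            observers.add(observer);
        }

        void tick() {
            long value = counter++;
            for (LongConsumer observer : observers) {
                observer.accept(value);
            }
        }
    }

    // Observer 1 subscribes first; Observer 2 joins after three ticks.
    static List<String> runCold() {
        List<String> log = new ArrayList<>();
        ColdTicker ticker = new ColdTicker();
        ticker.subscribe(l -> log.add("Observer 1: " + l));
        for (int i = 0; i < 3; i++) ticker.tick();
        ticker.subscribe(l -> log.add("Observer 2: " + l));
        for (int i = 0; i < 2; i++) ticker.tick();
        return log;
    }

    static List<String> runHot() {
        List<String> log = new ArrayList<>();
        HotTicker ticker = new HotTicker();
        ticker.subscribe(l -> log.add("Observer 1: " + l));
        for (int i = 0; i < 3; i++) ticker.tick();
        ticker.subscribe(l -> log.add("Observer 2: " + l));
        for (int i = 0; i < 2; i++) ticker.tick();
        return log;
    }

    public static void main(String[] args) {
        System.out.println("Cold: " + runCold());
        System.out.println("Hot:  " + runHot());
    }
}
```

In the cold run, Observer 2 starts at 0 while Observer 1 is already at 3. In the hot run, Observer 2 starts at 3 alongside Observer 1, mirroring the two outputs shown in this section.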
