官术网_书友最值得收藏!

Hot Observable

Hot Observable, on the other hand, has the producer created or activated outside of it. Hot Observable emits the stream that is shared by all observers. Let's see the example, as follows:

public class RxJavaHotObservable1 {
public static void main(String args[]) {
Observable<Long> observableInterval = Observable.interval(2, TimeUnit.SECONDS);
PublishSubject<Long> publishSubject = PublishSubject.create();
observableInterval.subscribe(publishSubject);
publishSubject.subscribe(i -> System.out.println("Observable #1 : "+i));
addDelay(4000);
publishSubject.subscribe(i -> System.out.println("Observable #2 : "+i));
addDelay(10000);
}
private static void addDelay(int miliseconds) {
try {
Thread.sleep(miliseconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

The observableInterval observable emits the event instead of data in this example. The interval method is used to emit sequential numbers at given intervals. We have used PublishSubject to make this observable as a hot type.  It can be behave as either Observable or Observer. It is part of the Observable chain in this case. We then simply add two subscribers to PublishSubject with some delay in between. You will get an output as follows:

The second Observer is subscribed after some delay to the first Observer. The Observable emits the sequential number every two seconds. The second Observer starts at the fourth second. Hot Observable emits just a single stream, which is shared across all Observers. So, in the case of the second Observer, the actual value is started from 2 instead of 0 as it subscribes after some time. 

In this sense, hot Observable can be compared with a subscription to a radio station. A person who starts listening will not be able to hear what was played before he subscribed, as it is common to all subscribers (or say Observers in Reactive language). There are other ways to create hot Observable. We will see one of them as follows:

public class RxJavaHotObservable2 {
public static void main(String args[]) {
Observable<Long> observableInt = Observable.interval(2, TimeUnit.SECONDS);
ConnectableObservable<Long> connectableIntObservable = observableInt.publish();
connectableIntObservable.subscribe(i -> System.out.println("Observable #1 : "+i));
connectableIntObservable.connect();
addDelay(7000);
connectableIntObservable.
subscribe(i -> System.out.println("Observable #2 : "+i));
addDelay(10000);
}

private static void addDelay(int miliseconds) {
try {
Thread.sleep(miliseconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

In this code, hot Observable is created with ConnectableObservable. It will not start emitting the data until the connect method is called on it, making it more controllable. Soon after the connect method is called, it will start a single stream, which is shared across the Observers. You will get an output as follows:

You can see how the second Observer missed the first few items as it was subscribed with some delay. You can convert any cold Observable to ConnectableObservable by calling the publish method on it. 

主站蜘蛛池模板: 桂平市| 阿拉善左旗| 垦利县| 长春市| 桦川县| 黑龙江省| 漳州市| 大宁县| 静宁县| 古丈县| 米易县| 宿松县| 五峰| 岳池县| 奉贤区| 泾源县| 屯昌县| 南宁市| 开鲁县| 永昌县| 天台县| 盐池县| 新巴尔虎左旗| 定西市| 额尔古纳市| 石楼县| 东源县| 景谷| 望江县| 左贡县| 嘉义市| 盖州市| 静安区| 密云县| 澜沧| 思茅市| 徐水县| 报价| 肥东县| 镇宁| 鄂伦春自治旗|