Implementing business logic

The TemperatureSensor class previously sent events to a Spring ApplicationEventPublisher, but now it should return a reactive stream of Temperature events. The reactive implementation of TemperatureSensor may look like the following:

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.Random;

import org.springframework.stereotype.Component;
import rx.Observable;                                    // assuming RxJava 1.x

@Component                                               // (1)
public class TemperatureSensor {
   private final Random rnd = new Random();              // (2)

   private final Observable<Temperature> dataStream =    // (3)
      Observable
         .range(0, Integer.MAX_VALUE)                    // (4)
         .concatMap(tick -> Observable                   // (5)
            .just(tick)                                  // (6)
            .delay(rnd.nextInt(5000), MILLISECONDS)      // (7)
            .map(tickValue -> this.probe()))             // (8)
         .publish()                                      // (9)
         .refCount();                                    // (10)

   private Temperature probe() {
      return new Temperature(16 + rnd.nextGaussian() * 10); // (11)
   }

   public Observable<Temperature> temperatureStream() {  // (12)
      return dataStream;
   }
}

Here, we register the TemperatureSensor as a Spring bean by applying the @Component annotation (1), so this bean can be autowired into other beans. The TemperatureSensor implementation uses parts of the RxJava API that have not yet been explained in detail, so we will clarify each transformation as we walk through the class logic.

Our sensor holds the random number generator rnd to simulate actual hardware sensor measurements (2). In statement (3), we define a private field called dataStream, which is returned by the public method temperatureStream() (12). Thus, dataStream is the only Observable stream defined by the component. This stream generates an effectively endless flow of numbers (4) by applying the factory method range(0, Integer.MAX_VALUE). The range() method generates a sequence of Integer.MAX_VALUE integers, starting from 0. For each of these values, we apply the transformation (5) concatMap(tick -> ...). The concatMap() method receives a function, f, that transforms a tick item into an observable stream of elements. It applies f to each element of the incoming stream and joins the resulting streams one by one, in order. In our case, the f function makes a sensor measurement after a random delay (to match the behavior of the previous implementation). To probe the sensor, we create a new stream with only one element, tick (6). To simulate a random delay, we apply the delay(rnd.nextInt(5000), MILLISECONDS) (7) operator, which shifts elements forward in time.

For the next step, we probe the sensor and retrieve a temperature value by applying the map(tickValue -> this.probe()) transformation (8), which in turn calls the probe() method with the same data generation logic as before (11). Here, we ignore the tickValue, as it was required only to generate a one-element stream. So, after applying concatMap(tick -> ...), we have a stream that returns sensor values with a random interval of up to five seconds between emitted elements.
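To make the ordering behavior of concatMap() more concrete, the following is a minimal sketch that uses java.util.stream instead of RxJava. On a sequential stream, flatMap() consumes each inner stream completely before moving on to the next one, which resembles the "join one by one" behavior described above. It does not model the time delay. The class name ConcatMapSketch, the nested Temperature stand-in, and the helper methods are assumptions made for illustration only:

```java
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ConcatMapSketch {

   // Hypothetical stand-in for the application's Temperature class.
   static final class Temperature {
      final double value;

      Temperature(double value) {
         this.value = value;
      }
   }

   private static final Random rnd = new Random();

   // Same data generation logic as the probe() method in the listing.
   static Temperature probe() {
      return new Temperature(16 + rnd.nextGaussian() * 10);
   }

   // Each tick becomes a one-element inner stream; the tick value itself
   // is ignored and replaced by a fresh sensor reading.
   static List<Temperature> readings(int count) {
      return IntStream.range(0, count)
         .boxed()
         .flatMap(tick -> Stream.of(tick).map(tickValue -> probe()))
         .collect(Collectors.toList());
   }

   // Shows the joining order: all elements of the inner stream for tick 0
   // come before any element of the inner stream for tick 1, and so on.
   static List<String> joinInOrder(int count) {
      return IntStream.range(0, count)
         .boxed()
         .flatMap(tick -> Stream.of(tick + "a", tick + "b"))
         .collect(Collectors.toList());
   }
}
```

With this sketch, joinInOrder(2) yields the elements 0a, 0b, 1a, 1b in that order, and readings(n) yields exactly one reading per tick.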

Strictly speaking, we could return the stream without applying operators (9) and (10), but in that case, each subscriber (SSE client) would trigger a new subscription to the stream and a new sequence of sensor readings. Sensor readings would then not be shared among subscribers, which could lead to hardware overload and degradation. To prevent this, we use the publish() (9) operator, which broadcasts events from a source stream to all destination streams. The publish() operator returns a special kind of Observable called ConnectableObservable. The latter provides the refCount() (10) operator, which creates a subscription to the incoming shared stream only when there is at least one outgoing subscription. In contrast with the Publisher-Subscriber implementation, this approach makes it possible not to probe the sensor when nobody is listening.
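The sharing behavior can be illustrated with a small plain-Java model. This is not how RxJava implements publish() and refCount(); it is a simplified sketch, under the assumption that "connecting" just means flipping a flag, and the class RefCountSketch and all of its methods are hypothetical names introduced for this example. It shows the two properties that matter here: every reading is delivered to all current listeners, and the source is connected only while at least one listener is present:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class RefCountSketch {

   private final List<Consumer<Double>> listeners = new CopyOnWriteArrayList<>();
   private boolean connected = false;
   private int connections = 0;   // how many times the source was connected

   // Registers a listener and connects to the source if it is the first one.
   // The returned Runnable cancels this particular subscription.
   public synchronized Runnable subscribe(Consumer<Double> listener) {
      listeners.add(listener);
      if (listeners.size() == 1) {
         connected = true;
         connections++;
      }
      return () -> unsubscribe(listener);
   }

   // Removes a listener and disconnects from the source if it was the last one.
   private synchronized void unsubscribe(Consumer<Double> listener) {
      if (listeners.remove(listener) && listeners.isEmpty()) {
         connected = false;
      }
   }

   // Broadcasts one reading to all listeners; returns false (and delivers
   // nothing) when nobody is subscribed.
   public synchronized boolean emit(double reading) {
      if (!connected) {
         return false;
      }
      listeners.forEach(listener -> listener.accept(reading));
      return true;
   }

   public synchronized boolean isConnected() {
      return connected;
   }

   public synchronized int connectionCount() {
      return connections;
   }
}
```

In this model, two subscribers that join one after the other cause only one connection, both receive the same reading from a single emit() call, and once both cancel, emit() no longer delivers anything.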
