Implementing business logic

The TemperatureSensor class previously sent events to a Spring ApplicationEventPublisher, but now it should return a reactive stream of Temperature events. The reactive implementation of TemperatureSensor may look like the following:

// Imports below assume RxJava 1.x (rx.Observable); adjust them if a different RxJava version is used.
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.Random;
import org.springframework.stereotype.Component;
import rx.Observable;

@Component                                                          // (1)
public class TemperatureSensor {
    private final Random rnd = new Random();                        // (2)

    private final Observable<Temperature> dataStream =              // (3)
        Observable
            .range(0, Integer.MAX_VALUE)                            // (4)
            .concatMap(tick -> Observable                           // (5)
                .just(tick)                                         // (6)
                .delay(rnd.nextInt(5000), MILLISECONDS)             // (7)
                .map(tickValue -> this.probe()))                    // (8)
            .publish()                                              // (9)
            .refCount();                                            // (10)

    private Temperature probe() {
        return new Temperature(16 + rnd.nextGaussian() * 10);       // (11)
    }

    public Observable<Temperature> temperatureStream() {            // (12)
        return dataStream;
    }
}

Here, we register TemperatureSensor as a Spring bean by applying the @Component annotation (1), so that it can be autowired into other beans. The implementation uses parts of the RxJava API that have not yet been explained in detail, so we clarify each transformation by walking through the class logic.

Our sensor holds a random number generator, rnd, to simulate actual hardware sensor measurements (2). In statement (3), we define a private field called dataStream, which is returned by the public method temperatureStream() (12). Thus, dataStream is the only Observable stream defined by the component. This stream generates an effectively endless flow of numbers (4) by applying the factory method range(0, Integer.MAX_VALUE). The range() method generates a sequence of Integer.MAX_VALUE integers, starting from 0. For each of these values, we apply the transformation concatMap(tick -> ...) (5). The concatMap() method receives a function, f, that transforms a tick item into an observable stream of elements; it applies f to each element of the incoming stream and joins the resulting streams one by one, in order. In our case, f makes a sensor measurement after a random delay (to match the behavior of the previous implementation). To probe the sensor, we create a new stream with only one element, tick (6). To simulate a random delay, we apply the delay(rnd.nextInt(5000), MILLISECONDS) operator (7), which shifts elements forward in time.
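The "apply f to each element and join the resulting streams one by one" behavior of concatMap() can be sketched without RxJava. The example below is only an analogy: it uses the JDK's sequential Stream.flatMap, which concatenates inner streams in encounter order, as a stand-in for concatMap(). The class ConcatMapOrderSketch and the function f are hypothetical names introduced for illustration, and f emits two elements per tick (instead of one, as in the sensor) so that the join order is visible.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class ConcatMapOrderSketch {

    // Hypothetical f: turns one tick into a small stream of labelled elements.
    static Stream<String> f(int tick) {
        return Stream.of(tick + "a", tick + "b");
    }

    // Applies f to every tick and joins the inner streams one after another.
    // On a sequential JDK stream, flatMap keeps the inner streams in order,
    // which is the ordering guarantee that concatMap() gives in RxJava.
    static List<String> joined(int ticks) {
        return IntStream.range(0, ticks)
                .boxed()
                .flatMap(tick -> f(tick))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(joined(3)); // prints [0a, 0b, 1a, 1b, 2a, 2b]
    }
}
```

All elements produced for tick 0 appear before any element for tick 1, and so on. In the sensor, the same ordering means that the next delayed probe starts only after the previous one has completed.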

In the next step, we probe the sensor and retrieve a temperature value by applying the map(tickValue -> this.probe()) transformation (8), which in turn calls the probe() method with the same data generation logic as before (11). Here, we ignore tickValue, as it was required only to generate a one-element stream. So, after applying concatMap(tick -> ...), we have a stream that emits sensor values with a random interval of up to five seconds between elements.
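To make the per-tick sequence (wait a random time, then probe) concrete, here is a blocking, plain-JDK analogue of steps (4) to (8). It is a sketch for illustration only, not the reactive implementation: it blocks the calling thread with Thread.sleep instead of using RxJava operators, it returns raw double values instead of Temperature objects, and the names BlockingSensorSketch, take, and maxDelayMillis are assumptions that do not appear in the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class BlockingSensorSketch {
    private final Random rnd = new Random();

    // Same formula as probe() in TemperatureSensor, returning a raw double.
    double probe() {
        return 16 + rnd.nextGaussian() * 10;
    }

    // Collects `count` readings, sleeping a random time below maxDelayMillis
    // before each one. maxDelayMillis must be positive.
    List<Double> take(int count, int maxDelayMillis) {
        List<Double> readings = new ArrayList<>();
        for (int tick = 0; tick < count; tick++) {      // plays the role of range(...)
            try {
                Thread.sleep(rnd.nextInt(maxDelayMillis)); // plays the role of delay(...)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();     // restore the flag and stop early
                break;
            }
            readings.add(probe());                      // plays the role of map(tickValue -> probe())
        }
        return readings;
    }

    public static void main(String[] args) {
        // Three simulated readings, each after a delay of less than 50 ms.
        new BlockingSensorSketch().take(3, 50).forEach(System.out::println);
    }
}
```

As in the reactive version, the loop counter is used only to drive the iteration; the reading itself does not depend on it.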

We could return the stream without applying operators (9) and (10), but then each subscriber (SSE client) would trigger a new subscription to the stream and a new sequence of sensor readings. Sensor readings would not be shared among subscribers, which could lead to hardware overload and degradation. To prevent this, we use the publish() operator (9), which broadcasts events from a source stream to all destination streams. The publish() operator returns a special kind of Observable called ConnectableObservable. The latter provides the refCount() operator (10), which subscribes to the shared incoming stream only when there is at least one outgoing subscription. In contrast with the Publisher-Subscriber implementation, this makes it possible not to probe the sensor when nobody is listening.
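The idea behind combining publish() and refCount() can be illustrated with a small hand-rolled class. This is a simplified sketch of the concept, not RxJava's implementation: the class RefCountedSourceSketch and its methods subscribe, emit, and isConnected are hypothetical, and emit() is called manually here, whereas a real source would produce values on its own.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class RefCountedSourceSketch {
    private final List<Consumer<Double>> subscribers = new CopyOnWriteArrayList<>();
    private boolean connected = false; // true while at least one subscriber is attached

    // Registers a subscriber and returns a handle that cancels the subscription.
    synchronized Runnable subscribe(Consumer<Double> subscriber) {
        subscribers.add(subscriber);
        connected = true;              // the first subscriber "connects" the source
        return () -> unsubscribe(subscriber);
    }

    private synchronized void unsubscribe(Consumer<Double> subscriber) {
        if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
            connected = false;         // the last subscriber left: stop probing
        }
    }

    // Broadcasts one reading to every current subscriber; ignored when nobody listens.
    synchronized void emit(double value) {
        if (!connected) {
            return;
        }
        for (Consumer<Double> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }

    synchronized boolean isConnected() {
        return connected;
    }

    public static void main(String[] args) {
        RefCountedSourceSketch source = new RefCountedSourceSketch();
        Runnable cancelA = source.subscribe(v -> System.out.println("A got " + v));
        Runnable cancelB = source.subscribe(v -> System.out.println("B got " + v));
        source.emit(21.5);             // both subscribers receive the same reading
        cancelA.run();
        cancelB.run();
        source.emit(22.0);             // nobody is listening, so nothing is delivered
    }
}
```

Both subscribers observe the same reading rather than triggering separate ones, and the source becomes inactive once the last subscriber cancels.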