- Hands-On Reactive Programming in Spring 5
- Oleh Dokuka Igor Lozynskyi
- 2021-07-23 16:36:23
Custom SseEmitter
Because TemperatureSensor exposes a stream of temperature values, we can subscribe each new SseEmitter to the Observable stream and forward the received onNext signals to SSE clients. To handle errors and the proper closing of the HTTP connection, let's write the following SseEmitter extension:
import java.io.IOException;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import rx.Subscriber;

class RxSeeEmitter extends SseEmitter {
    static final long SSE_SESSION_TIMEOUT = 30 * 60 * 1000L;
    private final Subscriber<Temperature> subscriber;            // (1)

    RxSeeEmitter() {
        super(SSE_SESSION_TIMEOUT);                              // (2)

        this.subscriber = new Subscriber<Temperature>() {        // (3)
            @Override
            public void onNext(Temperature temperature) {
                try {
                    RxSeeEmitter.this.send(temperature);         // (4)
                } catch (IOException e) {
                    unsubscribe();                               // (5)
                }
            }

            @Override
            public void onError(Throwable e) { }                 // (6)

            @Override
            public void onCompleted() { }                        // (7)
        };

        onCompletion(subscriber::unsubscribe);                   // (8)
        onTimeout(subscriber::unsubscribe);                      // (9)
    }

    Subscriber<Temperature> getSubscriber() {                    // (10)
        return subscriber;
    }
}
RxSeeEmitter extends the well-known SseEmitter and encapsulates a subscriber for Temperature events (1). In the constructor, RxSeeEmitter calls the superclass constructor with the required SSE session timeout (2) and creates an instance of Subscriber<Temperature> (3). This subscriber reacts to each received onNext signal by forwarding it to the SSE client (4). If sending the data fails, the subscriber unsubscribes itself from the incoming observable stream (5). In the current implementation, we know that the temperature stream is infinite and cannot produce any errors, so the onError() and onCompleted() handlers are empty (6), (7); in real applications, it is better to have some handlers there.
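The self-unsubscribing behavior from step (5) can be illustrated without Spring or RxJava on the classpath. The following is only a minimal sketch in plain Java: EventSink, SelfCancellingSubscriber, and SelfCancellingDemo are hypothetical stand-ins invented for this illustration, not types from either library. It assumes a sink that fails exactly once, so the effect of unsubscribing is visible.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class SelfCancellingDemo {
    // Pushes values through a sink whose send() fails on exactly one attempt
    // (1-based). Returns the values that actually reached the sink.
    static List<Double> deliverWithOneFailure(List<Double> values, int failingAttempt) {
        List<Double> delivered = new ArrayList<>();
        int[] attempts = {0};
        EventSink<Double> sink = value -> {
            attempts[0]++;
            if (attempts[0] == failingAttempt) {
                throw new IOException("client disconnected");
            }
            delivered.add(value);
        };
        SelfCancellingSubscriber<Double> subscriber = new SelfCancellingSubscriber<>(sink);
        for (Double value : values) {
            subscriber.onNext(value);
        }
        return delivered;
    }

    public static void main(String[] args) {
        // The third send fails, so the fourth value is never forwarded.
        System.out.println(deliverWithOneFailure(List.of(20.5, 21.0, 21.5, 22.0), 3));
    }
}

// Hypothetical stand-in for an SSE connection: send() may fail with IOException.
interface EventSink<T> {
    void send(T value) throws IOException;
}

// Hypothetical stand-in for a subscriber that cancels itself when delivery fails.
class SelfCancellingSubscriber<T> {
    private final EventSink<T> sink;
    private boolean unsubscribed = false;

    SelfCancellingSubscriber(EventSink<T> sink) {
        this.sink = sink;
    }

    void onNext(T value) {
        if (unsubscribed) {
            return; // a cancelled subscriber ignores further signals
        }
        try {
            sink.send(value);
        } catch (IOException e) {
            unsubscribe(); // the client is gone, so stop consuming the stream
        }
    }

    void unsubscribe() {
        unsubscribed = true;
    }

    boolean isUnsubscribed() {
        return unsubscribed;
    }
}
```

In this sketch the sink would accept the fourth value again, yet it is never delivered, because the subscriber has already cancelled itself after the failed send.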
Lines (8) and (9) register cleanup actions for SSE session completion and timeout: in both cases, the RxSeeEmitter subscriber should cancel its subscription. To let callers use the subscriber, RxSeeEmitter exposes it through the getSubscriber() method (10).
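The cleanup registration in steps (8) and (9) follows a simple callback pattern, which can be sketched in plain Java. FakeSession and CleanupCallbacksDemo below are hypothetical names made up for this illustration; FakeSession only mimics the onCompletion(Runnable) and onTimeout(Runnable) registration style and is not Spring's implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class CleanupCallbacksDemo {
    // Registers the same cancel action for both session events, fires the
    // requested event ("complete" or "timeout"), and reports whether the
    // subscription ended up cancelled.
    static boolean cancelledAfter(String event) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Runnable unsubscribe = () -> cancelled.set(true);

        FakeSession session = new FakeSession();
        session.onCompletion(unsubscribe); // mirrors step (8)
        session.onTimeout(unsubscribe);    // mirrors step (9)

        if ("complete".equals(event)) {
            session.complete();
        } else if ("timeout".equals(event)) {
            session.timeout();
        }
        return cancelled.get();
    }

    public static void main(String[] args) {
        System.out.println(cancelledAfter("complete"));
        System.out.println(cancelledAfter("timeout"));
        System.out.println(cancelledAfter("none"));
    }
}

// Hypothetical stand-in for an SSE session that notifies registered callbacks.
class FakeSession {
    private Runnable completionCallback = () -> { };
    private Runnable timeoutCallback = () -> { };

    void onCompletion(Runnable callback) {
        this.completionCallback = callback;
    }

    void onTimeout(Runnable callback) {
        this.timeoutCallback = callback;
    }

    void complete() {
        completionCallback.run();
    }

    void timeout() {
        timeoutCallback.run();
    }
}
```

Registering the same action for both events means the subscription is released regardless of how the session ends, which is the intent of passing subscriber::unsubscribe to both registration methods.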