Custom SseEmitter

TemperatureSensor exposes a stream of temperature values, so we can subscribe each new SseEmitter to that Observable stream and forward the received onNext signals to SSE clients. To handle errors and close the HTTP connection properly, let's write the following SseEmitter extension:

    import java.io.IOException;

    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    import rx.Subscriber; // RxJava 1.x

    class RxSeeEmitter extends SseEmitter {
        static final long SSE_SESSION_TIMEOUT = 30 * 60 * 1000L; // 30 minutes, in milliseconds
        private final Subscriber<Temperature> subscriber;          // (1)

        RxSeeEmitter() {
            super(SSE_SESSION_TIMEOUT);                            // (2)

            this.subscriber = new Subscriber<Temperature>() {      // (3)
                @Override
                public void onNext(Temperature temperature) {
                    try {
                        RxSeeEmitter.this.send(temperature);       // (4)
                    } catch (IOException e) {
                        unsubscribe();                             // (5)
                    }
                }

                @Override
                public void onError(Throwable e) { }               // (6)

                @Override
                public void onCompleted() { }                      // (7)
            };

            onCompletion(subscriber::unsubscribe);                 // (8)
            onTimeout(subscriber::unsubscribe);                    // (9)
        }

        Subscriber<Temperature> getSubscriber() {                  // (10)
            return subscriber;
        }
    }

RxSeeEmitter extends Spring's SseEmitter and encapsulates a subscriber for Temperature events (1). In the constructor, RxSeeEmitter calls the superclass constructor with the required SSE session timeout (2) and creates an instance of the Subscriber<Temperature> class (3). This subscriber reacts to each received onNext signal by resending it to the SSE client (4). If sending fails, the subscriber unsubscribes itself from the incoming observable stream (5). In the current implementation, we know that the temperature stream is infinite and cannot produce errors, so the onError() and onCompleted() handlers are empty (6), (7). In real applications, it is better to put actual handling there.

Lines (8) and (9) register cleanup actions for SSE session completion and timeout. In both cases, the subscriber cancels its subscription. To let other code attach the subscriber to a stream, RxSeeEmitter exposes it through the getSubscriber() method (10).
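The cancel-on-failure behavior described at (4) and (5) can be tried without Spring or RxJava. The following is only a minimal sketch of the same pattern, written against the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than the RxJava 1.x Subscriber used above. FlakyClient, ForwardingSubscriber, and emit are hypothetical names introduced for illustration: FlakyClient stands in for an SSE connection that starts throwing IOException after a fixed number of sends, and emit is a simplified synchronous source that ignores backpressure.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class CancelOnFailureSketch {

    /** Hypothetical stand-in for an SSE connection; fails once `capacity` values were sent. */
    static final class FlakyClient {
        private final int capacity;
        final List<Double> received = new ArrayList<>();

        FlakyClient(int capacity) {
            this.capacity = capacity;
        }

        void send(double value) throws IOException {
            if (received.size() >= capacity) {
                throw new IOException("client disconnected");
            }
            received.add(value);
        }
    }

    /** Forwards each value to the client and cancels the subscription on the first failed send. */
    static final class ForwardingSubscriber implements Flow.Subscriber<Double> {
        private final FlakyClient client;
        private Flow.Subscription subscription;
        boolean cancelled;

        ForwardingSubscriber(FlakyClient client) {
            this.client = client;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(Long.MAX_VALUE); // unbounded demand, as in the text
        }

        @Override
        public void onNext(Double value) {
            try {
                client.send(value);               // counterpart of step (4)
            } catch (IOException e) {
                subscription.cancel();            // counterpart of step (5)
                cancelled = true;
            }
        }

        @Override
        public void onError(Throwable e) { }      // left empty, as in the text

        @Override
        public void onComplete() { }              // left empty, as in the text
    }

    /** Simplified synchronous source: calls onNext until cancelled; returns the number of calls. */
    static int emit(double[] values, Flow.Subscriber<Double> subscriber) {
        boolean[] cancelled = {false};
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) { /* demand is ignored in this sketch */ }

            @Override
            public void cancel() {
                cancelled[0] = true;
            }
        });
        int calls = 0;
        for (double value : values) {
            if (cancelled[0]) {
                break;
            }
            subscriber.onNext(value);
            calls++;
        }
        if (!cancelled[0]) {
            subscriber.onComplete();
        }
        return calls;
    }

    public static void main(String[] args) {
        FlakyClient client = new FlakyClient(2);
        ForwardingSubscriber subscriber = new ForwardingSubscriber(client);
        int calls = emit(new double[] {20.5, 21.0, 21.5, 22.0}, subscriber);
        System.out.println("onNext calls: " + calls + ", received: " + client.received);
    }
}
```

In this sketch the third send fails, the subscriber cancels, and the source stops before emitting the fourth value. RxSeeEmitter relies on the same idea: once the client is gone, the subscription is dropped so that the stream no longer pushes values to a dead connection.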