- Hands-On Reactive Programming in Spring 5
- Oleh Dokuka Igor Lozynskyi
- 529字
- 2021-07-23 16:36:20
Exposing the SSE endpoint
The next step requires adding the TemperatureController class with the @RestController annotation, which means that the component is used for HTTP communication, as shown in the following code:
@RestController
public class TemperatureController {
private final Set<SseEmitter> clients = // (1)
new CopyOnWriteArraySet<>();
@RequestMapping(
value = "/temperature-stream", // (2)
method = RequestMethod.GET)
public SseEmitter events(HttpServletRequest request) { // (3)
SseEmitter emitter = new SseEmitter(); // (4)
clients.add(emitter); // (5)
// Remove emitter from clients on error or disconnect
emitter.onTimeout(() -> clients.remove(emitter)); // (6)
emitter.onCompletion(() -> clients.remove(emitter)); // (7)
return emitter; // (8)
}
@Async // (9)
@EventListener // (10)
public void handleMessage(Temperature temperature) { // (11)
List<SseEmitter> deadEmitters = new ArrayList<>(); // (12)
clients.forEach(emitter -> {
try {
emitter.send(temperature, MediaType.APPLICATION_JSON); // (13)
} catch (Exception ignore) {
deadEmitters.add(emitter); // (14)
}
});
clients.removeAll(deadEmitters); // (15)
}
}
Now, to understand the logic of the TemperatureController class, we need to describe the SseEmitter. Spring Web MVC provides that class with the sole purpose of sending SSE events. When a request-handling method returns the SseEmitter instance, the actual request processing continues until SseEnitter.complete(), an error, or a timeout occurs.
The TemperatureController provides one request handler (3) for the URI /temperature-stream (2) and returns the SseEmitter (8). In the case when a client requests that URI, we create and return the new SseEmitter instance (4) with its previous registration in the list of the active clients (5). Furthermore, the SseEmitter constructor may consume the timeout parameter.
For the clients' collection, we may use the CopyOnWriteArraySet class from the java.util.concurrent package (1). Such an implementation allows us to modify the list and iterate over it at the same time. When a web client opens a new SSE session, we add a new emitter to the clients' collection. The SseEmitter removes itself from the clients' list when it has finished processing or has reached timeout (6) (7).
Now, having a communication channel with clients means that we need to be able to receive events about temperature changes. For that purpose, our class has a handleMessage() method (11). It is decorated with the @EventListener annotation (10) in order to receive events from Spring. This framework will invoke the handleMessage() method only when receiving Temperature events, as this type of method's argument is known as temperature. The @Async annotation (9) marks a method as a candidate for the asynchronous execution, so it is invoked in the manually configured thread pool. The handleMessage() method receives a new temperature event and asynchronously sends it to all clients in JSON format in parallel for each event (13). Also, when sending to individual emitters, we track all failing ones (14) and remove them from the list of the active clients (15). Such an approach makes it possible to spot clients that are not operational anymore. Unfortunately, SseEmitter does not provide any callback for handling errors, and can be done by handling errors thrown by the send() method only.
- 網(wǎng)管員典藏書架:網(wǎng)絡(luò)管理與運維實戰(zhàn)寶典
- 計算機網(wǎng)絡(luò)與數(shù)據(jù)通信
- PLC、現(xiàn)場總線及工業(yè)網(wǎng)絡(luò)實用技術(shù)速成
- 區(qū)塊鏈輕松上手:原理、源碼、搭建與應(yīng)用
- 雷達饋線技術(shù)
- 圖解手機元器件維修技巧
- 6G新技術(shù) 新網(wǎng)絡(luò) 新通信
- CCNP TSHOOT(642-832)認證考試指南
- 通信原理及MATLAB/Simulink仿真
- Practical Web Penetration Testing
- 5G時代的大數(shù)據(jù)技術(shù)架構(gòu)和關(guān)鍵技術(shù)詳解
- 園區(qū)網(wǎng)絡(luò)架構(gòu)與技術(shù)
- Getting Started with tmux
- Python API Development Fundamentals
- 黑客心理學(xué):社會工程學(xué)原理