- Spring 5.0 Projects
- Nilang Patel
- 2021-07-02 12:35:06
Custom subscribers
In some scenarios, calling one of the subscribe() methods on a Publisher is not appropriate, and you may want to write a custom subscriber with its own handling. The Reactor framework supports this through the reactor.core.publisher.BaseSubscriber<T> abstract class. You don't need to implement the Subscriber interface of the Reactive Streams specification directly. Instead, you just extend this class and supply your custom implementation as follows:
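// Requires imports of org.reactivestreams.Subscription,
// reactor.core.publisher.BaseSubscriber and reactor.core.publisher.SignalType
static class CustomSubscriber extends BaseSubscriber<String> {

    // Called once when the subscription is established; requests up to 10 elements
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("Fetching the values ...!!");
        subscription.request(10);
    }

    // Called for every element that is emitted
    @Override
    protected void hookOnNext(String value) {
        System.out.println("Fetching next value in hookOnNext()-->" + value);
    }

    // Called when the sequence completes without error
    @Override
    protected void hookOnComplete() {
        System.out.println("Congratulations, everything completed successfully ..!!");
    }

    // Called when the sequence terminates with an error
    @Override
    protected void hookOnError(Throwable throwable) {
        System.out.println("Oops, something went wrong ..!! " + throwable.getMessage());
    }

    // Called when the subscription is cancelled
    @Override
    protected void hookOnCancel() {
        System.out.println("Oh !!, Operation has been cancelled ..!! ");
    }

    // Called last, after completion, error, or cancellation
    @Override
    protected void hookFinally(SignalType type) {
        System.out.println("Shutting down the operation, Bye ..!! " + type.name());
    }
}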
The BaseSubscriber class provides various hook methods, each of which corresponds to an event. Each hook is a placeholder for your custom implementation. Implementing these methods is similar to using the various versions of the subscribe() method that we saw in the Type of subscriber section. For example, if you implement only the hookOnNext, hookOnError, and hookOnComplete methods, it is equivalent to the fourth version of subscribe().
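To see what BaseSubscriber saves you from, the following sketch implements the raw four-method subscriber contract by hand. So that it runs without extra dependencies, it uses the JDK's java.util.concurrent.Flow interfaces (which mirror the Reactive Streams interfaces) and SubmissionPublisher rather than Reactor. That choice is an assumption made for illustration, and the names DirectSubscriberSketch, RecordingSubscriber, and run are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class DirectSubscriberSketch {

    // Hypothetical subscriber written directly against the four-method contract.
    static class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription; // the subscription must be stored by hand
            events.add("subscribed");
            subscription.request(1);          // ask for the first element only
        }

        @Override
        public void onNext(String value) {
            events.add("next:" + value);
            subscription.request(1);          // ask for one more after each element
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("error:" + throwable.getMessage());
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("complete");
            done.countDown();
        }
    }

    // Publishes the given values to a fresh subscriber and returns the recorded events.
    static List<String> run(String... values) throws InterruptedException {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String value : values) {
                publisher.submit(value);
            }
        } // close() signals onComplete after the buffered items are delivered
        if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("subscriber did not finish in time");
        }
        return subscriber.events;
    }

    public static void main(String[] args) throws InterruptedException {
        run("A", "B", "C").forEach(System.out::println);
    }
}
```

Here the subscriber has to keep the subscription itself and remember to request more elements after each one. With BaseSubscriber you override only the hooks you care about.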
The hookOnSubscribe() method handles the subscription event. Backpressure is applied through subscription.request(), which lets you request as many elements as you want. For example, update the code for the hookOnSubscribe() method as follows:
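@Override
protected void hookOnSubscribe(Subscription subscription) {
    System.out.println("Fetching the values ...!!");
    // Request six elements, one at a time, pausing one second before each request
    for (int index = 0; index < 6; index++) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        subscription.request(1);
    }
}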
We request records one by one by calling subscription.request(1) in a loop. To show how this works, we put a one-second delay before each request, so a record arrives roughly once per second. Once all the data has been emitted (the loop requests at most six elements), the completion event is triggered and the hookOnComplete() method is called. The output would be as follows:
