官术网_书友最值得收藏!

Custom subscribers

In a certain scenario, calling a Subscribe method on Publisher is not appropriate and you may want to write custom subscriber with own handling. Reactor framework provides support for defining custom subscribers by extending the reactor.core.publisher.BaseSubscriber<T> abstract class. You don't need to implement the Subscribe interface of Reactive Streams specification directly. Instead, you need to just extend this class to apply the custom implementation as follows:

static class CustomSubscriber extends BaseSubscriber<String>{
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Fetching the values ...!!");
subscription.request(10);
}
@Override
protected void hookOnNext(String value) {
System.out.println("Fetchig next value in hookOnNext()-->"+value);
}
@Override
protected void hookOnComplete() {
System.out.println("Congratulation, Everything is completed successfully ..!!");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("Opps, Something went wrong ..!! "+throwable.getMessage());
}
@Override
protected void hookOnCancel() {
System.out.println("Oh !!, Operation has been cancelled ..!! ");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("Shutting down the operation, Bye ..!! "+type.name());
}
}

The  BaseSubscriber class provides various hook methods, which represent the corresponding event. It is a placeholder to provide a custom implementation. Implementing these methods is similar to using various versions of the subscribe() method that we have seen in the Type of subscriber section. For example, if you only implement the hookOnNext, hookOnError , and hookOnComplete methods, then it is equivalent to the fourth version of subscribe().

The  hookOnSubscribe() method facilitates a subscription event. The backpressure is provided with subscription.request(). You can request as many element, as you want. For example, update the code for the hookOnSubscribe() method as follows:

 @Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Fetching the values ...!!");
for(int index=0; index<6;index++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscription.request(1);
}
}

We are requesting records one-by-one by calling subscription.request(1) in a loop. To get an idea how it works, we put a two-second delay in between so you will get a record for every two requests. Once all data is completed, it will trigger the completion event and the hookOnComplete() method will be called. The output would be as follows:

主站蜘蛛池模板: 手游| 荃湾区| 长白| 顺平县| 大港区| 托克逊县| 湛江市| 盘山县| 江川县| 上饶县| 镇原县| 潮州市| 应用必备| 增城市| 台东县| 涿鹿县| 新蔡县| 读书| 鹤山市| 萨嘎县| 伊春市| 孝感市| 澄城县| 锡林郭勒盟| 武安市| 阜南县| 东明县| 安阳县| 陆丰市| 个旧市| 鹰潭市| 泸水县| 锦屏县| 腾冲县| 景德镇市| 永福县| 琼海市| 宜章县| 乐陵市| 奉新县| 集贤县|