
Custom subscribers

In some scenarios, calling a subscribe() method on a Publisher is not appropriate, and you may want to write a custom subscriber with its own handling. The Reactor framework supports defining custom subscribers by extending the reactor.core.publisher.BaseSubscriber<T> abstract class. You don't need to implement the Subscriber interface of the Reactive Streams specification directly. Instead, you just extend this class and apply your custom implementation, as follows:

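import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.SignalType;

static class CustomSubscriber extends BaseSubscriber<String> {
    // Called once when the subscription is established; sets the initial demand.
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("Fetching the values ...!!");
        subscription.request(10);
    }
    // Called for every element the publisher emits.
    @Override
    protected void hookOnNext(String value) {
        System.out.println("Fetching next value in hookOnNext()-->" + value);
    }
    // Called when the publisher finishes without an error.
    @Override
    protected void hookOnComplete() {
        System.out.println("Congratulations, Everything is completed successfully ..!!");
    }
    // Called when the publisher terminates with an error.
    @Override
    protected void hookOnError(Throwable throwable) {
        System.out.println("Oops, Something went wrong ..!! " + throwable.getMessage());
    }
    // Called when the subscription is cancelled.
    @Override
    protected void hookOnCancel() {
        System.out.println("Oh !!, Operation has been cancelled ..!! ");
    }
    // Called last, after completion, error, or cancellation.
    @Override
    protected void hookFinally(SignalType type) {
        System.out.println("Shutting down the operation, Bye ..!! " + type.name());
    }
}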

The BaseSubscriber class provides various hook methods, each of which represents the corresponding event. Each hook is a placeholder for a custom implementation. Implementing these methods is similar to using the various versions of the subscribe() method that we have seen in the Type of subscriber section. For example, if you implement only the hookOnNext, hookOnError, and hookOnComplete methods, then it is equivalent to the fourth version of subscribe().

The hookOnSubscribe() method handles the subscription event. Backpressure is applied through subscription.request(). You can request as many elements as you want. For example, update the code for the hookOnSubscribe() method as follows:

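@Override
protected void hookOnSubscribe(Subscription subscription) {
    System.out.println("Fetching the values ...!!");
    for (int index = 0; index < 6; index++) {
        try {
            // Pause for one second before each request.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        // Ask the publisher for exactly one more element.
        subscription.request(1);
    }
}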

We request records one by one by calling subscription.request(1) in a loop. To show how it works, the code sleeps for one second before each request, so a record arrives roughly once per second. Once the publisher has emitted all its data, it triggers the completion event and the hookOnComplete() method is called.
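The one-at-a-time demand pattern can also be tried without Reactor on the classpath. The following is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams contract, in place of BaseSubscriber. The class names OneByOneDemo and OneByOneSubscriber and the collect() helper are hypothetical names chosen for this illustration, and SubmissionPublisher stands in for a Reactor publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class OneByOneDemo {

    /** Hypothetical subscriber that never has more than one element outstanding. */
    static final class OneByOneSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: exactly one element
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next element only after handling this one
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given items and returns them in the order the subscriber received them. */
    static List<String> collect(List<String> items) throws InterruptedException {
        OneByOneSubscriber subscriber = new OneByOneSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the buffered elements are delivered
        subscriber.done.await(); // wait for the terminal signal before reading the list
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect(List.of("a", "b", "c")));
    }
}
```

Here the publisher buffers submitted elements and hands them over only as demand arrives, so the subscriber controls the pace. In Reactor the same idea would live in hookOnSubscribe() and hookOnNext().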
