
Custom subscribers

In some scenarios, calling one of the subscribe() methods on a Publisher is not sufficient, and you may want to write a custom subscriber with its own handling. The Reactor framework supports this through the reactor.core.publisher.BaseSubscriber<T> abstract class. You don't need to implement the Subscriber interface of the Reactive Streams specification directly. Instead, you extend this class and override the hooks you need, as follows:

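// Imports needed by the enclosing source file
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.SignalType;

static class CustomSubscriber extends BaseSubscriber<String> {

    // Called once when the subscription is established; sets the initial demand
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("Fetching the values ...!!");
        subscription.request(10);
    }

    // Called for each element delivered by the publisher
    @Override
    protected void hookOnNext(String value) {
        System.out.println("Fetching next value in hookOnNext()-->" + value);
    }

    // Called when the publisher completes normally
    @Override
    protected void hookOnComplete() {
        System.out.println("Congratulations, Everything is completed successfully ..!!");
    }

    // Called when the publisher terminates with an error
    @Override
    protected void hookOnError(Throwable throwable) {
        System.out.println("Oops, Something went wrong ..!! " + throwable.getMessage());
    }

    // Called when the subscription is cancelled
    @Override
    protected void hookOnCancel() {
        System.out.println("Oh !!, Operation has been cancelled ..!! ");
    }

    // Called last, after completion, error, or cancellation
    @Override
    protected void hookFinally(SignalType type) {
        System.out.println("Shutting down the operation, Bye ..!! " + type.name());
    }
}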

The BaseSubscriber class provides various hook methods, each of which corresponds to an event. Each hook is a placeholder for your custom implementation. Implementing these methods is similar to using the various versions of the subscribe() method that we saw in the Type of subscriber section. For example, if you only implement the hookOnNext, hookOnError, and hookOnComplete methods, it is equivalent to the fourth version of subscribe().
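To make that equivalence concrete, here is a minimal sketch that records the events seen by a three-hook BaseSubscriber and by the lambda form subscribe(consumer, errorConsumer, completeConsumer). It assumes reactor-core is on the classpath, and it assumes that this three-argument overload is the "fourth version" referred to above. The class names HookEquivalenceDemo and RecordingSubscriber are hypothetical and used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class HookEquivalenceDemo {

    // Hypothetical subscriber that overrides only three hooks.
    // hookOnSubscribe is left at its default, which requests an unbounded amount.
    static class RecordingSubscriber extends BaseSubscriber<String> {
        final List<String> events = new ArrayList<>();

        @Override
        protected void hookOnNext(String value) {
            events.add("next:" + value);
        }

        @Override
        protected void hookOnError(Throwable throwable) {
            events.add("error:" + throwable.getMessage());
        }

        @Override
        protected void hookOnComplete() {
            events.add("complete");
        }
    }

    // Attach the custom subscriber to a small synchronous source.
    static List<String> viaBaseSubscriber() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        Flux.just("a", "b").subscribe(subscriber);
        return subscriber.events;
    }

    // The same handling expressed with the lambda-based subscribe() overload.
    static List<String> viaLambdas() {
        List<String> events = new ArrayList<>();
        Flux.just("a", "b").subscribe(
                value -> events.add("next:" + value),
                error -> events.add("error:" + error.getMessage()),
                () -> events.add("complete"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(viaBaseSubscriber());
        System.out.println(viaLambdas());
    }
}
```

Both methods should record the same sequence of events. The subclass form becomes worthwhile once you also need hookOnSubscribe, hookOnCancel, or hookFinally, which the lambda form does not expose in the same way.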

The hookOnSubscribe() method handles the subscription event. Backpressure is applied through subscription.request(), which lets you request as many elements as you want. For example, update the code for the hookOnSubscribe() method as follows:

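@Override
protected void hookOnSubscribe(Subscription subscription) {
    System.out.println("Fetching the values ...!!");
    // Issue six separate requests, one element each
    for (int index = 0; index < 6; index++) {
        try {
            Thread.sleep(1000); // pause for one second before each request
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        subscription.request(1);
    }
}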

We are requesting records one by one by calling subscription.request(1) in a loop. To show how it works, we put a one-second delay before each request, so you will receive one record per second. Once all the data has been delivered, the completion event is triggered and the hookOnComplete() method is called.
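For comparison, the same one-at-a-time demand can be written against a raw subscriber interface, which is the work BaseSubscriber saves you. The sketch below deliberately avoids Reactor so that it runs on a plain JDK (9 or later): it uses java.util.concurrent.Flow, the JDK's counterpart of the Reactive Streams interfaces, and SubmissionPublisher as a stand-in source. The class names OneByOneDemo and OneByOneSubscriber and the run() helper are hypothetical. Instead of looping in onSubscribe, it requests the next element only after the current one has been handled.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class OneByOneDemo {

    // Hypothetical subscriber that never has more than one element outstanding.
    static class OneByOneSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            events.add("subscribed");
            subscription.request(1); // initial demand: a single element
        }

        @Override
        public void onNext(String value) {
            events.add("next:" + value);
            subscription.request(1); // ask for the next element only after handling this one
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("error:" + throwable.getMessage());
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("complete");
            done.countDown();
        }
    }

    // Publishes the given items, waits for completion, and returns the recorded events.
    static List<String> run(List<String> items) throws InterruptedException {
        OneByOneSubscriber subscriber = new OneByOneSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // closing the publisher signals onComplete after buffered items are delivered
        if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Timed out waiting for completion");
        }
        return subscriber.events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

In Reactor, the equivalent pattern would be to call request(1) from hookOnSubscribe and again from hookOnNext, which keeps the pacing tied to how fast each element is processed rather than to a fixed sleep.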
