- Hands-On Reactive Programming with Reactor
- Rahul Sharma
- 506 words
- 2021-08-13 15:22:55
Subscription
The subscription is an important component in Reactive Streams. It provides the control flow that keeps publishers from overrunning a subscriber. This mechanism is known as backpressure.
Once the subscriber receives the subscription event, it must ask the publisher for a specific number of events over that subscription. It does so by invoking the request(long) method of the subscription object.
As data events are generated, they are received by the subscriber. Once the limit has been reached, the publisher must stop publishing more events. As the subscriber processes these events, it must request additional events from the publisher:
public interface Subscription {
    public void request(long n);
    public void cancel();
}
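The request-and-replenish cycle described above can be sketched in a few lines. To keep the example runnable without extra dependencies, it uses the JDK's java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams ones. RangePublisher, BatchSubscriber, and run are hypothetical names introduced only for this illustration, and the publisher is a simplified, single-threaded one rather than a specification-complete implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class DemandSketch {

    /** Hypothetical publisher: emits 1..count, but never more than the outstanding demand. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) { // non-positive demand is reported as an error
                        done = true;
                        subscriber.onError(new IllegalArgumentException("demand must be positive"));
                        return;
                    }
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                    if (emitting) {
                        return; // re-entrant call from onNext: the running loop sees the new demand
                    }
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(Integer.valueOf(next++));
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Hypothetical subscriber: asks for a fixed batch, then asks again once the batch is processed. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        private final int batch;
        private Flow.Subscription sub;
        private int receivedInBatch = 0;

        BatchSubscriber(int batch) {
            this.batch = batch;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            sub = s;
            log.add("request " + batch);
            sub.request(batch);
        }

        @Override
        public void onNext(Integer item) {
            log.add("onNext " + item);
            if (++receivedInBatch == batch) { // batch processed: raise more demand
                receivedInBatch = 0;
                log.add("request " + batch);
                sub.request(batch);
            }
        }

        @Override
        public void onError(Throwable t) {
            log.add("onError");
        }

        @Override
        public void onComplete() {
            log.add("onComplete");
        }
    }

    /** Wires the two together and returns the sequence of signals that was observed. */
    static List<String> run(int count, int batch) {
        BatchSubscriber subscriber = new BatchSubscriber(batch);
        new RangePublisher(count).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

With five elements and a batch size of two, the publisher never emits more than two events per request, and the log ends with onComplete once the range is exhausted.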
The subscription object allows a subscriber to control the events it wants to receive. Whenever the subscriber determines that it no longer wants the events, it can invoke the cancel() method of the subscription. Once cancel() has been invoked, the subscriber may still receive a few data events that were part of the demand raised before the cancellation. Post-cancellation, the subscription becomes void, meaning that it cannot be used to request additional data.
A subscriber can also cancel an active subscription within the onSubscribe() method, before any demand has been raised using the request method. In this case, the publisher will drop the subscription without raising any events.
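Both cancellation cases can be tried out with a small sketch. It again relies on the JDK's java.util.concurrent.Flow interfaces as a stand-in for org.reactivestreams, and CounterSubscription, CancellingSubscriber, and run are hypothetical helpers written for this example. The subscription is handed to the subscriber directly, without a publisher class, and for brevity it silently ignores non-positive demand instead of signalling onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class CancelSketch {

    /** Hypothetical unbounded source: counts up from 0 and stops as soon as it is cancelled. */
    static final class CounterSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Long> subscriber;
        private long next = 0;
        private long demand = 0;
        private boolean emitting = false;
        private boolean cancelled = false;

        CounterSubscription(Flow.Subscriber<? super Long> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (cancelled || n <= 0) {
                return; // a cancelled subscription is void; invalid demand is ignored in this sketch
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
            if (emitting) {
                return; // re-entrant call from onNext
            }
            emitting = true;
            while (demand > 0 && !cancelled) {
                demand--;
                subscriber.onNext(Long.valueOf(next++));
            }
            emitting = false;
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    /** Hypothetical subscriber: cancels after `limit` items; a limit of 0 cancels inside onSubscribe. */
    static final class CancellingSubscriber implements Flow.Subscriber<Long> {
        final List<Long> received = new ArrayList<>();
        private final int limit;
        private Flow.Subscription sub;

        CancellingSubscriber(int limit) {
            this.limit = limit;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            sub = s;
            if (limit == 0) {
                sub.cancel(); // cancel before any demand is raised
                return;
            }
            sub.request(10);
        }

        @Override
        public void onNext(Long item) {
            received.add(item);
            if (received.size() == limit) {
                sub.cancel();
                sub.request(10); // has no effect: the subscription is void after cancel()
            }
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            // never reached: the source is unbounded
        }
    }

    /** Runs one subscriber against the counter and returns what it received. */
    static List<Long> run(int limit) {
        CancellingSubscriber subscriber = new CancellingSubscriber(limit);
        subscriber.onSubscribe(new CounterSubscription(subscriber));
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println("cancel in onSubscribe: " + run(0));
        System.out.println("cancel after 3 items:  " + run(3));
    }
}
```

Because this source is synchronous, delivery stops immediately after cancel(). An asynchronous publisher may still deliver a few already-requested events, so a subscriber should be prepared to receive them.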
Now that we have gone over the subscriber and subscription interfaces in detail, we can try to build a FibonacciSubscriber, as follows:
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class FibonacciSubscriber implements Subscriber<Long> {
    private Subscription sub;

    @Override
    public void onSubscribe(Subscription s) {
        sub = s;
        sub.request(10); // raise demand for 10 events
    }

    @Override
    public void onNext(Long fibNumber) {
        System.out.println(fibNumber);
        sub.cancel(); // cancel after the first event
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
        sub = null;
    }

    @Override
    public void onComplete() {
        System.out.println("Finished");
        sub = null;
    }
}
The preceding implementation does the following things:
- Upon receiving the subscription event, a request is raised to handle 10 events.
- When received, all data events are printed to the output console.
- After processing a single event, the subscriber cancels the subscription.
- The onComplete method prints "Finished" and sets the subscription to null.
- The onError method prints the error's stack trace to the console and sets the subscription to null.
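To see what this behavior means in practice, the following sketch drives a copy of the subscriber with a small Fibonacci source. It is an approximation under stated assumptions: it uses the JDK's java.util.concurrent.Flow interfaces instead of org.reactivestreams so that it runs without extra dependencies, the subscriber records its output in a list instead of printing it, and FibonacciSubscription and run are hypothetical helpers, not the publisher used elsewhere in the book.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class FibonacciFlowSketch {

    /** Flow-based copy of FibonacciSubscriber that records its output instead of printing it. */
    static final class RecordingFibonacciSubscriber implements Flow.Subscriber<Long> {
        final List<String> output = new ArrayList<>();
        private Flow.Subscription sub;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            sub = s;
            sub.request(10);
        }

        @Override
        public void onNext(Long fibNumber) {
            output.add(String.valueOf(fibNumber));
            sub.cancel();
        }

        @Override
        public void onError(Throwable t) {
            output.add("error: " + t);
            sub = null;
        }

        @Override
        public void onComplete() {
            output.add("Finished");
            sub = null;
        }
    }

    /** Hypothetical synchronous source of `count` Fibonacci numbers (0, 1, 1, 2, ...). */
    static final class FibonacciSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Long> subscriber;
        private final int count;
        private long current = 0;
        private long following = 1;
        private int emitted = 0;
        private long demand = 0;
        private boolean emitting = false;
        private boolean done = false;

        FibonacciSubscription(Flow.Subscriber<? super Long> subscriber, int count) {
            this.subscriber = subscriber;
            this.count = count;
        }

        @Override
        public void request(long n) {
            if (done || n <= 0) {
                return; // sketch only: invalid demand is ignored rather than signalled
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
            if (emitting) {
                return; // re-entrant call from onNext
            }
            emitting = true;
            while (demand > 0 && emitted < count && !done) {
                demand--;
                emitted++;
                long value = current;
                long sum = current + following;
                current = following;
                following = sum;
                subscriber.onNext(Long.valueOf(value));
            }
            emitting = false;
            if (emitted == count && !done) {
                done = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }

    /** Subscribes the recording subscriber to `count` Fibonacci numbers and returns its output. */
    static List<String> run(int count) {
        RecordingFibonacciSubscriber subscriber = new RecordingFibonacciSubscriber();
        subscriber.onSubscribe(new FibonacciSubscription(subscriber, count));
        return subscriber.output;
    }

    public static void main(String[] args) {
        System.out.println(run(20));
    }
}
```

Although the subscriber asks for 10 events, it records only the first one, because it cancels inside onNext. For the same reason, onComplete is never reached as long as the source has at least one element to emit.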
Now, let's validate the subscriber by using the SubscriberBlackboxVerification<T> abstract class. We need to implement the createSubscriber() method, along with createElement(), which supplies the test elements, as shown in the following code:
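import org.reactivestreams.Subscriber;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
import org.reactivestreams.tck.TestEnvironment;

public class FibonacciSubsciberVerification extends SubscriberBlackboxVerification<Long> {

    public FibonacciSubsciberVerification() {
        super(new TestEnvironment());
    }

    @Override
    public Subscriber<Long> createSubscriber() {
        return new FibonacciSubscriber();
    }

    @Override
    public Long createElement(int element) {
        // Long.valueOf replaces the deprecated new Long(...) constructor
        return Long.valueOf(element);
    }
}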
Let's run the test case to determine whether our subscriber meets the Reactive Streams criteria:

Here, too, a large number of test cases fail. These test cases define the behavior required of a subscriber. We could fix our implementation, but the better option is to use Reactor to create our services. In the following section, we will describe the publisher and subscriber implementations available in Reactor. These implementations conform to the behaviors laid out in the specification.