官术网_书友最值得收藏!

Subscription

The subscription is an important component in Reactive Streams. It provides the necessary control flow, so that publishers do not over-run a subscriber. This is known as backpressure.

Once the subscriber receives the subscription event, it must request that the publisher publish a specified count of events over their respective subscription. This is done by invoking the request(long) method of the subscription object.

As data events are generated, they are received by the subscriber. Once the limit has been reached, the publisher must stop publishing more events. As the subscriber processes these events, it must request additional events from the publisher:

public interface Subscription {
public void request(long n);
public void cancel();
}

The subscription object allows a subscriber to control the events it wants to receive. Whenever the subscriber determines that it no longer wants the events, it can invoke the cancel() method of the subscription. Once invoked, a subscriber may receive fewer data events, in accordance with the demand raised before the cancellation. Post-cancellation, the subscription will become void, meaning that it cannot be used to request additional data.

A value of Long.MaxValue for the request method would result in an infinite flow of events from the publisher.

A subscriber can cancel an active subscription with the onSubscribe() method before any demand can be raised using the request method. In this case, the publisher will drop the subscription without raising any events.

Now that we have gone over the subscriber interface in detail, we can try to build a FibonacciSubscriber, as follows:

public class FibonacciSubscriber implements Subscriber<Long> {
private Subscription sub;
@Override
public void onSubscribe(Subscription s) {
sub = s;
sub.request(10);
}
@Override
public void onNext(Long fibNumber) {
System.out.println(fibNumber);
sub.cancel();
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
sub=null;
}
@Override
public void onComplete() {
System.out.println("Finished");
sub=null;
}
}

The preceding implementation does the following things:

  1. Upon receiving the subscription event, a request is raised to handle 10 events.
  2. When received, all data events are printed to the output console.
  3. After processing a single event, the subscriber cancels the subscription.
  4. The onCompletion method sets the subscription to null.
  5. The onError method prints the error message to the console and sets the subscription as null.

Now, let's validate the subscriber by using the SubscriberBlackboxVerification<T> abstract class. We need to implement the createSubsciber() method, as shown in the following code:

public class FibonacciSubsciberVerification extends SubscriberBlackboxVerification<Long> {
public FibonacciSubsciberVerification(){
super(new TestEnvironment());
}
@Override
public Subscriber<Long> createSubscriber() {
return new FibonacciSubscriber();
}
@Override
public Long createElement(int element) {
return new Long(element);
}
}

Let's run the test case to determine whether our subscriber meets the Reactive Streams criteria:

Here, we can also find a large number of broken test cases. These broken test cases define the behavior for a subscriber. We could fix these, but the better option would be to use Reactor to create our services. In the following section, we will describe the publisher and subscriber implementations available in Reactor. These implementations conform to the specification behaviors.

主站蜘蛛池模板: 鹤壁市| 峨眉山市| 蒲城县| 隆子县| 皋兰县| 大理市| 诏安县| 和龙市| 安义县| 内乡县| 宣汉县| 金坛市| 上栗县| 义马市| 珠海市| 张家川| 剑河县| 岑巩县| 中牟县| 楚雄市| 灯塔市| 昌黎县| 县级市| 宝清县| 普陀区| 凤城市| 元朗区| 普格县| 衡南县| 仙居县| 古浪县| 寻乌县| 高平市| 永登县| 兰州市| 株洲市| 朝阳市| 开封县| 剑阁县| 札达县| 延寿县|