官术网_书友最值得收藏!

Stream publisher

As we discussed in the previous chapter, the publisher is responsible for the generation of unbounded asynchronous events, and it pushes them to the associated subscribers. It is represented by the org.reactivestreams.Publisher interface, as follows:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

The interface provides a single subscribe method. The method is invoked by any party that is interested in listening to events published by the publisher. The interface is quite simple, and it can be used to publish any type of event, be it a UI event (like a mouse-click) or a data event.

Since the interface is simple, let's add an implementation for our custom FibonacciPublisher:

public class FibonacciPublisher implements Publisher<Integer> {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
int count = 0, a = 0, b = 1;
while (count < 50) {
int sum = a + b;
subscriber.onNext(b);
a = b;
b = sum;
count++;
}
subscriber.onComplete();
}
}

This implementation may look good, but does it comply to publisher behavior according to the specification? The specification prescribes rules that describe publisher behavior. A publisher must generate the following four types of events:

  • Subscription event
  • Data of type T, as declared by the publisher
  • Completion event
  • Error event

According to the specification, a publisher can emit any number of data events. However, it must publish only one event for completion, error, and subscription. Once a completion or an error event is published, the publisher can no longer send data events back to a subscriber.

As backpressure is an important aspect of the specification, a publisher cannot push an arbitrary number of events to a subscriber. Instead, the subscriber must specify how many events it can receive, and a publisher must publish events equal to, or less than, the specified number.

In order to validate a publisher, the Reactive Streams API has published a test compatibility kit. Let's add the reactive-streams-tck in the build.gradle to our project:

dependencies {
// rest removed for brevity
testCompile group: 'org.reactivestreams',
name: 'reactive-streams-tck', version: '1.0.2'
}

The Technology Compatibility Kit (TCK) provides a PublisherVerifier interface that must be implemented in order to validate a publisher. It provides the following two methods:

  • createPublisher(long): This method must provide an instance of the publisher that can produce the specified number of events
  • createFailedPublisher(): This method must try to build a publisher that has raised an error event

Let's add the following implementation to test our FibonacciPublisher:

public class FibonacciPublisherVerifier extends PublisherVerification<Integer> {
public FibonacciPublisherVerifier(){
super(new TestEnvironment());
}
@Override
public Publisher<Integer> createFailedPublisher() {
return null;
}
@Override
public Publisher<Integer> createPublisher(long elements) {
return new FibonacciPublisher();
}
}

Now, let's run the test case to determine whether we comply with the Reactive Streams publisher specification:

As shown in the preceding screenshot, there are around 20 test failures and 16 skipped tests. We could fix each one of them, but the aim here is to understand that even a simple interface of a publisher is governed by many behavior specifications. Therefore, it is overkill to build a custom publisher. As service builders, we can use the Reactor framework. This provides publisher implementations capable of publishing any kind of data.

主站蜘蛛池模板: 阿合奇县| 宁德市| 沙洋县| 东乡族自治县| 沙坪坝区| 张北县| 宜阳县| 苍南县| 鄂伦春自治旗| 宁陵县| 桃园市| 页游| 鲁山县| 定安县| 玉溪市| 科技| 顺昌县| 花莲市| 兴国县| 灯塔市| 民勤县| 常德市| 辽宁省| 钦州市| 胶州市| 固安县| 盐边县| 舒兰市| 内黄县| 青铜峡市| 濉溪县| 淮阳县| 呈贡县| 惠安县| 内江市| 民丰县| 扶绥县| 兴海县| 银川市| 施秉县| 石泉县|