
Project Reactor version 1.x

When working on the Reactive Streams specification, developers from the Spring Framework team needed a high-throughput data processing framework, especially for the Spring XD project, the goal of which was to simplify the development of big data applications. To fulfill that need, the Spring team initiated a new project. From the beginning, it was designed with support for asynchronous, non-blocking processing. The team called it Project Reactor. Essentially, Reactor version 1.x incorporated best practices for message processing, such as the Reactor Pattern, and functional and reactive programming styles.

The Reactor Pattern is a behavioral pattern that helps with asynchronous event handling and synchronous processing. This means that all events are enqueued and the actual processing of an event happens later by a separate thread. An event is dispatched to all interested parties (event handlers) and processed synchronously. To learn more about the Reactor Pattern, please visit the following link: http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf.
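The enqueue-then-dispatch idea described above can be sketched with nothing but the JDK. This is an illustrative sketch, not Reactor 1.x code: the class name MiniReactor and its on, publish, and shutdown methods are hypothetical names chosen for this example. Publishers only enqueue events, and a single dispatcher thread later hands each event to every handler registered for its key, one handler after another.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical minimal reactor: one queue, one dispatcher thread, many handlers. */
final class MiniReactor {
    /** An event is a key (the "channel") plus a payload. */
    private static final class Event {
        final String key;
        final Object data;
        Event(String key, Object data) { this.key = key; this.data = data; }
    }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Map<String, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();
    private final Thread dispatcher;

    MiniReactor() {
        dispatcher = new Thread(this::dispatchLoop, "mini-reactor-dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    /** Registers a handler for all events published under the given key. */
    void on(String key, Consumer<Object> handler) {
        handlers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(handler);
    }

    /** Enqueues the event and returns immediately; processing happens later. */
    void publish(String key, Object data) {
        queue.add(new Event(key, data));
    }

    /** Runs on the dispatcher thread: takes events and calls handlers in order. */
    private void dispatchLoop() {
        try {
            while (true) {
                Event event = queue.take();
                for (Consumer<Object> handler
                        : handlers.getOrDefault(event.key, Collections.emptyList())) {
                    handler.accept(event.data);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdown requested
        }
    }

    void shutdown() { dispatcher.interrupt(); }

    /** Publishes two events to two handlers and returns what the handlers saw. */
    static List<String> demo() {
        MiniReactor reactor = new MiniReactor();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(4); // 2 events x 2 handlers
        reactor.on("channel", data -> { seen.add("h1:" + data); done.countDown(); });
        reactor.on("channel", data -> { seen.add("h2:" + data); done.countDown(); });

        reactor.publish("channel", "a");
        reactor.publish("ignored", "x"); // no handler is registered for this key
        reactor.publish("channel", "b");
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for handlers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            reactor.shutdown();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a single thread drains the queue, the handlers for one event always finish before the next event is dispatched. That ordering guarantee is the main design trade-off of this sketch: a slow handler delays every event behind it.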

By embracing these techniques, Project Reactor version 1.x gives us the ability to write concise code such as the following:

Environment env = new Environment();                               // (1)
Reactor reactor = Reactors.reactor()                               // (2)
                          .env(env)                                //
                          .dispatcher(Environment.RING_BUFFER)     // (2.1)
                          .get();                                  //

reactor.on($("channel"),                                           // (3)
           event -> System.out.println(event.getData()));          //

Executors.newSingleThreadScheduledExecutor()                       // (4)
         .scheduleAtFixedRate(                                     //
             () -> reactor.notify("channel", Event.wrap("test")),  //
             0, 100, TimeUnit.MILLISECONDS                         //
         );                                                        //

In the preceding code, there are a couple of conceptual points:

  1. Here, we create an Environment instance. The Environment instance is an execution context, which is responsible for creating a particular Dispatcher. This may potentially provide different kinds of dispatchers, ranging from an interprocess dispatcher to distributed ones.
  2. An instance of Reactor is created, which is a direct implementation of the Reactor Pattern. In the preceding example code, we use the Reactors class, which is a fluent builder for concrete Reactor instances. At point (2.1), we use the predefined implementation of Dispatcher based on the RingBuffer structure. To learn more about the internals and overall design of the RingBuffer-based Dispatcher, please visit the following link: https://martinfowler.com/articles/lmax.html.
  3. Here, the declaration of a channel Selector and an Event consumer occurs. At this point, we register an event handler (in this case, a lambda that prints all received events to System.out). The filtering of events happens using the string selector, which indicates the name of the event channel. Selectors.$ provides a broader selection of criteria, so a final expression for event selection may be more complicated.
  4. Here, we configure the producer of the Event in the form of a scheduled task. At that point, we use the possibilities of Java's ScheduledExecutorService to schedule periodic tasks that send Event to a specific channel in the previously instantiated Reactor instance.
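The four points above can be approximated without Reactor 1.x on the classpath. The following is a rough sketch under stated assumptions, not the library's implementation: SelectorBus, on, and send are hypothetical names, a java.util.function.Predicate stands in for a Selector, and a java.util.concurrent.Executor stands in for the Dispatcher. Passing Runnable::run gives a synchronous dispatcher that runs handlers on the caller's thread, whereas passing a thread-pool executor would make dispatch asynchronous.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical stand-in for a selector-based event bus; not the Reactor 1.x API. */
final class SelectorBus {
    private static final class Registration {
        final Predicate<String> selector;
        final Consumer<String> handler;
        Registration(Predicate<String> selector, Consumer<String> handler) {
            this.selector = selector;
            this.handler = handler;
        }
    }

    private final Executor dispatcher; // plays the role of the Dispatcher
    private final List<Registration> registrations = new CopyOnWriteArrayList<>();

    SelectorBus(Executor dispatcher) { this.dispatcher = dispatcher; }

    /** Registers a handler for every key the selector accepts (point 3). */
    void on(Predicate<String> selector, Consumer<String> handler) {
        registrations.add(new Registration(selector, handler));
    }

    /** Routes the payload to all matching handlers through the dispatcher. */
    void send(String key, String data) {
        for (Registration r : registrations) {
            if (r.selector.test(key)) {
                dispatcher.execute(() -> r.handler.accept(data));
            }
        }
    }

    /** Runs a periodic producer (point 4) and returns the first three events received. */
    static List<String> demo() {
        SelectorBus bus = new SelectorBus(Runnable::run); // synchronous dispatch
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch firstThree = new CountDownLatch(3);
        bus.on("channel"::equals, data -> {
            received.add(data);
            firstThree.countDown();
        });

        AtomicInteger counter = new AtomicInteger();
        ScheduledExecutorService producer = Executors.newSingleThreadScheduledExecutor();
        producer.scheduleAtFixedRate(
            () -> bus.send("channel", "test-" + counter.getAndIncrement()),
            0, 100, TimeUnit.MILLISECONDS);
        try {
            if (!firstThree.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for events");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            producer.shutdownNow();
        }
        // Copy first: the producer may still append one more event while shutting down.
        return new ArrayList<>(received).subList(0, 3);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A selector here is any predicate over the key, so a prefix or regular-expression match could replace the exact-match "channel"::equals without touching the bus itself.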

Under the hood, events are processed by Dispatcher and then sent to destination points. Depending on the Dispatcher implementation, an event may be processed synchronously or asynchronously. This provides a functional decomposition and generally works in a similar way to the Spring Framework event processing approach. Furthermore, Reactor 1.x provides a bunch of helpful wrappers that allow us to compose events' processing with a clear flow:

...                                                                // (1)
Stream<String> stream = Streams.on(reactor, $("channel"));         // (2)
stream.map(s -> "Hello world " + s)                                // (3)
      .distinct()                                                  //
      .filter((Predicate<String>) s -> s.length() > 2)             //
      .consume(System.out::println);                               // (3.1)

Deferred<String, Stream<String>> input = Streams.defer(env);       // (4)

Stream<String> compose = input.compose();                          // (5)
compose.map(m -> m + " Hello World")                               // (6)
       .filter(m -> m.contains("1"))                               //
       .map(Event::wrap)                                           //
       .consume(reactor.prepare("channel"));                       // (6.1)

for (int i = 0; i < 1000; i++) {                                   // (7)
    input.accept(UUID.randomUUID().toString());                    //
}                                                                  //

Let's break down the preceding code:

  1. At this point we have an Environment and a Reactor creation, as in the previous example.
  2. Here we have Stream creation. Stream allows the building of functional transformation chains. By applying the Streams.on method to Reactor with a specified Selector, we receive a Stream object attached to the specified channel in the given Reactor instance.
  3. Here, the processing flow is created. We apply a few intermediate operations, such as map, distinct, and filter, and finish with consume, which is a terminal operator (3.1).
  4. Here, a Deferred Stream is created. The Deferred class is a special wrapper that makes it possible to provide manual events to the Stream. In our case, the Streams.defer method creates an additional instance of the Reactor class.
  5. At this point we have a Stream instance creation. Here, we retrieve Stream from the Deferred instance by using the compose method on it.
  6. At this point we have a reactive processing flow creation. This part of the pipeline composition is similar to what we have at point (3). At point (6.1), we use the Reactor API shortcut for the following code: e -> reactor.notify("channel", e)
  7. Here, we supply a random element to the Deferred instance.
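To make the operator chain from the steps above more concrete, here is a minimal sketch of a push-based stream in plain Java. It is an assumption-laden illustration rather than Reactor's implementation: PushStream and its nested Input class are hypothetical names, Input only loosely mirrors the role of Deferred, and everything runs synchronously on the caller's thread instead of going through a Dispatcher. Each operator simply wraps the downstream consumer.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical push-based stream: each operator wraps the downstream consumer. */
final class PushStream<T> {
    /** Attaches a downstream consumer to whatever feeds this stage. */
    private final Consumer<Consumer<T>> subscribe;

    private PushStream(Consumer<Consumer<T>> subscribe) { this.subscribe = subscribe; }

    /** Manual entry point for values, loosely analogous to a Deferred. */
    static final class Input<T> implements Consumer<T> {
        private final List<Consumer<T>> downstream = new ArrayList<>();

        @Override
        public void accept(T value) {
            for (Consumer<T> consumer : downstream) {
                consumer.accept(value);
            }
        }

        /** Returns a stream that receives every value passed to accept. */
        PushStream<T> compose() {
            return new PushStream<T>(downstream::add);
        }
    }

    /** Transforms each value before passing it on. */
    <R> PushStream<R> map(Function<T, R> fn) {
        return new PushStream<R>(next -> subscribe.accept(v -> next.accept(fn.apply(v))));
    }

    /** Passes on only the values that satisfy the predicate. */
    PushStream<T> filter(Predicate<T> predicate) {
        return new PushStream<T>(next -> subscribe.accept(v -> {
            if (predicate.test(v)) {
                next.accept(v);
            }
        }));
    }

    /** Passes on each distinct value once, remembering what it has already seen. */
    PushStream<T> distinct() {
        return new PushStream<T>(next -> {
            Set<T> seen = new HashSet<>();
            subscribe.accept(v -> {
                if (seen.add(v)) {
                    next.accept(v);
                }
            });
        });
    }

    /** Terminal operation: wires the whole chain to the given consumer. */
    void consume(Consumer<T> terminal) { subscribe.accept(terminal); }

    /** Builds a map/filter/distinct chain and feeds it four values by hand. */
    static List<String> demo() {
        List<String> out = new ArrayList<>();
        Input<String> input = new Input<>();
        input.compose()
             .map(m -> m + " Hello World")
             .filter(m -> m.contains("1"))
             .distinct()
             .consume(out::add);
        for (String id : new String[] {"a1", "b2", "a1", "c1"}) {
            input.accept(id);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Nothing flows until consume is called, because only the terminal operation attaches a consumer to the input. Values accepted before that point would be lost in this sketch.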

In the first example, we subscribe to the channel and then process all incoming events step by step. In contrast, in the second example, we use the reactive programming technique to build a declarative processing flow made up of two separate processing stages. Furthermore, the code looks like the well-known RxJava API, which makes it familiar to RxJava users. At one point, Reactor 1.x also had good integration with the Spring Framework. Along with the message processing library, Reactor 1.x provided a number of add-ons, such as the add-on for Netty.

To summarize, at that time, Reactor 1.x was good enough at processing events at high speed. With excellent integration with the Spring Framework and composition with Netty, it made it possible to develop high-performance systems that provide asynchronous and non-blocking message processing.

However, Reactor 1.x also had its disadvantages. First of all, the library had no real backpressure control: its event-driven implementation offered no way to deal with a producer that outpaces its consumers other than blocking the producer thread or skipping events. Furthermore, error handling was quite complicated, because Reactor 1.x provided several different ways of handling errors and failures. Even though Reactor 1.x was rough around the edges, it was used by the popular Grails web framework. Of course, this significantly influenced the next iteration of the reactive library.
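The two fallbacks named above, blocking the producer thread or skipping events, can be illustrated with a bounded JDK queue. This is a sketch of the general trade-off, not of Reactor 1.x internals: the class NoBackpressureDemo, its method names, and the buffer capacity of 4 are assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo of the two options left to a producer without backpressure. */
final class NoBackpressureDemo {

    /** Option A: skip events when the buffer is full. Returns the number dropped. */
    static int produceDropping(BlockingQueue<Integer> buffer, int events) {
        int dropped = 0;
        for (int i = 0; i < events; i++) {
            if (!buffer.offer(i)) { // offer never waits; false means "buffer full"
                dropped++;
            }
        }
        return dropped;
    }

    /**
     * Option B: block the producer until the consumer frees a slot.
     * Expects an empty buffer. Returns the number of events the consumer received.
     */
    static int produceBlocking(BlockingQueue<Integer> buffer, int events) {
        AtomicInteger delivered = new AtomicInteger();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < events; i++) {
                    buffer.take();
                    delivered.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < events; i++) {
                buffer.put(i); // put waits while the buffer is full
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return delivered.get();
    }

    public static void main(String[] args) {
        // No consumer is running, so only the first 4 of 10 events fit in the buffer.
        System.out.println("dropped:   " + produceDropping(new ArrayBlockingQueue<>(4), 10));
        // Nothing is lost, but the producer thread stalls whenever the buffer is full.
        System.out.println("delivered: " + produceBlocking(new ArrayBlockingQueue<>(4), 10));
    }
}
```

Neither option lets the consumer tell the producer how much it can handle, which is the gap that demand-based backpressure in the Reactive Streams specification addresses.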
