- PHP Reactive Programming
- Martin Sikora
- 2021-07-09 19:06:17
Components of RxPHP
Since this chapter is going to be mostly about Observables, observers and operators, we're going to start with them.
We've already seen a sneak peek in this chapter, and now we'll go into more detail.
Observables
Observables emit items. In other words, Observables are sources of values. Observers can subscribe to Observables in order to be notified when the next item is ready, all items have been emitted, or an error has occurred.
The main difference between an Observable (in the sense of reactive programming) and the observer pattern is that an Observable can tell you when all of the data has been emitted and when an error occurs. All three types of events are consumed by observers.
RxPHP comes with several basic types of Observables for general usage. Here are a few that are easy to use:
- ArrayObservable: This creates an Observable from an array and emits all of its values to an observer as soon as it subscribes.
- RangeObservable: This generates a sequence of numbers from a predefined range.
- IteratorObservable: This iterates over an iterable and emits each item. The source can be any array wrapped as an Iterator. Consider the following example, where we iterate an array instead of using ArrayObservable:

    $fruits = ['apple', 'banana', 'orange', 'raspberry'];
    new IteratorObservable(new ArrayIterator($fruits));

  Note that this also includes generators. Consider another example with an anonymous function and the yield keyword:

    $iterator = function() use ($fruits) {
        foreach ($fruits as $fruit) {
            yield $fruit;
        }
    };
    new IteratorObservable($iterator())
        ->subscribe(new DebugSubject());

  Calling the $iterator() function returns an instance of the Generator class, which implements the Iterator interface.

However, these basic Observables are good mostly for demonstration purposes and are not very practical in real-world usage. In a PHP environment, we can't create Observables from mouse events as in JavaScript and RxJS, so we'll have to learn how to write custom Observables very soon in this chapter in order to create some real-world examples. In Chapter 3, Writing a Reddit Reader with RxPHP, we'll learn about the Observable::create() static method to create Observables with some basic custom logic. But, more on that later.
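To see why a generator can be handed to an Observable that expects an Iterator, it may help to check the relationship in plain PHP. The following is a minimal sketch that doesn't use RxPHP at all; the fruitGenerator() function name is made up for this illustration.

```php
<?php
// Plain PHP, no RxPHP required: a generator function yields fruits one by one.
// fruitGenerator() is a hypothetical helper used only for this illustration.
function fruitGenerator(array $fruits): Generator
{
    foreach ($fruits as $fruit) {
        yield $fruit;
    }
}

$generator = fruitGenerator(['apple', 'banana', 'orange', 'raspberry']);

// A Generator object implements the built-in Iterator interface.
var_dump($generator instanceof Iterator);

// Values are produced lazily, only as the loop pulls them.
foreach ($generator as $fruit) {
    echo $fruit, "\n";
}
```

Because the generator body runs only when values are pulled, nothing is computed until a consumer starts iterating.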
Observables can be divided into two groups based on when they start emitting values:
- Hot: Values are emitted even when there are no observers subscribed. An example is Rx.Observable.fromEvent from RxJS, which we used in Chapter 1, Introduction to Reactive Programming. It creates an Observable from any JavaScript event. Values are emitted immediately, so when you subscribe to this Observable some time later, you receive only new values and no previously emitted values.
- Cold: Values are emitted only once at least one observer has subscribed. An example is RxPHP's ArrayObservable. Every time we subscribe to it, we receive all of the values passed as input to the fromArray() method.
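The difference between the two groups can be sketched with two tiny hand-written classes. This is only an illustration under simplifying assumptions: ColdSource and HotSource are hypothetical names, they are not part of RxPHP, and they support nothing but onNext-style callbacks.

```php
<?php
// Hypothetical helper classes for illustration only; they are not part of RxPHP.

// Cold: every subscriber triggers a fresh replay of all values.
final class ColdSource
{
    private array $values;

    public function __construct(array $values)
    {
        $this->values = $values;
    }

    public function subscribe(callable $onNext): void
    {
        foreach ($this->values as $value) {
            $onNext($value);
        }
    }
}

// Hot: values are pushed whether or not anybody is listening.
final class HotSource
{
    /** @var callable[] */
    private array $listeners = [];

    public function subscribe(callable $onNext): void
    {
        $this->listeners[] = $onNext;
    }

    public function emit($value): void
    {
        foreach ($this->listeners as $listener) {
            $listener($value);
        }
    }
}

$cold = new ColdSource(['apple', 'banana']);
$first = [];
$second = [];
$cold->subscribe(function ($v) use (&$first) { $first[] = $v; });
$cold->subscribe(function ($v) use (&$second) { $second[] = $v; });
// Both subscribers have now received the full sequence.

$hot = new HotSource();
$hot->emit('click 1'); // nobody is subscribed yet, so this value is lost
$late = [];
$hot->subscribe(function ($v) use (&$late) { $late[] = $v; });
$hot->emit('click 2'); // the late subscriber sees only this value

print_r([$first, $second, $late]);
```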
All built-in Observables in RxPHP can be instantiated easily by calling static methods on the Rx\Observable class. The following list shows the static methods for the three Observables mentioned above:
- The Rx\Observable::fromArray() method returns Rx\Observable\ArrayObservable
- The Rx\Observable::range() method returns Rx\Observable\RangeObservable
- The Rx\Observable::fromIterator() method returns Rx\Observable\IteratorObservable
Don't be surprised that static method names don't necessarily match returned class names. Also, it's usually easier to use static calls than to instantiate Observables directly.
Observers
Observers are consumers of Observables. In other words, observers react to Observables. We've already seen the CallbackObserver class, which takes three optional arguments representing callables for each type of signal.
Consider a similar example that we used at the end of Chapter 1, Introduction to Reactive Programming, where we defined our observer:

    $observer = new Rx\Observer\CallbackObserver(function($value) {
        printf("%s\n", $value);
    }, function() {
        print("onError\n");
    }, function() {
        print("onCompleted\n");
    });

The CallbackObserver class lets us create a custom observer without necessarily extending the base class. Its constructor takes three optional arguments, in the same order as in the preceding example:
- onNext: This callable is called when a new item from the source Observable is emitted. This is the most common callback we'll use.
- onError: This callable is called when an error has occurred somewhere in the chain.
- onComplete: This callable is called when there are no items left and the Observable is done emitting items. Some Observables produce an infinite number of items, in which case this callback is never called.
We can write the same example in a more reusable form to quickly test what's going on inside Observable chains:

    // rxphp_03.php
    $fruits = ['apple', 'banana', 'orange', 'raspberry'];

    class PrintObserver extends Rx\Observer\AbstractObserver {
        protected function completed() {
            print("Completed\n");
        }
        protected function next($item) {
            printf("Next: %s\n", $item);
        }
        protected function error(Exception $err) {
            $msg = $err->getMessage();
            printf("Error: %s\n", $msg);
        }
    }

    $source = Rx\Observable::fromArray($fruits);
    $source->subscribe(new PrintObserver());

When extending AbstractObserver, the methods we need to implement are completed(), next(), and error(), with the same functionality as described previously.
We're using the subscribe() method to subscribe an observer to an Observable.
There's also the subscribeCallback() method that takes just three callables as arguments. Since RxPHP 2, the subscribeCallback() method is deprecated and its functionality has been merged with subscribe().
This means that, in RxPHP 2, we can also write the following code:

    $source->subscribe(function($item) {
        printf("Next: %s\n", $item);
    });

We passed a single callable instead of subscribing with an observer. This handles only onNext signals.
Singles
Singles are like Observables; the only difference is that they always emit just one value. RxPHP doesn't distinguish between Observables and Singles, so we can use the Observable::just() static method:

    // single_01.php
    require __DIR__ . '/PrintObserver.php';

    Rx\Observable::just(42)
        ->subscribe(new PrintObserver());

This creates a new Observable that calls onNext() with the value 42, and immediately after that onComplete(). The output for this very simple example is the following:

    $ php single_01.php
    Next: 42
    Completed

Similar to the preceding explanation, calling the Rx\Observable::just() static method returns an instance of Rx\Observable\ReturnObservable.
Note
The term "Single" was used mostly in RxJS 4. Since RxPHP was originally ported from RxJS 4, and later also took things from RxJS 5, you might encounter this term sometimes. If you're familiar only with RxJS 5, then you've probably never heard of it. Nonetheless, we'll always refer to all sources of values as Observables, even when they emit just a single, or no value at all.
Subject
The Subject is a class that acts as an Observable and an observer at the same time. This means that it can subscribe to an Observable just like an observer, and also emit values like an Observable does. It can also emit its own values independently of its source Observable.
In order to see how the Subject class can be used in different situations, we'll work through three examples based on the same example we used at the beginning of this chapter.
We can use a Subject class instead of an Observable. However, we need to emit items manually by calling onNext() on the Subject instance:

    // subject_01.php
    use Rx\Subject\Subject;

    $subject = new Subject();
    $subject
        ->map(function($value) {
            return strlen($value);
        })
        ->filter(function($len) {
            return $len > 5;
        })
        ->subscribe(new PrintObserver());

    $subject->onNext('apple');
    $subject->onNext('banana');
    $subject->onNext('orange');
    $subject->onNext('raspberry');

This code produces the same output as the original example with Observable:

    $ php subject_01.php
    Next: 6
    Next: 6
    Next: 9
Another use case could be using Subject to subscribe to an Observable. We'll reuse the PrintObserver class we made a moment ago to print all of the items and notifications that went through the Subject instance:

    // subject_02.php
    use Rx\Subject\Subject;
    use Rx\Observable;

    $subject = new Subject();
    $subject->subscribe(new PrintObserver());

    $fruits = ['apple', 'banana', 'orange', 'raspberry'];
    Observable::fromArray($fruits)
        ->map(function($value) {
            return strlen($value);
        })
        ->filter(function($len) {
            return $len > 5;
        })
        ->subscribe($subject);

Notice that we subscribed PrintObserver to the Subject and then subscribed the Subject at the end of the operator chain. As we can see, by default the Subject class just passes through both items and notifications. The output is the same as in the previous example.
The final situation we want to demonstrate is using an instance of Subject in the middle of an operator chain:

    // subject_03.php
    use Rx\Subject\Subject;
    use Rx\Observable;

    $fruits = ['apple', 'banana', 'orange', 'raspberry'];

    $subject = new Subject();
    $subject
        ->filter(function($len) {
            return $len > 5;
        })
        ->subscribe(new PrintObserver());

    Observable::fromArray($fruits)
        ->map(function($value) {
            return strlen($value);
        })
        ->subscribe($subject);

Yet again, the console output is the same.
Later in this chapter, we'll write the DebugSubject class, which we'll use many times throughout this book to quickly see what's going on in our Observable chains.
Disposable
All Rx implementations internally use the Dispose pattern. This design decision has two reasons:
- To be able to unsubscribe from an Observable
- To be able to release all data used by that Observable
For example, if we had an Observable that downloads a large file from the Internet and saves it to a temporary location until it's completely downloaded, we'd like to remove the temporary file if its observer unsubscribed, or any error occurred.
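The temporary-file scenario can be sketched in plain PHP as follows. This is a minimal illustration of the dispose idea only: TempFileDisposable is a hypothetical class written for this example, not one of the Disposable classes shipped with RxPHP, and the "download" is simulated by writing a few bytes to a temporary file.

```php
<?php
// Hypothetical class for illustration; RxPHP ships its own Disposable classes.
final class TempFileDisposable
{
    private string $path;
    private bool $disposed = false;

    public function __construct(string $path)
    {
        $this->path = $path;
    }

    // Safe to call several times: the cleanup runs only once.
    public function dispose(): void
    {
        if ($this->disposed) {
            return;
        }
        $this->disposed = true;
        if (is_file($this->path)) {
            unlink($this->path);
        }
    }
}

// Simulate a partially downloaded file in the system temp directory.
$tmpFile = tempnam(sys_get_temp_dir(), 'download_');
file_put_contents($tmpFile, 'partial data');

$disposable = new TempFileDisposable($tmpFile);

// The observer unsubscribes (or an error occurs): release the resource.
$disposable->dispose();
$disposable->dispose(); // the second call is a no-op

var_dump(file_exists($tmpFile));
```

Making dispose() idempotent is a deliberate choice in this sketch, since cleanup may be triggered from more than one place (an unsubscription and an error, for instance).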
There are already a couple of Disposable classes available out of the box with RxPHP, each with a different purpose. We don't need to worry about Disposables right now. We'll have a look at how they are used inside built-in Observables and operators in Chapter 3, Writing a Reddit Reader with RxPHP.
Note
You can read more about the dispose pattern on Wikipedia https://en.wikipedia.org/wiki/Dispose_pattern or, more specifically, why it's used in reactive extensions on StackOverflow http://stackoverflow.com/a/7707768/310726 .
However, it's good to know that releasing resources is important in Rx and that we need to be aware of it.
Scheduler
Observables and operators usually don't execute their work directly, but use an instance of the Scheduler class to decide how and when it should be executed.
In practice, a Scheduler receives an action as an anonymous function and schedules its execution according to its internal logic. This is particularly relevant to all Observables and operators that need to work with time. For example, all delayed or periodic emissions need to be scheduled via a Scheduler.
In languages such as JavaScript, this is relatively simple with, for example, the setTimeout() function and the event-based nature of JavaScript interpreters. However, in PHP, where all code is executed strictly sequentially, we'll have to use an event loop.
In most situations in RxPHP, we don't even have to worry about Schedulers because, if not set differently, all Observables and operators internally use the ImmediateScheduler class, which executes all actions immediately without any further logic.
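The idea of handing an action to something else that decides when to run it can be sketched without RxPHP. In the following illustration, ImmediateRunner and QueuedRunner are hypothetical classes invented for this example; they only mimic the concept and do not reproduce the RxPHP Scheduler API.

```php
<?php
// Hypothetical classes for illustration; they only mimic the idea of a Scheduler.

// Runs every action right away, similar to the behavior described
// for ImmediateScheduler.
final class ImmediateRunner
{
    public function schedule(callable $action): void
    {
        $action();
    }
}

// Collects actions and runs them later, the way an event loop would.
final class QueuedRunner
{
    /** @var callable[] */
    private array $queue = [];

    public function schedule(callable $action): void
    {
        $this->queue[] = $action;
    }

    public function run(): void
    {
        while ($this->queue) {
            $action = array_shift($this->queue);
            $action();
        }
    }
}

$log = [];

$immediate = new ImmediateRunner();
$immediate->schedule(function () use (&$log) { $log[] = 'immediate'; });
$log[] = 'after immediate schedule';

$queued = new QueuedRunner();
$queued->schedule(function () use (&$log) { $log[] = 'queued'; });
$log[] = 'after queued schedule';
$queued->run(); // only now does the queued action execute

print_r($log);
```

The same calling code works with either runner; only the moment of execution changes, which is the point of delegating the decision.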
We'll encounter Schedulers once more at the end of this chapter, when talking about event loops.
In Chapter 6, PHP Streams API and Higher-Order Observables, we'll go into much more detail about event loops in PHP. We'll also talk about the Event Loop Interoperability specification (https://github.com/async-interop/event-loop) and how it's related to RxPHP.
Note
In RxPHP 2, using Schedulers has been significantly simplified and, most of the time, we don't need to worry about event loops at all, as we'll see in Chapter 6, PHP Streams API and Higher-Order Observables.
Operators
We've used operators already without any further explanation, but now that we know how to use Observables, observers, and Subjects, it's time to see how operators glue this all together.
The core principle of Rx is using various operators to modify data flow. Typically, an operator returns another Observable and therefore allows the chaining of operator calls.
In Rx, there are tons of operators, and in RxPHP in particular, there are about 40 already. Other implementations such as RxJS have even more. Those include all we saw in the previous chapter when talking about functional programming, such as map(), filter(), and a lot more. This also includes operators for very specific use cases, such as merge(), buffer(), or retry(), just to name a few.
The process of creating operator chains is a little more complicated under the hood than it seems. We don't need to worry about it for now because we'll talk about it again in Chapter 3, Writing a Reddit Reader with RxPHP. Before we start using more advanced operators in practice, we should have a look at how each operator is described in the documentation. This is mostly because some functionality isn't obvious at first sight and, when it comes to asynchronous events, it's sometimes hard to understand what each operator does.
Understanding the operator diagrams
Each operator is described in the documentation using a diagram called the marble diagram, where each marble represents an emitted value.
The filter() operator
First, we'll have a look at how the filter() operator is defined. We used the PHP function array_filter() in the previous chapter, so we know that it takes values and a predicate function as input. Then it evaluates each value with the predicate and, based on whether it returns true or false, it adds or skips the value in its response array. The behavior of the filter() operator is the same; it just works with data flows instead of arrays. This means it receives items from its source (the preceding Observable) and propagates those that pass the predicate to its consequent observer (or chained operator).
Using a marble diagram, it will look like the following figure:

Marble diagram representing the filter() Operator from http://reactivex.io/documentation/operators/filter.html
Let's explain this diagram in more detail:
- At the top and bottom, we have two timelines that represent Observables. The arrow in the top right corner suggests that time goes from left to right.
- We can think of everything above the rectangle as the input Observable and everything below the rectangle as the output Observable. There are usually one or more inputs and only one output.
- Each circle (marble) represents a single value in time emitted by its respective Observable. The number inside each circle stands for its value. All values are ordered by the time they were emitted, which goes from left to right. Different colors are used to make it obvious that values at the top and bottom are the same (for example the blue "30" at the top is the same value as the bottom "30").
- The rectangle in the middle represents the transformation between the top and bottom Observables. Its functionality is usually described in words or pseudocode. In this case, we have an expression that looks like ES6 syntax, which says that it returns true if x is greater than 10. Rewritten to PHP, it's equal to the following:

    function($x) {
        return $x > 10;
    }

- The bottom line, therefore, only contains circles with a value greater than 10.
- Vertical lines on the right side of each line mark the point where these Observables complete. This means they have emitted all values and sent an onComplete notification. The filter() operator has no effect on the onComplete notification, so both Observables end at the same time.
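The item-by-item behavior described in the list above can be imitated with a generator. This is a pull-based stand-in rather than RxPHP code: filterStream() is a hypothetical helper, and the input values are illustrative. The predicate is the one from the diagram.

```php
<?php
// A pull-based stand-in for the filter() operator, written with a generator.
// It is not RxPHP code; it only mirrors the idea of filtering a flow item by item.
// filterStream() is a hypothetical helper name.
function filterStream(iterable $source, callable $predicate): Generator
{
    foreach ($source as $item) {
        if ($predicate($item)) {
            yield $item; // pass the item on to the consumer
        }
        // otherwise the item is skipped
    }
}

// Illustrative input values; the predicate is the one from the diagram.
$input = [2, 30, 22, 5, 60, 1];
$output = filterStream($input, function ($x) { return $x > 10; });

foreach ($output as $value) {
    echo $value, "\n"; // prints 30, 22 and 60 on separate lines
}
```

Unlike array_filter(), this version never builds the whole result in memory; each item is tested and forwarded as it arrives, and the output ends exactly when the input does.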
This was pretty simple. Marble diagrams are a very comfortable way of representing data flows without worrying about implementation details (this reminds us of declarative programming, as we defined it in the first chapter, doesn't it?).
In some diagrams, you can also see a cross sign on the timeline, which represents an error (an onError notification to be precise). We'll see further on in this chapter that we can work with onComplete and onError notifications just as with onNext.
The debounceTime() operator
Let's have a look at another diagram. This time we have a debounceTime() operator from RxJS 5, which we saw in the first chapter, in the Autocomplete with RxJS example:

Marble diagram representing the debounceTime() operator from http://reactivex.io/documentation/operators/debounce.html
In the rectangle in the middle, we don't have any pseudocode this time; just a single expression debounceTime(20). In order to figure out what it does, we need to look at the documentation, or try to analyze the diagram.
When the debounceTime() operator receives a value, it waits a certain interval before reemitting it. If any other values arrive before the interval expires, the original value is discarded and the later value is used instead; the interval is restarted as well. This can go on for an infinite number of values.
The diagram exactly describes the previous paragraph:
- First, value a arrives. The transformation function waits until the 20 ms interval expires, and after that, the operator reemits the value further. The interval is represented by shifting the bottom values on the timeline slightly to the right. As we said previously, the horizontal lines represent values in time. When the bottom circle, labeled a, is shifted to the right, it means this event happened after the top a circle.
- Then, two more values arrive, both of them in a very short time. The first one is discarded, but after the second one, there's another longer time gap, so only the second value gets reemitted.
- The process with the last value d is analogous to the first one.
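The rule walked through in the list above can be replayed over recorded timestamps. The following is a simplified simulation, not an RxPHP or RxJS operator: debounceEvents() is a hypothetical helper, the timestamps are made up, and no real timers are involved. It also assumes that the last value is emitted one full interval after it arrived.

```php
<?php
// Simulates debouncing over a list of [time in ms, value] pairs sorted by time.
// Hypothetical helper; it replays recorded timestamps instead of using real timers.
function debounceEvents(array $events, int $interval): array
{
    $emitted = [];
    $count = count($events);
    foreach ($events as $i => [$time, $value]) {
        $isLast = ($i === $count - 1);
        // The value survives only if no newer value arrives
        // before the interval expires.
        if ($isLast || $events[$i + 1][0] - $time >= $interval) {
            $emitted[] = [$time + $interval, $value];
        }
    }
    return $emitted;
}

// Made-up timestamps: b and c arrive in quick succession.
$events = [[0, 'a'], [30, 'b'], [35, 'c'], [80, 'd']];

foreach (debounceEvents($events, 20) as [$time, $value]) {
    printf("%d ms: %s\n", $time, $value);
}
```

With these inputs, b is dropped because c arrives only 5 ms later, while a, c, and d are each followed by a quiet period of at least 20 ms.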
This operator is useful when we know we can ignore some events that occur quickly after one another. A prime example is using debounceTime() for autocomplete features when we want to start searching after a user has stopped typing a keyword.
The concat() operator
Now we can have a look at a slightly more complicated operator, which is concat(). Look at the following diagram and try to guess what it does:

Marble diagram representing the concat() operator from http://reactivex.io/documentation/operators/concat.html
Let's analyze this together before looking at the documentation:
- At the top, we have two Observables as inputs to the operator.
- Both Observables should emit a value at the same time, but only the value from the first Observable is passed through. The same applies for the second and third values as well.
- Then the first Observable reaches the end and sends an onComplete notification.
- Right after that, the operator starts emitting values from the second Observable.
The concat() operator merges multiple Observables into one. It internally subscribes to each input Observable in order, one after another. This means that, when the first Observable completes, it subscribes to the next one. It's important to know that there's only ever one source Observable subscribed at a time (we'll work with concat() and a similar merge() operator in Chapter 4, Reactive versus a typical Event-Driven Approach).
In other words, the concat() operator concatenates multiple data streams into a single stream.
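The "one source after another" ordering can be illustrated with generators. This is a synchronous, pull-based analogy that ignores timing entirely, so it is not a replacement for concat() on asynchronous streams. Both concatStreams() and loggedSource() are hypothetical helpers written for this sketch.

```php
<?php
// A synchronous, pull-based analogy for concat(), written with generators.
// It ignores timing; it only shows the "one source after another" ordering.
function concatStreams(iterable ...$sources): Generator
{
    foreach ($sources as $source) {
        // The next source is not touched until the current one is exhausted.
        foreach ($source as $item) {
            yield $item;
        }
    }
}

// Hypothetical helper that logs when a source is actually started and finished.
function loggedSource(string $name, array $values, array &$log): Generator
{
    $log[] = "$name started";
    foreach ($values as $value) {
        yield $value;
    }
    $log[] = "$name completed";
}

$log = [];
$result = [];
$stream = concatStreams(
    loggedSource('first', [1, 2, 3], $log),
    loggedSource('second', [4, 5], $log)
);
foreach ($stream as $item) {
    $result[] = $item;
}

print_r($result);
print_r($log);
```

The log shows that the second source starts only after the first one has completed, which mirrors the rule that only one source is subscribed at a time.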
In the first chapter, we talked about functional programming and how most principles are the same in reactive programming. Implementing such a feature for data streams with plain PHP functions would be rather complicated, because there's no built-in PHP function designed to deal with such a use case.
If we go back to the first chapter once more, we said that one key concept of reactive programming is to "express data flows with ease". This operator shows what that means in action.
Other common operators
These were only three operators out of more than 40 available in RxPHP. Apart from very simple ones like filter() and map(), there are also more sophisticated ones. We've seen concat() already, but here are a few interesting ones that we'll use in further chapters:
- buffer(): This operator has multiple variants, but all of them collect received values and reemit them in groups of a predefined size. For example, we can create groups of three items as follows:

    Rx\Observable::range(1, 4)
        ->bufferWithCount(3)
        ->subscribe(new DebugSubject());

  Which prints the following output:

    13:58:13 [] onNext: [1, 2, 3] (array)
    13:58:13 [] onNext: [4] (array)
    13:58:13 onCompleted

  Note
  Note that the last array contains just one value because the Observable sent an onComplete notification.
- merge(): This operator merges all input Observables into a single output Observable, reemitting all values immediately (in contrast to concat()).
- distinct(): This operator reemits only those values that haven't passed this operator before.
- take(): This operator reemits only a set number of values that arrive at the operator first, then sends an onComplete notification.
- retry(): When the source Observable sends onError, this operator tries to resubscribe automatically. You can also tell it to retry only a limited number of times before signaling onError (we'll use this operator in Chapter 4, Reactive versus a Typical Event-Driven Approach).
- catchError(): This operator lets us continue by subscribing to another Observable returned from its callback when an onError notification occurs.
- toArray(): This operator collects all items from its source Observable and reemits them as a single array when the source Observable completes.
- timeout(): This operator sends an onError notification if no values arrive within a certain time span.
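For a rough intuition of what some of these operators do, it can help to look at their closest array counterparts in plain PHP. The following sketch uses only built-in array functions; these are analogies on finished arrays, not RxPHP calls, and they say nothing about timing or notifications.

```php
<?php
// Array-based analogies in plain PHP; these are not RxPHP calls.
$numbers = range(1, 4);

// Like buffering with a count of 3: groups of three, the last group is shorter.
$buffered = array_chunk($numbers, 3);

// Like distinct(): keep only the first occurrence of each value.
$distinct = array_values(array_unique([1, 2, 2, 3, 1]));

// Like take(2): keep only the first two values.
$taken = array_slice($numbers, 0, 2);

print_r($buffered);
print_r($distinct);
print_r($taken);
```

The chunked result contains [1, 2, 3] followed by [4], which corresponds to the two onNext emissions in the bufferWithCount(3) output shown earlier.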
Enough theory; let's start writing our first custom class, which we'll utilize a few times throughout this book.