Let's have a look at a very simple example of RxPHP, similar to what we did in the previous chapter, and use it to demonstrate some of the basic principles behind Reactive Extensions.
We won't bother with defining an observer right now and will focus only on Observables and operators:
In this example, we have one Observable, two operators and one observer.
An Observable can be chained with operators. In this example, the operators are map() and filter().
Observables have the subscribe() method that is used by observers to start receiving values at the end of the chain.
We can represent this chain by the following diagram:
Each arrow shows the direction of propagation of items and notifications
We should probably explain the difference between using Observables and just iterating the array.
Observables are like a push model, where a value is pushed down the operator chain when it's ready. This is very important because it's the Observable that decides when it should emit the next value. The internal logic of Observables can do whatever it needs to (for example, it can run some asynchronous task) and still remain completely hidden.
A similar concept to Observables are Promises. However, while a Promise represents a single value that will exist in the future, an Observable represents a stream of values.
On the other hand, iterating the array is like a pull model. We'd be pulling one item after another. The important consequence is that we'd have to have the array prepared beforehand (that's before we start iterating it).
Another important difference is that Observables behave like a data stream (or data flow). We talked about streams in Chapter 1, Introduction to Reactive Programming. In practice, this means that an Observable knows when it has emitted all its items, or when an error has occurred and is able to send proper notification down the chain.
For this reason, Observables can call three different methods on their observers (we'll see how this is implemented later in this chapter when we write a custom operator and a custom Observable):
onNext: This method is called when the next item is ready to be emitted. We typically say that "an Observable emits an item".
onError: Notification called when an error has occurred. This could be any type of error represented by an instance of the Exception class.
onComplete: Notification called when there're no more items to be emitted.
Each Observable can emit zero or more items.
Each Observable can send one error, or one complete notification; but never both.
This is why the CallbackObserver class we used in Chapter 1, Introduction to Reactive Programming, takes three callables as arguments. These callables are called when the observer receives a next item, on error notification or on complete notification, respectively. All three callables are optional parameters and we can decide to ignore any of them.
For example, we can make an observer like the following:
We can see that only three values passed the filter() operator, followed by a proper complete notification at the end.
In RxPHP, every operator that takes a callable as an argument wraps its call internally with try…catch block. If the callable throws Exception, then this Exception is sent as onError notification. Consider the following example:
It's important to see that, when an error occurred, no more items were emitted, there's also no complete notification. This is because, when the observer received an error, it automatically unsubscribed.
We'll talk more about the process behind subscribing and unsubscribing in Chapter 3, Writing a Reddit Reader with RxPHP, and in Chapter 10, Using Advanced Operators and Techniques in RxPHP.
In Chapter 8, Multicasting in RxPHP and PHP7 pthreads Extension, we'll look more in-depth into what happens inside observers when they receive an error or complete notification.
One last thing before we move on. We said that Observables represent data streams. The great advantage of this is that we can easily combine or split streams, similar to what we saw in Chapter 1, Introduction to Reactive Programming, when talking about the gulp build tool.
Let's have a look at a slightly more advanced example of merging two Observables:
We used the merge() operator to combine the existing Observable with another Observable. Notice that we can add the operator anywhere we want. Since we added it after the filter() operator and before the subscribe() call, the items from the second Observable are going to be emitted right into the observer and will skip the preceding operator chain.
We can represent this chain by the following diagram:
The output for this example looks like the following:
These principles apply to all Rx implementations. Now, we should have a basic idea of what working with Observables, observers and operators in Rx looks like and we can talk more about each of them separately.