- PHP Reactive Programming
- Martin Sikora
- 2021-07-09 19:06:17
Writing JSONDecodeOperator
We're going to work with calls to remote APIs a few times throughout this book, so it would be very handy to have an operator that transforms JSON string responses into their PHP array representations.
This example looks like something that could be easily done with just the map() operator:
    // rxphp_06.php
    Rx\Observable::just('{"value":42}')
        ->map(function($value) {
            return json_decode($value, true);
        })
        ->subscribe(new DebugSubject());
This prints the correct result for sure, as we can see in the following output:
    $ php rxphp_06.php
    16:39:50 [] onNext: {"value": 42} (array)
    16:39:50 [] onCompleted
Well, but what about malformed JSON strings? What happens if we try to decode the following:
    Rx\Observable::just('NA')
        ->map(function($value) {
            return json_decode($value, true);
        })
        ->subscribe(new DebugSubject());
The function json_decode() doesn't throw an exception when trying to process an invalid JSON string; it just returns null:
15:51:06 [] onNext: (NULL)
This is probably not what we want. An invalid JSON string means something has gone wrong, because this situation should never happen, so we want to send an onError notification instead.
If we wanted to know any further information about which error occurred, we'd have to call json_last_error(). So, this is a perfect opportunity to write a custom operator that decodes JSON strings and sends an onError notification if any error occurs.
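The difference between "decoded to null" and "failed to decode" can be checked in plain PHP, without RxPHP. The following is a minimal sketch; describeDecode() is a hypothetical helper introduced only for this illustration:

```php
<?php
// Hypothetical helper (not part of RxPHP): decodes a JSON string and
// reports failure via json_last_error() rather than relying on null.
function describeDecode(string $json): string
{
    $decoded = json_decode($json, true);
    if (json_last_error() !== JSON_ERROR_NONE) {
        return 'error: ' . json_last_error_msg();
    }
    return 'ok: ' . var_export($decoded, true);
}

echo describeDecode('{"value":42}'), PHP_EOL; // valid JSON, decoded to an array
echo describeDecode('NA'), PHP_EOL;           // error: Syntax error
echo describeDecode('null'), PHP_EOL;         // ok: NULL
```

The last call shows why checking the return value for null is not enough: the string 'null' is valid JSON and decodes to null without any error.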
All operators implement OperatorInterface and its __invoke() method. This so-called "magic" method is supported since PHP 5.3 and allows objects to be used as functions:
    // __invoke.php
    class InvokeExampleClass {
        public function __invoke($x) {
            // var_dump() prints the value together with its type, e.g. int(5)
            var_dump(strlen($x));
        }
    }
    $obj = new InvokeExampleClass();
    $obj('apple');
    var_dump(is_callable($obj));
When a class implements __invoke(), its instances are automatically considered callable as well:
    $ php __invoke.php
    int(5)
    bool(true)
Writing operators is very similar. A stub for our class will look like the following:
    // JSONDecodeOperator.php
    use Rx\ObservableInterface as ObservableI;
    use Rx\ObserverInterface as ObserverI;
    use Rx\SchedulerInterface as SchedulerI;
    use Rx\Operator\OperatorInterface as OperatorI;
    class JSONDecodeOperator implements OperatorI {
        public function __invoke(ObservableI $observable, ObserverI $observer, SchedulerI $scheduler = null) {
            // ...
        }
    }
The __invoke() method takes three arguments and returns a Disposable object. Right now, we'll use just the first two and not worry about the $scheduler:
- ObservableInterface $observable: This is our input Observable that we'll subscribe to
- ObserverInterface $observer: This is where we'll emit all output values from this operator
We'll follow almost the same principle as when writing a custom Subject class. We're going to use CallbackObserver to subscribe to the Observable and perform all of our logic:
    // In addition to the use statements from the stub above:
    use Rx\Observer\CallbackObserver;
    class JSONDecodeOperator implements OperatorI {
        public function __invoke(ObservableI $observable, ObserverI $observer, SchedulerI $scheduler = null) {
            $obs = new CallbackObserver(
                function ($value) use ($observer) {
                    $decoded = json_decode($value, true);
                    if (json_last_error() == JSON_ERROR_NONE) {
                        // Decoding succeeded, so emit the PHP array
                        $observer->onNext($decoded);
                    } else {
                        // Decoding failed, so turn it into an error notification
                        $msg = json_last_error_msg();
                        $e = new InvalidArgumentException($msg);
                        $observer->onError($e);
                    }
                },
                function ($error) use ($observer) {
                    $observer->onError($error);
                },
                function () use ($observer) {
                    $observer->onCompleted();
                }
            );
            return $observable->subscribe($obs, $scheduler);
        }
    }
There are a few interesting things to notice:
- When onError or onCompleted notifications occur, we just pass them along without any further logic.
- The operator can send any signal any time it wants. Inside the CallbackObserver class's onNext closure, we use json_last_error() to check whether any error occurred while decoding the input JSON string coming from the source Observable.
- The operator has full access to the source Observable.
- The operator can emit values independently of values from the source Observable.
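The second point, that a single incoming value can result in either an onNext or an onError signal, can be tried in isolation. The sketch below is not RxPHP code: RecordingObserver and decodeInto() are hypothetical names, and the stub only mimics the three observer methods so the routing decision can be run without the library:

```php
<?php
// Hypothetical stand-in for an observer; it only records what it receives.
class RecordingObserver
{
    public $events = [];

    public function onNext($value) { $this->events[] = ['next', $value]; }
    public function onError(Exception $e) { $this->events[] = ['error', $e->getMessage()]; }
    public function onCompleted() { $this->events[] = ['completed']; }
}

// Mirrors the decision made in the operator's onNext closure: forward the
// decoded value, or convert a decoding failure into an error signal.
function decodeInto(string $json, RecordingObserver $observer): void
{
    $decoded = json_decode($json, true);
    if (json_last_error() === JSON_ERROR_NONE) {
        $observer->onNext($decoded);
    } else {
        $observer->onError(new InvalidArgumentException(json_last_error_msg()));
    }
}

$observer = new RecordingObserver();
decodeInto('{"value":42}', $observer); // recorded as a 'next' event
decodeInto('NA', $observer);           // recorded as an 'error' event
print_r($observer->events);
```

Unlike a real Observable chain, this stub keeps accepting values after an error; it is meant only to show which signal each input produces.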
In order to use our operator, we have to use the Observable::lift() method, which takes a Closure as an argument that needs to return an instance of an operator (this function is a so-called operator factory):
    // rxphp_07.php
    Rx\Observable::just('{"value":42}')
        ->lift(function() {
            return new JSONDecodeOperator();
        })
        ->subscribe(new DebugSubject());
Using custom operators was significantly simplified in RxPHP 2, but using the lift() method is universal and works in both versions of RxPHP.
A valid JSON string is decoded as expected:
    $ php rxphp_07.php
    17:58:49 [] onNext: {"value": 42} (array)
    17:58:49 [] onCompleted
On the other hand, the same invalid JSON string that we used above doesn't call onNext, but onError instead. It sends this notification with an instance of the InvalidArgumentException class and the error message from json_last_error_msg(), as shown in the following output:
17:59:25 onError (InvalidArgumentException): Syntax error
As usual, we're going to reuse this class throughout this book. The next chapter is going to work with remote APIs a lot, so this operator is going to be very handy.
Simplifying propagation of notifications
In the JSONDecodeOperator class, we didn't want to modify either the onError or the onCompleted notifications, so we just passed them along. However, there's an easier way to do this thanks to how PHP works with callables. A valid callable is also an array with two items: an object and a method name.
This means we can rewrite the above CallbackObserver instantiation as follows:
    $callbackObserver = new CallbackObserver(
        function ($value) use ($observer) {
            // ...
        },
        [$observer, 'onError'],
        [$observer, 'onCompleted']
    );
The functionality is exactly the same. Instead of creating an anonymous function for each notification, we can just pass the callable directly.
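The array form of a callable can be verified in plain PHP. This is a small sketch; the Greeter class is hypothetical and exists only to have an object and a method name to pair up:

```php
<?php
// Hypothetical class used only to demonstrate array callables.
class Greeter
{
    public function greet(string $name): string
    {
        return "Hello, $name";
    }
}

$greeter = new Greeter();
$callable = [$greeter, 'greet']; // an object and a method name

var_dump(is_callable($callable));              // bool(true)
echo call_user_func($callable, 'Rx'), PHP_EOL; // Hello, Rx
echo $callable('Rx'), PHP_EOL;                 // Hello, Rx
```

Any code that accepts a callable, such as the CallbackObserver constructor above, can therefore receive [$observer, 'onError'] in place of a closure that only forwards its argument.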
Using custom operators in RxPHP 2
In Chapter 1, Introduction to Reactive Programming, we mentioned a magic __call() method. RxPHP 2 uses this method to allow the use of custom operators by auto-discovering them in two namespace formats.
The first option is defining our operator class in the Rx\Operator namespace:
    // JSONDecodeOperator.php
    namespace Rx\Operator;
    use Rx\ObservableInterface as ObservableI;
    use Rx\ObserverInterface as ObserverI;
    use Rx\Operator\OperatorInterface as OperatorI;
    use Rx\DisposableInterface as DisposableI;
    class JSONDecodeOperator implements OperatorI {
        public function __invoke(ObservableI $observable, ObserverI $observer): DisposableI {
            return $observable->subscribe(
                function ($value) use ($observer) {
                    $decoded = json_decode($value, true);
                    if (json_last_error() == JSON_ERROR_NONE) {
                        $observer->onNext($decoded);
                    } else {
                        $msg = json_last_error_msg();
                        // Leading backslash: inside this namespace the bare name
                        // would resolve to Rx\Operator\InvalidArgumentException
                        $e = new \InvalidArgumentException($msg);
                        $observer->onError($e);
                    }
                },
                [$observer, 'onError'],
                [$observer, 'onCompleted']
            );
        }
    }
It's the same JSONDecodeOperator class, just updated for RxPHP 2. Using this operator is, then, very simple:
    Observable::just('{"value":42}')
        ->JSONDecode()
        ->subscribe(new DebugSubject());
Since our operator resides under the Rx\Operator namespace, it's expanded by the __call() method to Rx\Operator\JSONDecodeOperator. This means we don't need to use the lift() method at all.
Another way is to prefix the operator name and namespace with underscores (_), which are then merged into a full class name. This means we can put all application-specific operators under a custom namespace:
    // JSONDecodeOperator.php
    namespace MyApp\Rx\Operator;
    ...
    class JSONDecodeOperator implements OperatorI {
        ...
    }
Now we can use the operator as follows:
    Observable::just('{"value":42}')
        ->_MyApp_JSONDecode()
        ->subscribe(new DebugSubject());
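To make the two naming formats concrete, here is a rough sketch of how a method name could be expanded into a class name. This is not RxPHP's actual __call() implementation; resolveOperatorClass() is a hypothetical function, and the rule that the prefix is followed by Rx\Operator is an assumption inferred from the two examples above:

```php
<?php
// Hypothetical resolver, NOT RxPHP's real code. It only mirrors the
// naming rules inferred from the examples in the text.
function resolveOperatorClass(string $method): string
{
    $namespace = 'Rx\\Operator';
    if ($method[0] === '_') {
        // "_MyApp_JSONDecode" becomes ["MyApp", "JSONDecode"]
        $parts = explode('_', ltrim($method, '_'));
        $method = array_pop($parts);
        // Assumption: the custom prefix is followed by "Rx\Operator"
        $namespace = implode('\\', $parts) . '\\Rx\\Operator';
    }
    return $namespace . '\\' . $method . 'Operator';
}

echo resolveOperatorClass('JSONDecode'), PHP_EOL;        // Rx\Operator\JSONDecodeOperator
echo resolveOperatorClass('_MyApp_JSONDecode'), PHP_EOL; // MyApp\Rx\Operator\JSONDecodeOperator
```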