
Writing JSONDecodeOperator

We're going to work with calls to remote APIs a few times throughout this book, so it would be very handy to have an operator that transforms JSON string responses into their PHP array representations.

This example looks like something that could be easily done with just the map() operator:

// rxphp_06.php  
Rx\Observable::just('{"value":42}') 
    ->map(function($value) { 
        return json_decode($value, true); 
    }) 
    ->subscribe(new DebugSubject()); 

This prints the correct result, as we can see in the following output:

$ php rxphp_06.php
16:39:50 [] onNext: {"value": 42} (array)
16:39:50 [] onCompleted

But what about malformed JSON strings? What happens if we try to decode the following?

Rx\Observable::just('NA') 
    ->map(function($value) { 
        return json_decode($value, true); 
    }) 
    ->subscribe(new DebugSubject()); 

The function json_decode() doesn't throw an exception when trying to process an invalid JSON string; it just returns null:

15:51:06 [] onNext: (NULL)

This is probably not what we want. An invalid JSON string means something has gone wrong, because this situation should never happen, so we want to send an onError notification instead of a null value.

If we wanted to know which error occurred, we'd have to call json_last_error(). So, this is a perfect opportunity to write a custom operator that decodes JSON strings and sends an onError notification if decoding fails.
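
The check described above can be sketched in plain PHP, independent of RxPHP. The helper name decodeJsonOrFail() is hypothetical and only serves to illustrate the idea. It also shows why testing the return value for null is not enough: the valid JSON document null decodes to null as well.

```php
<?php
// Hypothetical helper, not part of RxPHP: decode a JSON string or throw.
function decodeJsonOrFail(string $json)
{
    $decoded = json_decode($json, true);
    // json_decode() returns null both for invalid input and for the valid
    // JSON literal "null", so we ask json_last_error() instead.
    if (json_last_error() !== JSON_ERROR_NONE) {
        throw new InvalidArgumentException(json_last_error_msg());
    }
    return $decoded;
}

var_dump(decodeJsonOrFail('{"value":42}')); // array with the key "value"
var_dump(decodeJsonOrFail('null'));         // NULL, but no error is raised

try {
    decodeJsonOrFail('NA');
} catch (InvalidArgumentException $e) {
    echo 'Error: ', $e->getMessage(), PHP_EOL;
}
```

The operator we are about to write performs the same test, but reports the failure through onError instead of throwing.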

All operators implement the OperatorInterface and __invoke() method. This so-called "magic" method is supported from PHP 5.3+ and allows the use of objects as functions:

// __invoke.php 
class InvokeExampleClass { 
    public function __invoke($x) { 
        var_dump(strlen($x)); 
    } 
} 
$obj = new InvokeExampleClass(); 
$obj('apple'); 
var_dump(is_callable($obj)); 

When a class implements __invoke(), its instances are automatically considered callable as well:

$ php __invoke.php
int(5)
bool(true)
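
Because such objects pass the callable check, they can be handed to any function that expects a callback. The following is a small sketch using the built-in array_map(); the LengthOf class is made up for this illustration:

```php
<?php
// Hypothetical class used only for this illustration.
class LengthOf {
    public function __invoke($x) {
        return strlen($x);
    }
}

// The object is passed where PHP expects a callable; array_map() calls
// __invoke() once for every item in the input array.
$lengths = array_map(new LengthOf(), ['apple', 'kiwi']);
print_r($lengths);
```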

Writing operators is very similar. A stub for our class will look like the following:

// JSONDecodeOperator.php 
use Rx\ObservableInterface as ObservableI; 
use Rx\ObserverInterface as ObserverI; 
use Rx\SchedulerInterface as SchedulerI; 
use Rx\Operator\OperatorInterface as OperatorI; 
 
class JSONDecodeOperator implements OperatorI { 
    public function __invoke(ObservableI $observable, 
            ObserverI $observer, SchedulerI $scheduler = null) { 
        // ... 
    } 
} 

The __invoke() method takes three arguments and returns a Disposable object. Right now, we'll use just the first two and not worry about the $scheduler:

  • ObservableInterface $observable: This is our input Observable that we'll subscribe to
  • ObserverInterface $observer: This is where we'll emit all output values from this operator

We'll follow almost the same principle as when writing a custom Subject class. We're going to use CallbackObserver to subscribe to the Observable and perform all of our logic:

use Rx\Observer\CallbackObserver; 
 
class JSONDecodeOperator implements OperatorI { 
  public function __invoke(ObservableI $observable, 
      ObserverI $observer, SchedulerI $scheduler = null) { 
 
    $obs = new CallbackObserver( 
      function ($value) use ($observer) { 
        $decoded = json_decode($value, true); 
        if (json_last_error() == JSON_ERROR_NONE) { 
          $observer->onNext($decoded); 
        } else { 
          $msg = json_last_error_msg(); 
          $e = new InvalidArgumentException($msg); 
          $observer->onError($e); 
        } 
      }, 
      function ($error) use ($observer) { 
        $observer->onError($error); 
      }, 
      function () use ($observer) { 
        $observer->onCompleted(); 
      } 
    ); 
 
    return $observable->subscribe($obs, $scheduler); 
  } 
} 

There are a few interesting things to notice:

  • When onError or onCompleted notifications occur, we just pass them along without any further logic.
  • The operator can send any signal any time it wants. Inside the onNext closure passed to CallbackObserver, we use json_last_error() to check whether any error occurred while decoding the JSON string coming from the source Observable.
  • The operator has full access to the source Observable.
  • The operator can emit values independently of the values from the source Observable.

In order to use our operator, we have to use the Observable::lift() method, which takes a Closure as an argument that needs to return an instance of an operator (this function is a so-called operator factory):

// rxphp_07.php 
Rx\Observable::just('{"value":42}') 
    ->lift(function() { 
        return new JSONDecodeOperator(); 
    }) 
    ->subscribe(new DebugSubject()); 

Using custom operators was significantly simplified in RxPHP 2, but using the lift() method is universal and works in both versions of RxPHP.

A valid JSON string is decoded as expected:

$ php rxphp_07.php
17:58:49 [] onNext: {"value": 42} (array)
17:58:49 [] onCompleted

On the other hand, the same invalid JSON string that we used above doesn't call onNext, but onError instead. It sends this notification with an instance of InvalidArgumentException class and the error message from json_last_error_msg(), as shown in the following output:

17:59:25 onError (InvalidArgumentException): Syntax error

As usual, we're going to reuse this class throughout this book. The next chapter is going to work with remote APIs a lot, so this operator is going to be very handy.

Simplifying propagation of notifications

In the JSONDecodeOperator class, we didn't want to modify either the onError or the onCompleted notifications, so we just passed them along. However, there's an easier way to do this thanks to how PHP works with callables. A valid callable is also an array with two items: an object and a method name.

This means we can rewrite the above CallbackObserver instantiation as follows:

$callbackObserver = new CallbackObserver( 
    function ($value) use ($observer) { 
        // ... 
    }, 
    [$observer, 'onError'], 
    [$observer, 'onCompleted'] 
); 

The functionality is exactly the same. Instead of creating an anonymous function for each notification, we can just pass the callable directly.
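
The array form of a callable can be tried out without RxPHP. In the following sketch, the Printer class is a hypothetical stand-in for an observer; it only shows that [$object, 'methodName'] behaves like a closure that forwards the call to that method:

```php
<?php
// Hypothetical stand-in for an observer, used only for this illustration.
class Printer {
    public $log = [];
    public function onError($e) { $this->log[] = 'error: ' . $e; }
    public function onCompleted() { $this->log[] = 'completed'; }
}

$printer = new Printer();

// An [object, method name] pair is a valid callable...
$onError = [$printer, 'onError'];
$onCompleted = [$printer, 'onCompleted'];
var_dump(is_callable($onError));

// ...and invoking it calls the method on that object.
call_user_func($onError, 'boom');
$onCompleted();

print_r($printer->log);
```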

Using custom operators in RxPHP 2

In Chapter 1, Introduction to Reactive Programming, we mentioned a magic __call() method. RxPHP 2 uses this method to allow the use of custom operators by auto-discovering them in two namespace formats.

The first option is defining our operator class in the Rx\Operator namespace:

// JSONDecodeOperator.php 
namespace Rx\Operator; 
 
use Rx\ObservableInterface as ObservableI; 
use Rx\ObserverInterface as ObserverI; 
use Rx\Operator\OperatorInterface as OperatorI; 
use Rx\DisposableInterface as DisposableI; 
 
class JSONDecodeOperator implements OperatorI { 
  public function __invoke(ObservableI $observable, 
      ObserverI $observer): DisposableI { 
 
   return $observable->subscribe( 
     function ($value) use ($observer) { 
       $decoded = json_decode($value, true); 
       if (json_last_error() == JSON_ERROR_NONE) { 
         $observer->onNext($decoded); 
       } else { 
         $msg = json_last_error_msg(); 
         // Leading backslash: we are inside the Rx\Operator namespace. 
         $e = new \InvalidArgumentException($msg); 
         $observer->onError($e); 
       } 
     }, 
     [$observer, 'onError'], 
     [$observer, 'onCompleted'] 
   ); 
  } 
} 

It's the same JSONDecodeOperator class, just updated for RxPHP 2. Using this operator is, then, very simple:

Observable::just('{"value":42}') 
    ->JSONDecode() 
    ->subscribe(new DebugSubject()); 

Since our operator resides under the Rx\Operator namespace, it's expanded by the __call() method to Rx\Operator\JSONDecodeOperator. This means we don't need to use the lift() method at all.

Another way is to prefix both the namespace and the operator name with an underscore (_); the parts are then merged into a full class name. This means we can put all application-specific operators under a custom namespace:

// JSONDecodeOperator.php 
namespace MyApp\Rx\Operator; 
... 
class JSONDecodeOperator implements OperatorI { ... } 

Now we can use the operator as follows:

Observable::just('{"value":42}') 
    ->_MyApp_JSONDecode() 
    ->subscribe(new DebugSubject()); 
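
To make the naming convention concrete, the following sketch mimics how a method name could be mapped to a class name under the two rules described above. It is a simplified illustration, not RxPHP's actual __call() implementation, and the function name resolveOperatorClass() is hypothetical:

```php
<?php
// Hypothetical helper illustrating the naming rules; this is not RxPHP code.
function resolveOperatorClass(string $method): string
{
    $namespace = 'Rx\\Operator\\';
    if ($method[0] === '_') {
        // "_MyApp_JSONDecode" splits into "", "MyApp" and "JSONDecode".
        list(, $prefix, $method) = explode('_', $method, 3);
        $namespace = $prefix . '\\' . $namespace;
    }
    return $namespace . $method . 'Operator';
}

echo resolveOperatorClass('JSONDecode'), PHP_EOL;
echo resolveOperatorClass('_MyApp_JSONDecode'), PHP_EOL;
```

The first call yields the class name in the Rx\Operator namespace, while the second yields the one under MyApp\Rx\Operator, matching the two class declarations shown earlier.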