
Writing CURLObservable

As we said, we're going to work with API calls, so we need a comfortable way of creating HTTP requests. It's probably no surprise that we'll write a custom Observable that downloads a URL and passes its response to its observers, where we'll decode it from JSON using the operator we created just a couple of lines above.

We're going to use PHP's cURL module, which is a wrapper around libcurl ( https://curl.haxx.se/libcurl/ ), a C library for transferring data over a wide range of protocols.

We'll start by using plain simple cURL in PHP and we'll see that it supports some sort of asynchronous approach out-of-the-box.

Imperative approach and cURL

If we just wanted to download a single URL, we wouldn't need anything special. However, we want to make this, and all future applications of the CURLObservable class, more interactive, so we'll also keep track of the download progress.

A plain and simple approach could look like this:

// curl_01.php 
$ch = curl_init(); 
curl_setopt($ch, CURLOPT_URL, "http://google.com"); 
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); 
curl_setopt($ch, CURLOPT_PROGRESSFUNCTION, 'progress'); 
curl_setopt($ch, CURLOPT_NOPROGRESS, false); 
curl_setopt($ch, CURLOPT_HEADER, 0); 
$html = curl_exec($ch); 
curl_close($ch); 
 
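// Progress callback: cURL passes its handle followed by four byte counters. 
function progress($res, $downtotal, $down, $uptotal, $up) { 
    // The expected total stays at 0 until cURL knows the response size. 
    if ($downtotal > 0) { 
        printf("%.2f\n", $down / $downtotal * 100); 
    } 
    ob_flush(); 
    usleep(100 * 1000); 
} 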

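We're using the CURLOPT_PROGRESSFUNCTION option to set a callback function which is invoked internally by the cURL module. It takes five arguments: the cURL handle, followed by four byte counters (expected download total, bytes downloaded so far, expected upload total, and bytes uploaded so far). The two download counters let us keep track of how much of the page's total size has already been downloaded.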
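To make the arithmetic inside the callback concrete, here is a minimal sketch that isolates the percentage calculation so it can be tried without any network access. The helper name progressPercent() is our own invention for illustration and is not part of the cURL module; the sample byte counts are made up.

```php
<?php
// Hypothetical helper: turns cURL's download counters into a percentage.
// Returns null while the expected total is still unknown (reported as 0).
function progressPercent($downtotal, $down)
{
    if ($downtotal <= 0) {
        return null;
    }
    return round($down / $downtotal * 100, 2);
}

// Simulated callback invocations: [expected total, downloaded so far].
foreach ([[0, 0], [2048, 512], [2048, 2048]] as [$total, $done]) {
    $percent = progressPercent($total, $done);
    echo $percent === null ? "unknown\n" : sprintf("%.2f\n", $percent);
}
```

Guarding against a zero total matters because the first few callback invocations typically arrive before the response size is known.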

We probably don't need to show its output because it's pretty obvious.

There's also a small subset of cURL functions that work with multiple cURL handles simultaneously. These are all prefixed with curl_multi_ and are executed by calling curl_multi_exec(). Nonetheless, the curl_multi_exec() function is blocking and the interpreter needs to wait until it finishes.
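As a rough sketch of how the curl_multi_ functions fit together, the following helper adds several handles to one multi handle and drives them in a single loop. The name fetchAll() is hypothetical, and the code assumes the cURL extension is loaded. Note that the loop as a whole still occupies the script until every transfer has finished.

```php
<?php
// Sketch only: download several URLs through one cURL multi handle.
// fetchAll() is a hypothetical helper name, not part of the cURL module.
function fetchAll(array $urls)
{
    $multi = curl_multi_init();
    $handles = [];
    foreach ($urls as $key => $url) {
        $ch = curl_init($url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_multi_add_handle($multi, $ch);
        $handles[$key] = $ch;
    }

    // Drive all transfers until none of them is still running.
    do {
        $status = curl_multi_exec($multi, $running);
        if ($running > 0) {
            curl_multi_select($multi, 1.0); // wait for activity on any handle
        }
    } while ($running > 0 && $status === CURLM_OK);

    // Collect each response body under the key of its URL.
    $responses = [];
    foreach ($handles as $key => $ch) {
        $responses[$key] = curl_multi_getcontent($ch);
        curl_multi_remove_handle($multi, $ch);
    }
    curl_multi_close($multi);

    return $responses;
}
```

Calling fetchAll(['a' => $urlA, 'b' => $urlB]) would return an array with the same keys holding the downloaded bodies.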

Implementing cURL into a custom Observable

We've already seen how to write a custom observer, Subject and operator. Now is the right time to write an Observable as well. We want the Observable to emit values when downloading the URL and, at the end, return a complete response. We can distinguish between the two types of messages by checking their type. Progress will always be a double, while response will always be a string.
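The type-based convention can be sketched in a few lines of plain PHP. The function name describeEmission() is hypothetical and only illustrates how a consumer might tell the two kinds of messages apart.

```php
<?php
// Hypothetical helper: classifies a value by the convention described above.
function describeEmission($value)
{
    if (is_float($value)) {
        return 'progress';   // percentage of the download completed
    }
    if (is_string($value)) {
        return 'response';   // the complete response body
    }
    return 'unknown';
}

foreach ([21.39, 100.0, '{"items":[]}'] as $value) {
    echo describeEmission($value), "\n";
}
```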

Let's start with our class synopsis to see how it's going to work and then implement each method separately with a short description:

use Rx\Observable; 
use Rx\ObserverInterface as ObserverI; 
use Rx\Scheduler\ImmediateScheduler; 
use Rx\Disposable\CompositeDisposable; 
 
class CURLObservable extends Observable { 
    public function __construct($url) {} 
    public function subscribe(ObserverI $obsr, $sched = null) {} 
    private function startDownload() {} 
    private function progress($r, $downtot, $down, $uptot, $up) {} 
} 

Every time we write an Observable, we'll extend the base Rx\Observable class. We could theoretically just implement Rx\ObservableInterface, but, most of the time, we also want to inherit all its internal logic and all existing operators.

The constructor and method startDownload() are going to be very simple. In startDownload(), we start downloading the URL while monitoring its progress.

Please note that this code goes inside the CURLObservable class; we're just trying to keep the code short and easy to read, so we have omitted indentation and class definition in this example:

public function __construct($url) { 
    $this->url = $url; 
} 
 
private function startDownload() { 
    $ch = curl_init(); 
    curl_setopt($ch, CURLOPT_URL, $this->url); 
    curl_setopt($ch, CURLOPT_PROGRESSFUNCTION,[$this,'progress']); 
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); 
    curl_setopt($ch, CURLOPT_NOPROGRESS, false); 
    curl_setopt($ch, CURLOPT_HEADER, 0); 
    curl_setopt($ch, CURLOPT_USERAGENT, 'Mozilla/5.0 ...'); 
    // Disable gzip compression 
    curl_setopt($ch, CURLOPT_ENCODING, 'gzip;q=0,deflate,sdch'); 
    $response = curl_exec($ch); 
    curl_close($ch); 
 
    return $response; 
} 

This is mostly the same as the example using an imperative approach. The only interesting difference is that we're using a callable [$this,'progress'] instead of just a function name, as we did earlier.
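If the array form of a callable is unfamiliar, this small stand-alone sketch shows the same shape with a built-in function that accepts callbacks. The Doubler class and its methods are hypothetical and exist only for this illustration.

```php
<?php
// Hypothetical class showing the [$object, 'methodName'] callable form.
class Doubler
{
    public function doubleAll(array $values)
    {
        // Same shape as [$this, 'progress']: an object plus a method name.
        return array_map([$this, 'twice'], $values);
    }

    // Private is fine here because the callable is used from inside the class.
    private function twice($value)
    {
        return $value * 2;
    }
}

$doubler = new Doubler();
print_r($doubler->doubleAll([1, 2, 3]));
```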

The actual emission of values happens inside the progress() method:

private function progress($res, $downtotal, $down, $uptotal, $up){ 
    if ($downtotal > 0) { 
        $percentage = sprintf("%.2f", $down / $downtotal * 100); 
        foreach ($this->observers as $observer) { 
            /** @var ObserverI $observer */ 
            $observer->onNext(floatval($percentage)); 
        } 
    } 
} 

Since we inherited the original Observable, we can make use of its protected property $observers that holds all subscribed observers, as its name suggests. To emit a value to all of them, we can simply iterate the array and call onNext on each observer.
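The fan-out itself is easy to picture without RxPHP. The following sketch uses two hypothetical classes, RecordingObserver and TinyObservable, that only mimic the idea of keeping an array of observers and calling onNext on each one; they are not RxPHP classes.

```php
<?php
// Stand-alone sketch with hypothetical classes; it does not use RxPHP.
class RecordingObserver
{
    public $received = [];

    public function onNext($value)
    {
        $this->received[] = $value;
    }
}

class TinyObservable
{
    protected $observers = [];

    public function subscribe(RecordingObserver $observer)
    {
        $this->observers[] = $observer;
    }

    public function emit($value)
    {
        // Every subscribed observer receives the same value.
        foreach ($this->observers as $observer) {
            $observer->onNext($value);
        }
    }
}

$first = new RecordingObserver();
$second = new RecordingObserver();
$source = new TinyObservable();
$source->subscribe($first);
$source->subscribe($second);
$source->emit(21.39);
$source->emit(49.19);
```

After the two emit() calls, both observers hold the same two values in the order they were emitted.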

The only method we haven't seen so far is subscribe():

public function subscribe(ObserverI $obsr, $sched = null) { 
    $disp1 = parent::subscribe($obsr, $sched); 
 
    if (null === $sched) { 
        $sched = new ImmediateScheduler(); 
    } 
 
    $disp2 = $sched->schedule(function() use ($obsr) { 
        $response = $this->startDownload(); 
        if ($response) { 
            $obsr->onNext($response); 
            $obsr->onCompleted(); 
        } else { 
            $msg = 'Unable to download ' . $this->url; 
            $obsr->onError(new Exception($msg)); 
        } 
    }); 
 
    return new CompositeDisposable([$disp1, $disp2]); 
} 

This method combines many of the things we've seen in this chapter:

  • We definitely want to keep the original functionality of the Observable, so we'll call its parent implementation. This adds the observer to the array of observers, as mentioned a moment ago.
  • The parent::subscribe() method returns a disposable. That's the object we can use to unsubscribe the observer from this Observable.
  • If we don't specify what Scheduler this Observable should use, it'll fall back to ImmediateScheduler. We've already mentioned ImmediateScheduler when we were talking about Schedulers in general. In RxPHP 2, we'd use Scheduler::getImmediate() instead of directly using the class name.
  • Right after that, we schedule the work (in terms of Schedulers, it's usually referred to as "action") to be executed by the Scheduler. Note that the action itself is a closure.
  • Then, we start downloading the URL. If we subscribe another observer to the same Observable, it'll re-download the same URL again. Download progress is then emitted with frequency according to cURL's internals. We'll talk more about the subscription process in the next chapter.
  • When downloading finishes, we emit the response or an error.
  • At the end of this method, it returns another disposable. This time, it's CompositeDisposable that is used to wrap other disposables. When calling its dispose() method, these wrapped ones are properly disposed as well.
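The idea of one disposable wrapping several others can be sketched in plain PHP. CallbackDisposable and GroupDisposable below are hypothetical stand-ins written for this illustration; they are not RxPHP's own classes and make no claim about how RxPHP implements them.

```php
<?php
// Hypothetical stand-ins, not RxPHP's own classes.
class CallbackDisposable
{
    private $action;
    private $disposed = false;

    public function __construct(callable $action)
    {
        $this->action = $action;
    }

    public function dispose()
    {
        // Run the cleanup action at most once.
        if (!$this->disposed) {
            $this->disposed = true;
            call_user_func($this->action);
        }
    }
}

class GroupDisposable
{
    private $disposables;

    public function __construct(array $disposables)
    {
        $this->disposables = $disposables;
    }

    public function dispose()
    {
        // Disposing the group disposes every wrapped disposable.
        foreach ($this->disposables as $disposable) {
            $disposable->dispose();
        }
    }
}

$log = [];
$group = new GroupDisposable([
    new CallbackDisposable(function () use (&$log) { $log[] = 'observer removed'; }),
    new CallbackDisposable(function () use (&$log) { $log[] = 'scheduled action cancelled'; }),
]);
$group->dispose();
$group->dispose(); // the wrapped disposables ignore the second call
```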

So, that's it. Now we can test our Observable and see what its output is. We can try to grab a list of the most recent questions on www.stackoverflow.com tagged with functional-programming:

$url = 'https://api.stack...&tagged=functional-programming'; 
$observable = new CURLObservable($url); 
$observable->subscribe(new DebugSubject()); 

This prints a couple of numbers and then the response JSON string:

16:17:52 onNext: 21.39 (double)
16:17:52 onNext: 49.19 (double)
16:17:52 onNext: 49.19 (double)
16:17:52 onNext: 76.99 (double)
16:17:52 onNext: 100 (double)
16:17:52 onNext: {"items":[{"tags":["javascript","... (string)
16:17:52 onCompleted

You can see that one value was emitted twice. This is because of the timing and network latency when cURL evaluates the callback, which is nothing unusual. If we didn't want to see repeated values, we could use the distinct() operator that we saw when talking about "marble diagrams".
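As a plain-PHP analogy of dropping the repeated value (this is not the RxPHP operator itself), we can take the progress values from the output above and remove duplicates from the array:

```php
<?php
// Progress values copied from the output above.
$progress = [21.39, 49.19, 49.19, 76.99, 100.0];

// array_unique() keeps the first occurrence of each value;
// array_values() then reindexes the result from 0.
$distinct = array_values(array_unique($progress));

print_r($distinct);
```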

Now let's combine it with our JSONDecodeOperator. Since we're now interested only in the string response and want to ignore all progress emissions, we'll also use the filter() operator:

// rxphp_curl.php 
$observable 
    ->filter(function($value) { 
        return is_string($value); 
    }) 
    ->lift(function() { 
        return new JSONDecodeOperator(); 
    }) 
    ->subscribe(new DebugSubject(null, 128)); 

This returns part of the response array (for demonstration purposes, we added indentation and made the output a little longer):

$ php rxphp_curl.php 
16:23:55 [] onNext: { 
    "items": [ 
        { 
            "tags": [ 
                "javascript", 
                "functional-programming", 
       ... (array) 
16:23:55 [] onCompleted 

When we used the filter() operator, you might notice that we called it Observable::filter() without necessarily using the lift() method. This is because almost all operators are, in fact, just lift() calls with predefined Closures that return an appropriate operator class. A good question is whether we can write our own shorthand for JSONDecodeOperator when we're already extending the base Observable class. Maybe something like Observable::jsonDecode()?

The answer is yes, we can. However, in RxPHP 1.x, it wouldn't help us a lot. When we chain operators, they return other instances of Observables that aren't under our control. We could theoretically use Observable::jsonDecode() right after creating CURLObservable because we'd know that it's going to be an instance of this class, but chaining it with filter() brings us back to the original Observable, which doesn't have a jsonDecode() method. In particular, the filter() operator returns an instance of Rx\Observable\AnonymousObservable.
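The problem is not specific to RxPHP and can be reproduced with two miniature classes. BaseStream and JsonStream below are hypothetical and only mimic the shape of the situation: an inherited operator that always returns the base type.

```php
<?php
// Hypothetical miniature classes; they only mimic the shape of the problem.
class BaseStream
{
    protected $values;

    public function __construct(array $values = [])
    {
        $this->values = $values;
    }

    public function filter(callable $predicate)
    {
        // Like a built-in operator, this always returns the base type.
        return new BaseStream(array_values(array_filter($this->values, $predicate)));
    }
}

class JsonStream extends BaseStream
{
    public function jsonDecode()
    {
        $decode = function ($value) {
            return json_decode($value, true);
        };
        return new BaseStream(array_map($decode, $this->values));
    }
}

$stream = new JsonStream(['{"a":1}', 42.0]);
var_dump(method_exists($stream, 'jsonDecode'));                      // bool(true)
var_dump(method_exists($stream->filter('is_string'), 'jsonDecode')); // bool(false)
```

The custom method is available only on the object we created ourselves; one operator call later it is gone.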

Running multiple requests asynchronously

An interesting use case could be to start multiple requests asynchronously. All calls to curl_exec() are blocking, which means that they block the execution context until they're finished.

Unfortunately, this is a very tricky problem that's hard to solve without using any extra PHP modules, such as pthreads, as we'll see much later in Chapter 9, Multithreaded and Distributed Computing with pthreads and Gearman.

We can, however, make use of PHP's standard proc_open() to spawn non-blocking subprocesses that can run in parallel and then just ask for their output.
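A minimal sketch of this idea, under the assumption that we run on a Unix-like system where pipes can be switched to non-blocking mode, could look as follows. The helper name runParallel() and the echo commands are ours and serve only as an illustration.

```php
<?php
// Sketch: start several shell commands at once and collect their output.
// runParallel() is a hypothetical helper name.
function runParallel(array $commands)
{
    $running = [];
    foreach ($commands as $key => $command) {
        // Capture only the child's stdout through a pipe.
        $process = proc_open($command, [1 => ['pipe', 'w']], $pipes);
        if (is_resource($process)) {
            stream_set_blocking($pipes[1], false);
            $running[$key] = ['process' => $process, 'stdout' => $pipes[1], 'output' => ''];
        }
    }

    $results = [];
    while ($running) {
        foreach ($running as $key => $job) {
            // Read whatever is available right now without waiting.
            $running[$key]['output'] .= (string) stream_get_contents($job['stdout']);
            if (feof($job['stdout'])) {
                fclose($job['stdout']);
                proc_close($job['process']);
                $results[$key] = $running[$key]['output'];
                unset($running[$key]);
            }
        }
        usleep(10000); // avoid spinning at full speed while children work
    }

    return $results;
}

$output = runParallel(['a' => 'echo one', 'b' => 'echo two']);
echo trim($output['a']), ' ', trim($output['b']), "\n";
```

All subprocesses are started before we read from any of them, so they run side by side and the polling loop only gathers their output as it arrives.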
