- PHP Reactive Programming
- Martin Sikora
- 2021-07-09 19:06:17
Writing CURLObservable
As we said, we're going to work with API calls and, for this reason, we need a comfortable way of creating HTTP requests. It's probably no surprise that we'll write a custom Observable that downloads a URL and passes its response to its observers, where we'll decode it from JSON using the operator we created just a couple of lines above.
We're going to use PHP's cURL module, which is a wrapper around libcurl ( https://curl.haxx.se/libcurl/ ) - a C library for transferring data over a wide range of protocols.
We'll start by using plain simple cURL in PHP and we'll see that it supports some sort of asynchronous approach out-of-the-box.
Imperative approach and cURL
If we just wanted to download a single URL, we wouldn't need anything special. However, we want to make this, and all future applications of the CURLObservable class, more interactive, so we'll also keep track of the downloading progress.
A plain and simple approach could look like this:
// curl_01.php
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, "http://google.com");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_PROGRESSFUNCTION, 'progress');
curl_setopt($ch, CURLOPT_NOPROGRESS, false);
curl_setopt($ch, CURLOPT_HEADER, 0);
$html = curl_exec($ch);
curl_close($ch);

function progress($res, $downtotal, $down, $uptotal, $up) {
    // $downtotal stays 0 until cURL knows the total size
    if ($downtotal > 0) {
        printf("%.2f\n", $down / $downtotal * 100);
    }
    ob_flush();
    usleep(100 * 1000);
}
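We're using the CURLOPT_PROGRESSFUNCTION option to set a callback function which is invoked internally by the cURL module. It takes five arguments: the cURL handle, followed by four byte counts (expected download total, bytes downloaded so far, expected upload total, and bytes uploaded so far) that help us keep track of how much of the page's total size has already been downloaded.
We won't show its output here because it's just a series of increasing percentages.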
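To make the callback's arithmetic concrete, here is a minimal sketch of the percentage calculation on its own, separated from cURL so that it runs without a network connection. The helper name progressPercent() is hypothetical and not part of the cURL extension.

```php
<?php
// Hypothetical helper: converts the byte counts that cURL passes to a
// progress callback into a percentage rounded to two decimals.
// Returns null while the total size is still unknown (reported as 0).
function progressPercent($downtotal, $down) {
    if ($downtotal <= 0) {
        return null;
    }
    return round($down / $downtotal * 100, 2);
}

var_dump(progressPercent(0, 0));       // total not known yet
var_dump(progressPercent(2000, 500));  // a quarter downloaded
var_dump(progressPercent(2000, 2000)); // finished
```

Guarding against a zero total matters because the callback is typically invoked before the response size is known.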
There's also a small subset of cURL functions that work with multiple cURL handles simultaneously. These are all prefixed with curl_multi_ and are executed by calling curl_multi_exec(). Nonetheless, the curl_multi_exec() function is blocking and the interpreter needs to wait until it finishes.
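As a rough sketch of how the curl_multi_ functions fit together, the following helper downloads several URLs at once. The name fetchAll() is hypothetical, and error handling is left out. Note that the script still sits in the loop until every transfer is done, so nothing else runs in the meantime.

```php
<?php
// Sketch: download several URLs with the curl_multi_* functions.
// fetchAll() is a hypothetical helper name, not part of the cURL extension.
function fetchAll(array $urls) {
    $multi = curl_multi_init();
    $handles = [];
    foreach ($urls as $key => $url) {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_multi_add_handle($multi, $ch);
        $handles[$key] = $ch;
    }

    // Drive all transfers; the script stays in this loop until they finish.
    do {
        $status = curl_multi_exec($multi, $running);
        if ($running > 0) {
            // Wait for network activity instead of spinning the CPU.
            curl_multi_select($multi, 1.0);
        }
    } while ($running > 0 && $status === CURLM_OK);

    // Collect the response bodies, keyed like the input array.
    $responses = [];
    foreach ($handles as $key => $ch) {
        $responses[$key] = curl_multi_getcontent($ch);
        curl_multi_remove_handle($multi, $ch);
    }
    // The handles are released when they go out of scope.
    return $responses;
}
```

Calling fetchAll() with an array of URLs returns an array of response bodies using the same keys.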
Implementing cURL into a custom Observable
We've already seen how to write a custom observer, Subject and operator. Now is the right time to write an Observable as well. We want the Observable to emit values when downloading the URL and, at the end, return a complete response. We can distinguish between the two types of messages by checking their type. Progress will always be a double, while response will always be a string.
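A minimal sketch of that type check, assuming nothing beyond plain PHP, could look like this. The function name describeEmission() is hypothetical and only serves the illustration.

```php
<?php
// Hypothetical helper: labels a value emitted by the Observable
// according to its PHP type.
function describeEmission($value) {
    if (is_float($value)) {
        return 'progress';   // download percentage
    }
    if (is_string($value)) {
        return 'response';   // complete response body
    }
    return 'unknown';
}

foreach ([21.39, 100.0, '{"items":[]}'] as $value) {
    echo describeEmission($value), "\n";
}
```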
Let's start with our class synopsis to see how it's going to work and then implement each method separately with a short description:
use Rx\Observable;
use Rx\ObserverInterface as ObserverI;

class CURLObservable extends Observable {
    public function __construct($url) {}
    public function subscribe(ObserverI $obsr, $sched = null) {}
    private function startDownload() {}
    private function progress($r, $downtot, $down, $uptot, $up) {}
}
Every time we write an Observable, we'll extend the base Rx\Observable class. We could theoretically just implement Rx\ObservableInterface, but, most of the time, we also want to inherit all its internal logic and all existing operators.
The constructor and method startDownload() are going to be very simple. In startDownload(), we start downloading the URL while monitoring its progress.
Please note that this code goes inside the CURLObservable class; we're just trying to keep the code short and easy to read, so we have omitted indentation and class definition in this example:
// Declared explicitly; PHP 8.2+ deprecates dynamic properties
private $url;

public function __construct($url) {
    $this->url = $url;
}

private function startDownload() {
    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, $this->url);
    curl_setopt($ch, CURLOPT_PROGRESSFUNCTION, [$this, 'progress']);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_NOPROGRESS, false);
    curl_setopt($ch, CURLOPT_HEADER, 0);
    curl_setopt($ch, CURLOPT_USERAGENT, 'Mozilla/5.0 ...');
    // Disable gzip compression
    curl_setopt($ch, CURLOPT_ENCODING, 'gzip;q=0,deflate,sdch');
    $response = curl_exec($ch);
    curl_close($ch);

    return $response;
}
This is mostly the same as the example using an imperative approach. The only interesting difference is that we're using a callable [$this,'progress'] instead of just a function name, as we did earlier.
The actual emission of values happens inside the progress() method:
private function progress($res, $downtotal, $down, $uptotal, $up) {
    if ($downtotal > 0) {
        $percentage = sprintf("%.2f", $down / $downtotal * 100);
        foreach ($this->observers as $observer) {
            /** @var ObserverI $observer */
            $observer->onNext(floatval($percentage));
        }
    }
}
Since we inherited the original Observable, we can make use of its protected property $observers that holds all subscribed observers, as its name suggests. To emit a value to all of them, we can simply iterate the array and call onNext on each observer.
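The broadcasting idea can be tried in isolation with a few lines of plain PHP. The classes below are simplified stand-ins with hypothetical names, not RxPHP classes; they only mimic an Observable that keeps its observers in an array.

```php
<?php
// Simplified stand-ins, NOT RxPHP classes.
class RecordingObserver {
    public $received = [];

    public function onNext($value) {
        $this->received[] = $value;
    }
}

class TinyObservable {
    protected $observers = [];

    public function subscribe(RecordingObserver $observer) {
        $this->observers[] = $observer;
    }

    // Push one value to every subscribed observer.
    public function emit($value) {
        foreach ($this->observers as $observer) {
            $observer->onNext($value);
        }
    }
}

$first = new RecordingObserver();
$second = new RecordingObserver();
$source = new TinyObservable();
$source->subscribe($first);
$source->subscribe($second);
$source->emit(49.19);
$source->emit(100.0);
print_r($first->received);
```

Both observers end up with the same sequence of values, because each emission is delivered to every entry in the array.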
The only method we haven't seen so far is subscribe():
// Needs these imports at the top of the file (RxPHP 1.x class names):
// use Rx\Scheduler\ImmediateScheduler;
// use Rx\Disposable\CompositeDisposable;
public function subscribe(ObserverI $obsr, $sched = null) {
    $disp1 = parent::subscribe($obsr, $sched);

    if (null === $sched) {
        $sched = new ImmediateScheduler();
    }

    $disp2 = $sched->schedule(function() use ($obsr) {
        $response = $this->startDownload();
        if ($response) {
            $obsr->onNext($response);
            $obsr->onCompleted();
        } else {
            $msg = 'Unable to download ' . $this->url;
            $obsr->onError(new Exception($msg));
        }
    });

    return new CompositeDisposable([$disp1, $disp2]);
}
This method combines many of the things we've seen in this chapter:
- We definitely want to keep the original functionality of the Observable, so we'll call its parent implementation. This adds the observer to the array of observers, as mentioned a moment ago.
- The parent::subscribe() method returns a disposable. That's the object we can use to unsubscribe the observer from this Observable.
- If we don't specify what Scheduler this Observable should use, it'll fall back to ImmediateScheduler. We've already mentioned ImmediateScheduler when we were talking about Schedulers in general. In RxPHP 2, we'd use Scheduler::getImmediate() instead of directly using the class name.
- Right after that, we schedule the work (in terms of Schedulers, it's usually referred to as "action") to be executed by the Scheduler. Note that the action itself is a closure.
- Then, we start downloading the URL. If we subscribe another observer to the same Observable, it'll re-download the same URL again. Download progress is then emitted with frequency according to cURL's internals. We'll talk more about the subscription process in the next chapter.
- When downloading finishes, we emit the response or an error.
- At the end of this method, it returns another disposable. This time, it's CompositeDisposable that is used to wrap other disposables. When calling its dispose() method, these wrapped ones are properly disposed as well.
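The idea of wrapping disposables can be sketched without RxPHP. The two classes below are simplified stand-ins with hypothetical names; the real CompositeDisposable offers more than this, but the principle of forwarding dispose() to every wrapped object is the same.

```php
<?php
// Simplified stand-ins for the disposable idea; NOT RxPHP's classes.
class CallbackDisposableSketch {
    private $action;
    private $disposed = false;

    public function __construct(callable $action) {
        $this->action = $action;
    }

    public function dispose() {
        if (!$this->disposed) {   // run the cleanup only once
            $this->disposed = true;
            call_user_func($this->action);
        }
    }
}

class CompositeDisposableSketch {
    private $disposables;

    public function __construct(array $disposables) {
        $this->disposables = $disposables;
    }

    // Forward dispose() to every wrapped disposable.
    public function dispose() {
        foreach ($this->disposables as $disposable) {
            $disposable->dispose();
        }
    }
}

$log = [];
$composite = new CompositeDisposableSketch([
    new CallbackDisposableSketch(function() use (&$log) {
        $log[] = 'observer removed';
    }),
    new CallbackDisposableSketch(function() use (&$log) {
        $log[] = 'scheduled action cancelled';
    }),
]);
$composite->dispose();
$composite->dispose(); // the second call has no further effect
print_r($log);
```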
So, that's it. Now we can test our Observable and see what its output is. We can try to grab a list of the most recent questions on www.stackoverflow.com tagged with functional-programming:
$url = 'https://api.stack...&tagged=functional-programming';
$observable = new CURLObservable($url);
$observable->subscribe(new DebugSubject());
This prints a couple of numbers and then the response JSON string:
16:17:52 onNext: 21.39 (double)
16:17:52 onNext: 49.19 (double)
16:17:52 onNext: 49.19 (double)
16:17:52 onNext: 76.99 (double)
16:17:52 onNext: 100 (double)
16:17:52 onNext: {"items":[{"tags":["javascript","... (string)
16:17:52 onCompleted
You can see that one value was emitted twice. This is because of the timing and network latency when cURL evaluates the callback, which is nothing unusual. If we didn't want to see repeated values, we could use the distinct() operator that we saw when talking about "marble diagrams".
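To illustrate what dropping repeated values means for the progress sequence shown earlier, here is a plain-PHP sketch. It is not the RxPHP operator itself, and the function name distinctValues() is hypothetical.

```php
<?php
// Plain-PHP illustration of "distinct" on a list of values;
// this is NOT the RxPHP operator.
function distinctValues(array $values) {
    $seen = [];
    $result = [];
    foreach ($values as $value) {
        $key = serialize($value); // usable as a key for floats and strings
        if (!isset($seen[$key])) {
            $seen[$key] = true;
            $result[] = $value;
        }
    }
    return $result;
}

print_r(distinctValues([21.39, 49.19, 49.19, 76.99, 100.0]));
```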
Now let's combine it with our JSONDecodeOperator. Since we're now interested only in the string response and want to ignore all progress emissions, we'll also use the filter() operator:
// rxphp_curl.php
$observable
    ->filter(function($value) {
        return is_string($value);
    })
    ->lift(function() {
        return new JSONDecodeOperator();
    })
    ->subscribe(new DebugSubject(null, 128));
This returns part of the response array (for demonstration purposes, we added indentation and made the output a little longer):
$ php rxphp_curl.php
16:23:55 [] onNext: {
    "items": [
        {
            "tags": [
                "javascript",
                "functional-programming",
    ... (array)
16:23:55 [] onCompleted
When we used the filter() operator, you might have noticed that we called it as Observable::filter() without necessarily using the lift() method. This is because almost all operators are, in fact, just lift() calls with predefined Closures that return an appropriate operator class. A good question is whether we can write our own shorthand for JSONDecodeOperator when we're already extending the base Observable class. Maybe something like Observable::jsonDecode()?
The answer is yes, we can. However, in RxPHP 1.x, it wouldn't help us a lot. When we chain operators, they return other instances of Observables that aren't under our control. We could theoretically use Observable::jsonDecode() right after creating CURLObservable because we'd know that it's going to be an instance of this class, but chaining it with filter() brings us back to the original Observable that doesn't know any jsonDecode() methods. In particular, the filter() operator returns an instance of Rx\Observable\AnonymousObservable.
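The effect is easy to reproduce with two small classes. These are simplified stand-ins with hypothetical names, not RxPHP classes: the base class's filter() always returns a base-class instance, so a method defined only on the subclass is no longer available after chaining.

```php
<?php
// Simplified stand-ins, NOT RxPHP classes.
class BaseStream {
    protected $values;

    public function __construct(array $values) {
        $this->values = $values;
    }

    // Like the operators described above, this returns a plain
    // base-class instance rather than an instance of the subclass.
    public function filter(callable $predicate) {
        $kept = array_values(array_filter($this->values, $predicate));
        return new BaseStream($kept);
    }
}

class JsonStream extends BaseStream {
    // The custom shorthand exists only on the subclass.
    public function jsonDecode() {
        return new BaseStream(array_map(function($value) {
            return json_decode($value, true);
        }, $this->values));
    }
}

$stream = new JsonStream([12.5, '{"ok":true}']);
$filtered = $stream->filter('is_string');

var_dump(method_exists($stream, 'jsonDecode'));   // shorthand is available
var_dump(method_exists($filtered, 'jsonDecode')); // shorthand is gone
```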
Running multiple requests asynchronously
An interesting use case could be to start multiple requests asynchronously. All calls to curl_exec() are blocking, which means that they block the execution context until they're finished.
Unfortunately, this is a very tricky problem that's hard to solve without using any extra PHP modules, such as pthreads, as we'll see much later in Chapter 9, Multithreaded and Distributed Computing with pthreads and Gearman.
We can, however, make use of PHP's standard proc_open() to spawn non-blocking subprocesses that can run in parallel and then just ask for their output.
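A minimal sketch of this idea, assuming the script runs under the PHP CLI (so PHP_BINARY points to the interpreter) and PHP 7.4 or newer (for array commands), might look like the following. The helper name runInParallel() is hypothetical.

```php
<?php
// Sketch: start several subprocesses at once and collect their output.
// runInParallel() is a hypothetical helper name.
function runInParallel(array $commands) {
    $spec = [1 => ['pipe', 'w']]; // capture each child's stdout
    $procs = [];
    $pipes = [];

    // Start every process first so they all run at the same time.
    foreach ($commands as $key => $command) {
        $procs[$key] = proc_open($command, $spec, $pipes[$key]);
        if ($procs[$key] === false) {
            throw new RuntimeException("Unable to start process '$key'");
        }
    }

    // Then read each output; the other children keep running meanwhile.
    $outputs = [];
    foreach ($procs as $key => $proc) {
        $outputs[$key] = stream_get_contents($pipes[$key][1]);
        fclose($pipes[$key][1]);
        proc_close($proc);
    }
    return $outputs;
}

$outputs = runInParallel([
    'a' => [PHP_BINARY, '-r', 'usleep(200000); echo "first";'],
    'b' => [PHP_BINARY, '-r', 'usleep(200000); echo "second";'],
]);
print_r($outputs);
```

Because both children are started before any output is read, their sleeps overlap instead of adding up.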