- Hands-On Reactive Programming with Python
- Romain Picard
- 2021-06-24 18:25:25
The create operator
The create operator is the one most often used to create custom observables. Almost all other factory operators are implemented on top of it. Its marble diagram is shown in the following figure:
Its prototype is as follows:
Observable.create(subscribe)
The subscribe parameter is a function that is called each time an observer subscribes to the observable. The prototype of the subscribe function is as follows:
subscribe(observer)
Its only argument is the observer that subscribed to the observable. The following code shows a simple way to use it:
from rx import Observable  # RxPY 1.x API

def on_subscribe(observer):
    observer.on_next(1)
    observer.on_next(2)
    observer.on_next(3)
    observer.on_completed()

numbers = Observable.create(on_subscribe)
numbers.subscribe(
    on_next=lambda i: print("item: {}".format(i)),
    on_error=lambda e: print("error: {}".format(e)),
    on_completed=lambda: print("completed")
)
The on_subscribe subscription function emits three items to the observer by calling its on_next method, then completes the observable by calling the observer's on_completed method. This function is passed to create to build the numbers observable. The subscription provides the following result:
item: 1
item: 2
item: 3
completed
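To make the "called each time an observer subscribes" behavior concrete, here is a minimal sketch written in plain Python. MiniObserver and MiniObservable are hypothetical stand-ins invented for this illustration; they are not the RxPY classes and leave out everything a real implementation handles (disposal, scheduling, error safety).

```python
class MiniObserver:
    """Hypothetical observer: simply holds the three callbacks."""

    def __init__(self, on_next, on_error=None, on_completed=None):
        self.on_next = on_next
        self.on_error = on_error or (lambda e: None)
        self.on_completed = on_completed or (lambda: None)


class MiniObservable:
    """Hypothetical stand-in for an observable; not the RxPY class."""

    def __init__(self, subscribe):
        self._subscribe = subscribe

    @classmethod
    def create(cls, subscribe):
        # Only store the function; nothing runs until someone subscribes.
        return cls(subscribe)

    def subscribe(self, on_next, on_error=None, on_completed=None):
        # Wrap the callbacks in an observer and pass it to the stored
        # subscription function.
        self._subscribe(MiniObserver(on_next, on_error, on_completed))


calls = []


def on_subscribe(observer):
    calls.append("subscribed")
    observer.on_next(1)
    observer.on_next(2)
    observer.on_next(3)
    observer.on_completed()


numbers = MiniObservable.create(on_subscribe)
calls_before_subscribe = len(calls)  # still 0: creation emits nothing

first, second = [], []
numbers.subscribe(first.append, on_completed=lambda: first.append("completed"))
numbers.subscribe(second.append, on_completed=lambda: second.append("completed"))
```

In this sketch each call to subscribe runs on_subscribe again, so both observers receive the full sequence 1, 2, 3 followed by the completion.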
The preceding example was very simple. Let's look at a more realistic example of a very common pattern of the create operator: implementing an observable that reacts to the items of another observable (in other words, an operator). The following example sums items from the source observable as long as they are even. Every time an odd number is received, the current sum is emitted on the output observable and the sum is reset to the value of that odd number.
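The summing rule can be checked on a plain list before any observable is involved. The following is only an illustrative sketch: sum_even_list is a hypothetical helper name, and the final emission at the end of the input mirrors what the operator does on completion.

```python
def sum_even_list(items):
    """Hypothetical helper: apply the summing rule to a plain list."""
    emitted = []
    accumulator = 0
    for i in items:
        if i % 2 == 0:
            accumulator += i             # even: keep summing
        else:
            emitted.append(accumulator)  # odd: emit the current sum
            accumulator = i              # and restart from the odd value
    emitted.append(accumulator)          # end of input: emit what is left
    return emitted


result = sum_even_list([2, 2, 4, 5, 2])
print(result)  # [8, 7]
```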
Let's start with the subscription to this custom operator, shown as follows:
numbers = Observable.from_([2, 2, 4, 5, 2])
sum_even(numbers).subscribe(
    on_next=lambda i: print("item: {}".format(i)),
    on_error=lambda e: print("error: {}".format(e)),
    on_completed=lambda: print("completed")
)
An observable of numbers is created. This observable is provided to the sum_even function, and the resulting observable is subscribed. The skeleton of the sum_even function is as follows:
def sum_even(source):
    def on_subscribe(observer):
        accumulator = 0
        source.subscribe(on_next, on_error, on_completed)
    return Observable.create(on_subscribe)
The preceding code just returns an observable created from the nested on_subscribe subscription function. The on_subscribe function initializes the sum accumulator to 0 and subscribes to the source observable. So, when an observer subscribes to the observable returned by sum_even, on_subscribe is called, and the source observable is subscribed in turn. This is a chain of subscriptions. Finally, the callbacks passed to the source observable must be implemented as nested functions of on_subscribe, as follows:
def on_next(i):
    nonlocal accumulator
    if i % 2 == 0:
        accumulator += i
    else:
        observer.on_next(accumulator)
        accumulator = i

def on_error(e):
    # Forward the error itself to the downstream observer
    observer.on_error(e)

def on_completed():
    nonlocal accumulator
    observer.on_next(accumulator)
    observer.on_completed()
The on_next implementation should be clear. The accumulator is updated with the sum of items when they are even and is reset when they are odd. The value of the accumulator is emitted every time an odd number is received, and once more when the source completes, so that the last sum is not lost. The error and completion of the source observable are propagated to the observer of the output observable. The complete code is as follows:
from rx import Observable  # RxPY 1.x API

def sum_even(source):
    def on_subscribe(observer):
        accumulator = 0

        def on_next(i):
            nonlocal accumulator
            if i % 2 == 0:
                accumulator += i
            else:
                observer.on_next(accumulator)
                accumulator = i

        def on_error(e):
            # Forward the error itself to the downstream observer
            observer.on_error(e)

        def on_completed():
            nonlocal accumulator
            observer.on_next(accumulator)
            observer.on_completed()

        source.subscribe(on_next, on_error, on_completed)
    return Observable.create(on_subscribe)

numbers = Observable.from_([2, 2, 4, 5, 2])
sum_even(numbers).subscribe(
    on_next=lambda i: print("item: {}".format(i)),
    on_error=lambda e: print("error: {}".format(e)),
    on_completed=lambda: print("completed")
)
The preceding code provides the following output:
item: 8
item: 7
completed
The two items received correspond to the sum of 2, 2, and 4, and the sum of 5 and 2. The completion is correctly received after these two items.
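Two properties of this pattern are easy to overlook: the accumulator lives inside on_subscribe, so every subscription gets its own sum, and an error from the source should reach the output observer unchanged. The sketch below exercises both in plain Python. MiniObserver, MiniObservable, collect, good_source, and failing_source are hypothetical names introduced for this illustration and are not part of RxPY; only the body of sum_even follows the operator described above.

```python
class MiniObserver:
    """Hypothetical observer: simply holds the three callbacks."""

    def __init__(self, on_next, on_error, on_completed):
        self.on_next = on_next
        self.on_error = on_error
        self.on_completed = on_completed


class MiniObservable:
    """Hypothetical stand-in for an observable; not the RxPY class."""

    def __init__(self, subscribe):
        self._subscribe = subscribe

    def subscribe(self, on_next, on_error, on_completed):
        self._subscribe(MiniObserver(on_next, on_error, on_completed))


def sum_even(source):
    def on_subscribe(observer):
        accumulator = 0  # fresh state for every subscription

        def on_next(i):
            nonlocal accumulator
            if i % 2 == 0:
                accumulator += i
            else:
                observer.on_next(accumulator)
                accumulator = i

        def on_error(e):
            observer.on_error(e)  # forward the error object itself

        def on_completed():
            observer.on_next(accumulator)
            observer.on_completed()

        source.subscribe(on_next, on_error, on_completed)
    return MiniObservable(on_subscribe)


def good_source(observer):
    for i in [2, 2, 4, 5, 2]:
        observer.on_next(i)
    observer.on_completed()


def failing_source(observer):
    observer.on_next(2)
    observer.on_next(3)
    observer.on_error(ValueError("boom"))


def collect(observable):
    """Subscribe and record every event as a tuple."""
    events = []
    observable.subscribe(
        lambda i: events.append(("item", i)),
        lambda e: events.append(("error", str(e))),
        lambda: events.append(("completed",)),
    )
    return events


summed = sum_even(MiniObservable(good_source))
first_run = collect(summed)
second_run = collect(summed)  # same result: the sum restarts from 0
error_run = collect(sum_even(MiniObservable(failing_source)))
```

Under these assumptions, both runs on the good source yield the items 8 and 7 followed by completion, while the failing source yields the item 2 and then the forwarded error, with no completion.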