官术网_书友最值得收藏!

The create operator

The create operator is the operator most often used to create custom observables. The implementation of almost all other factory operators is done on top of this one. Its marble diagram is shown in the following figure:

Figure 4.11: The create operator

Its prototype is as follows:

Observable.create(subscribe)

The subscribe parameter is a function which will be called each time an observer subscribes to the observable. The prototype of the subscribe function is as follows:

subscribe(observer)

Its only argument is the observer that subscribed to the observable. The following code shows a simple way to use it:

def on_subscribe(observer):
observer.on_next(1)
observer.on_next(2)
observer.on_next(3)
observer.on_completed()

numbers = Observable.create(on_subscribe)
numbers.subscribe(
on_next=lambda i: print("item: {}".format(i)),
on_error=lambda e: print("error: {}".format(e)),
on_completed=lambda: print("completed")
)

The on_subscribe subscription function emits three items on the observer by calling its on_next method. Then it completes the observable by calling the on_completed method of the observer. This subscribe function is used to create the numbers observable. The subscription provides the following result:

item: 1
item: 2
item: 3
completed

The preceding example was very simple. Let's look at a more realistic example of a very common pattern of the create operator—implementing an observable that reacts from the items of another observable (in other words, an operator). The preceding example sums items from the source observable as long as they are even. Every time an odd number is received, the current sum is emitted on the output observable and its value is reset to the value of the odd number.

Let's start with the subscription to this custom operator, shown as follows:

numbers = Observable.from_([2,2,4,5,2])
sum_even(numbers).subscribe(
on_next=lambda i: print("item: {}".format(i)),
on_error=lambda e: print("error: {}".format(e)),
on_completed=lambda: print("completed")
)

An observable of numbers is created. This observable is provided to the sum_even function, and the resulting observable is subscribed. The skeleton of the sum_even function is as follows:

def sum_even(source):
def on_subscribe(observer):
accumulator = 0
source.subscribe(on_next, on_error, on_completed)

return Observable.create(on_subscribe)

The preceding code just returns an observable, with the nested on_subscribe subscription function. The on_subscribe function initializes the sum accumulator to 0 and subscribes to the source observable. So, when an observer subscribes to the observable returned by sum_evenon_subscribe is called, and the source observable is also subscribed. This is a chain of subscriptions. Finally, the callbacks of the source observer must be implemented as nested functions of on_subscribe, as follows:

        def on_next(i):
nonlocal accumulator
if i % 2 == 0:
accumulator += i
else:
observer.on_next(accumulator)
accumulator = i

def on_error(e):
observer.on_error()

def on_completed():
nonlocal accumulator
observer.on_next(accumulator)
observer.on_completed()

The on_next implementation should be clear. The accumulator is updated with the sum of items when they are even and is reset when they are odd. The value of the accumulator is emitted every time an odd number is received. The error and completion of the source observable are propagated to observer of the output observable. The complete code is as follows:

def sum_even(source):
def on_subscribe(observer):
accumulator = 0
def on_next(i):
nonlocal accumulator
if i % 2 == 0:
accumulator += i
else:
observer.on_next(accumulator)
accumulator = i

def on_error(e):
observer.on_error()

def on_completed():
nonlocal accumulator
observer.on_next(accumulator)
observer.on_completed()

source.subscribe(on_next, on_error, on_completed)

return Observable.create(on_subscribe)

numbers = Observable.from_([2,2,4,5,2])
sum_even(numbers).subscribe(
on_next=lambda i: print("item: {}".format(i)),
on_error=lambda e: print("error: {}".format(e)),
on_completed=lambda: print("completed")
)

The preceding code provides the following output:

item: 8
item: 7
completed

The two items received correspond to the sum of 2, 2, 4, and the sum of 5 and 2. The completion is correctly received after these two items.

主站蜘蛛池模板: 惠东县| 新宾| 芒康县| 乌恰县| 荆州市| 天津市| 上虞市| 剑阁县| 彩票| 达尔| 肃宁县| 东兴市| 凤翔县| 大兴区| 大田县| 民县| 海南省| 华宁县| 龙胜| 靖宇县| 枞阳县| 茌平县| 驻马店市| 攀枝花市| 常德市| 普安县| 罗城| 巴彦淖尔市| 湖州市| 洪洞县| 河津市| 宜兰县| 秭归县| 芮城县| 平远县| 连云港市| 呼伦贝尔市| 玛曲县| 许昌市| 鄱阳县| 彰化县|