官术网_书友最值得收藏!

The create operator

The create operator is the operator most often used to create custom observables. The implementation of almost all other factory operators is done on top of this one. Its marble diagram is shown in the following figure:

Figure 4.11: The create operator

Its prototype is as follows:

Observable.create(subscribe)

The subscribe parameter is a function which will be called each time an observer subscribes to the observable. The prototype of the subscribe function is as follows:

subscribe(observer)

Its only argument is the observer that subscribed to the observable. The following code shows a simple way to use it:

def on_subscribe(observer):
observer.on_next(1)
observer.on_next(2)
observer.on_next(3)
observer.on_completed()

numbers = Observable.create(on_subscribe)
numbers.subscribe(
on_next=lambda i: print("item: {}".format(i)),
on_error=lambda e: print("error: {}".format(e)),
on_completed=lambda: print("completed")
)

The on_subscribe subscription function emits three items on the observer by calling its on_next method. Then it completes the observable by calling the on_completed method of the observer. This subscribe function is used to create the numbers observable. The subscription provides the following result:

item: 1
item: 2
item: 3
completed

The preceding example was very simple. Let's look at a more realistic example of a very common pattern of the create operator—implementing an observable that reacts from the items of another observable (in other words, an operator). The preceding example sums items from the source observable as long as they are even. Every time an odd number is received, the current sum is emitted on the output observable and its value is reset to the value of the odd number.

Let's start with the subscription to this custom operator, shown as follows:

numbers = Observable.from_([2,2,4,5,2])
sum_even(numbers).subscribe(
on_next=lambda i: print("item: {}".format(i)),
on_error=lambda e: print("error: {}".format(e)),
on_completed=lambda: print("completed")
)

An observable of numbers is created. This observable is provided to the sum_even function, and the resulting observable is subscribed. The skeleton of the sum_even function is as follows:

def sum_even(source):
def on_subscribe(observer):
accumulator = 0
source.subscribe(on_next, on_error, on_completed)

return Observable.create(on_subscribe)

The preceding code just returns an observable, with the nested on_subscribe subscription function. The on_subscribe function initializes the sum accumulator to 0 and subscribes to the source observable. So, when an observer subscribes to the observable returned by sum_evenon_subscribe is called, and the source observable is also subscribed. This is a chain of subscriptions. Finally, the callbacks of the source observer must be implemented as nested functions of on_subscribe, as follows:

        def on_next(i):
nonlocal accumulator
if i % 2 == 0:
accumulator += i
else:
observer.on_next(accumulator)
accumulator = i

def on_error(e):
observer.on_error()

def on_completed():
nonlocal accumulator
observer.on_next(accumulator)
observer.on_completed()

The on_next implementation should be clear. The accumulator is updated with the sum of items when they are even and is reset when they are odd. The value of the accumulator is emitted every time an odd number is received. The error and completion of the source observable are propagated to observer of the output observable. The complete code is as follows:

def sum_even(source):
def on_subscribe(observer):
accumulator = 0
def on_next(i):
nonlocal accumulator
if i % 2 == 0:
accumulator += i
else:
observer.on_next(accumulator)
accumulator = i

def on_error(e):
observer.on_error()

def on_completed():
nonlocal accumulator
observer.on_next(accumulator)
observer.on_completed()

source.subscribe(on_next, on_error, on_completed)

return Observable.create(on_subscribe)

numbers = Observable.from_([2,2,4,5,2])
sum_even(numbers).subscribe(
on_next=lambda i: print("item: {}".format(i)),
on_error=lambda e: print("error: {}".format(e)),
on_completed=lambda: print("completed")
)

The preceding code provides the following output:

item: 8
item: 7
completed

The two items received correspond to the sum of 2, 2, 4, and the sum of 5 and 2. The completion is correctly received after these two items.

主站蜘蛛池模板: 永靖县| 余干县| 南宁市| 新建县| 南郑县| 沈阳市| 云霄县| 叙永县| 开原市| 扎兰屯市| 通许县| 张家界市| 台中市| 丰城市| 绿春县| 延寿县| 开封县| 泰安市| 本溪市| 甘洛县| 密山市| 云霄县| 盖州市| 渭源县| 交城县| 双城市| 石狮市| 乐东| 湟源县| 宁阳县| 临汾市| 略阳县| 浦县| 泰安市| 荃湾区| 旬邑县| 长葛市| 灵宝市| 武威市| 洛阳市| 永宁县|