官术网_书友最值得收藏!

Adding the program logic

Now that all the elements needed to implement the asynchronous reactive server are implemented, the application logic can be added. This part is implemented as a function that takes an object of observables as input and returns an object of observables as output. It is a component.

The following figure shows the reactivity diagram of this component:

Figure 3.9: The echo_server component reactivity diagram

The empty skeleton of this function is the following one:

def echo_server(source):
return {
'http': Observable.empty()
}

This component is implemented as a pure function. Its behavior depends only on the value of the input parameter; that is, the content of the http observable contained in the source object. For now, this function returns an empty observable. This will be completed once the body of the function is completed.

The first part of this component is the configuration and initialization of the HTTP server. This is done in a dedicated observable that contains the definition of the HTTP route and the request to start the server:

init = Observable.from_([
{
'what': 'add_route',
'methods': ['GET'],
'path': '/echo/{what}',
}, {
'what': 'start_server',
'host': 'localhost',
'port': 8080
}
])

The first item will add a new route for the GET method on the /echo/ path. Note the usage of a variable resource, the {what} part of the path, to retrieve the text to echo. The second items effectively start the server on localhost and port 8080.

The second part of the component consists of answering the echo requests. The echo requests come from the observable present in the http field of the source dictionary. The answer is simply built by mapping the source request item to a response item:

    echo = source['http'] \
.map(lambda i: {
'what': 'response',
'status': 200,
'context': i['context'],
'data': i['match_info']['what'].encode('utf-8'),
})

The implementation is straightforward and consists of lambda. The returned item is a response item, with status 200 (the status code for OK in an HTTP response), and the data retrieved from the variable resource that was declared in the route. This value is retrieved from the match_info field of the request item. This text value must be encoded so that aiohttp can put it in the body of the response. The response items are available in the echo observable.

Now that all the logic is implemented, these two observables must be returned so that they can feed the HTTP driver. This is done with the merge operator:

    return {
'http': Observable.merge(init, echo),
}

The full code of the echo component is the following one:

def echo_server(source):
init = Observable.from_([
{
'what': 'add_route',
'methods': ['GET'],
'path': '/echo/{what}',
}, {
'what': 'start_server',
'host': 'localhost',
'port': 8080
}
])

echo = source['http'] \
.map(lambda i: {
'what': 'response',
'status': 200,
'context': i['context'],
'data': i['match_info']['what'].encode('utf-8'),
})

return {
'http': Observable.merge(init, echo),
}

The full code of the server is available in the rx_http_echo.py script. It can be tested the same way as with the previous implementation of asyncio. Start the server in a terminal, and then in another Terminal, use curl to test it:

$ curl http://localhost:8080/echo/hello
hello
$ curl http://localhost:8080/echo/foo
foo

This implementation does not stop the server. As an exercise, you can add another route such as /exit that will ask the HTTP driver to stop the server.

主站蜘蛛池模板: 波密县| 玉环县| 绿春县| 普宁市| 北碚区| 沂水县| 漳州市| 吉林省| 丹阳市| 五峰| 师宗县| 广水市| 虹口区| 哈密市| 烟台市| 萨嘎县| 缙云县| 新兴县| 大洼县| 永寿县| 日土县| 凉城县| 西充县| 砀山县| 郧西县| 聂荣县| 山东| 湖南省| 改则县| 博客| 义乌市| 白玉县| 仲巴县| 普兰店市| 德清县| 南华县| 原平市| 琼海市| 巴彦县| 灵寿县| 平顺县|