An asynchronous example
To keep things simple but still interesting, let's write a tool that, given a text file, will count the occurrences of a given word. This example builds on the silly coroutine that we implemented in the previous section, adding some useful behavior to it.
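As a quick refresher, the pattern we will build on looks something like the following sketch (echo is an illustrative name, not the coroutine from the previous section):

def echo():
    # A generator-based coroutine: it pauses at (yield) and resumes
    # whenever a caller sends in a value.
    while True:
        text = (yield)
        print('Received:', text)

e = echo()
next(e)          # prime the coroutine up to its first (yield)
e.send('hello')  # prints: Received: hello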
It should be noted that, at least on a Linux or Mac OS X machine, one can achieve the same result very simply using the grep command, as we will see. Let's start by downloading a reasonably large text to use as input data for our experiments: a public domain book from Project Gutenberg, War and Peace by Leo Tolstoy, which is freely available at http://www.gutenberg.org/cache/epub/2600/pg2600.txt.
The following snippet shows how we can download this text very easily:
$ curl -sO http://www.gutenberg.org/cache/epub/2600/pg2600.txt
$ wc pg2600.txt
   65007  566320 3291648 pg2600.txt
Next, we will start by counting the number of occurrences of the word love, regardless of case, in the file we just downloaded using grep, as the following snippet shows:
$ time (grep -io love pg2600.txt | wc -l)
677
(grep -io love pg2600.txt | wc -l)  0.11s user 0.00s system 98% cpu 0.116 total
Let's now do the same thing in Python using coroutines, as shown in the following script (grep.py):
def coroutine(fn):
    # Decorator: instantiate the coroutine and advance it to its
    # first (yield) so that it is immediately ready to accept data.
    def wrapper(*args, **kwargs):
        c = fn(*args, **kwargs)
        next(c)
        return c
    return wrapper


def cat(f, case_insensitive, child):
    # Data source: read the file line by line and feed each
    # (optionally lowercased) line to the child coroutine.
    if case_insensitive:
        line_processor = lambda l: l.lower()
    else:
        line_processor = lambda l: l

    for line in f:
        child.send(line_processor(line))


@coroutine
def grep(substring, case_insensitive, child):
    # Count the occurrences of substring in each line received
    # and pass the number on to the child coroutine.
    if case_insensitive:
        substring = substring.lower()
    while True:
        text = (yield)
        child.send(text.count(substring))


@coroutine
def count(substring):
    # Sink: keep a running total and print it when closed.
    n = 0
    try:
        while True:
            n += (yield)
    except GeneratorExit:
        print(substring, n)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-i', action='store_true', dest='case_insensitive')
    parser.add_argument('pattern', type=str)
    parser.add_argument('infile', type=argparse.FileType('r'))
    args = parser.parse_args()

    cat(args.infile, args.case_insensitive,
        grep(args.pattern, args.case_insensitive, count(args.pattern)))
Before we walk through the code, let's run it and see how it compares to grep:
$ time python3.5 grep.py -i love pg2600.txt
love 677
python3.5 grep.py -i love pg2600.txt  0.09s user 0.01s system 97% cpu 0.097 total
As we can see, our pure Python version that uses coroutines is competitive with the Unix grep command piped into wc for counting lines. Of course, the Unix grep command is significantly more powerful than our simple Python version, so we cannot simply claim that Python is faster than C! At the same time, this is a pretty impressive result.
Let's walk through the code and see what is going on. We start off by reimplementing the coroutine decorator we saw earlier (the short sketch after the following list shows why the priming step matters). After that, we break the problem into three distinct steps:
- Reading the file line by line (done by the cat function)
- Counting the occurrences of substring in each line (the grep coroutine)
- Adding up all the numbers and printing out the total (the count coroutine)
In the main body of the script, we parse command-line options and then pipe the output of cat to grep and the output of grep to count, just like we would do with regular Unix tools.
This chaining is done very simply; we pass the coroutine that receives data as an argument (child in the preceding example) to the function or coroutine that produces the data. Then, inside the data source, we simply call the coroutine's send method.
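In miniature, and reusing the coroutine decorator from the script above, the chaining pattern looks like this (doubler and printer are made-up names):

@coroutine
def printer():
    while True:
        print((yield))

@coroutine
def doubler(child):
    while True:
        child.send(2 * (yield))

pipe = doubler(printer())  # the consumer is passed in as child
pipe.send(21)              # the data source calls send(); prints 42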
The first function, cat, acts as the data source for the whole program; it reads the file line by line and sends each line to grep (child.send(line)). If we want a case-insensitive match, then we simply make line lowercase; otherwise, we pass it unchanged.
The grep coroutine is the first coroutine in the chain. In it, we enter an infinite loop where we keep receiving data (text = (yield)), count the occurrences of substring in text, and send that number of occurrences to the next coroutine (count, in our case): child.send(text.count(substring)).
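Note that str.count returns the number of non-overlapping occurrences of substring within the line, not just whether the line matched, which is why the totals agree with grep -o (one output line per match). For example:

>>> 'Love, love, lovely'.lower().count('love')
3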
The count coroutine keeps a running total, n, of the numbers it receives from grep (n += (yield)). It catches the GeneratorExit exception sent to each coroutine when it is closed (which in our case happens automatically when we reach the end of the file) to know when to print out substring and n.
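The same mechanism can be triggered explicitly: calling close() on a coroutine raises GeneratorExit at the paused (yield). Here is a small sketch (tally is a made-up name; this reuses the coroutine decorator from above):

@coroutine
def tally():
    n = 0
    try:
        while True:
            n += (yield)
    except GeneratorExit:
        print('total:', n)

t = tally()
t.send(3)
t.send(4)
t.close()  # raises GeneratorExit inside tally; prints: total: 7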
Things become interesting when we start organizing coroutines into complex graphs. For instance, we might want to count the occurrences of multiple words in the input file.
The following code shows one way of doing this via a single extra coroutine responsible for broadcasting its input to an arbitrary number of child coroutines (mgrep.py):
def coroutine(fn):
    def wrapper(*args, **kwargs):
        c = fn(*args, **kwargs)
        next(c)
        return c
    return wrapper


def cat(f, case_insensitive, child):
    if case_insensitive:
        line_processor = lambda l: l.lower()
    else:
        line_processor = lambda l: l

    for line in f:
        child.send(line_processor(line))


@coroutine
def grep(substring, case_insensitive, child):
    if case_insensitive:
        substring = substring.lower()
    while True:
        text = (yield)
        child.send(text.count(substring))


@coroutine
def count(substring):
    n = 0
    try:
        while True:
            n += (yield)
    except GeneratorExit:
        print(substring, n)


@coroutine
def fanout(children):
    # Broadcaster: send every piece of data received to all
    # registered child coroutines.
    while True:
        data = (yield)
        for child in children:
            child.send(data)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-i', action='store_true', dest='case_insensitive')
    parser.add_argument('patterns', type=str, nargs='+')
    parser.add_argument('infile', type=argparse.FileType('r'))
    args = parser.parse_args()

    cat(args.infile, args.case_insensitive,
        fanout([grep(p, args.case_insensitive, count(p))
                for p in args.patterns]))
The code is virtually identical to the previous example (grep.py); the only addition is the broadcaster, fanout. The fanout coroutine takes a list of coroutines as input and then sits (as usual) in an infinite loop waiting for data. Once it receives data (data = (yield)), it simply sends it to all registered coroutines (for child in children: child.send(data)).
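As a quick illustration of the broadcasting step in isolation (tag is a made-up coroutine; fanout and the coroutine decorator are the ones defined above):

@coroutine
def tag(label):
    while True:
        print(label, (yield))

b = fanout([tag('a'), tag('b')])
b.send('hello')  # prints: a hello
                 #         b hello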
Without changing the code for cat, grep, and count, we are able to generalize our program and search for an arbitrary number of strings in our text!
Performance is still very good, as the following snippet shows:
$ time python3.5 mgrep.py -i love hate hope pg2600.txt
hate 103
love 677
hope 158
python3.5 mgrep.py -i love hate hope pg2600.txt  0.16s user 0.01s system 98% cpu 0.166 total