- Mastering Concurrency in Python
- Quan Nguyen
- 2021-06-10 19:24:04
Queuing in concurrent programming
The concept of a queue is even more prevalent in the sub-field of concurrent programming, especially when we need to implement a fixed number of threads in our program to interact with a varying number of shared resources.
In the previous examples, we have learned to assign a specific task to a new thread. This means that the number of tasks that need to be processed will dictate the number of threads our program should spawn. (For example, in our Chapter03/example3.py file, we had five numbers as our input and we therefore created five threads—each took one input number and processed it.)
Sometimes it is undesirable to have as many threads as there are tasks to process. If we have a large number of tasks, it is quite inefficient to spawn an equally large number of threads and have each thread execute only one task. It can be more beneficial to have a fixed number of threads (commonly known as a thread pool) that work through the tasks in a cooperative manner.
This is where the concept of a queue comes in. We can design a structure in which the threads in the pool hold no information about the tasks they should each execute. Instead, the tasks are stored in a queue (in other words, a task queue), and the items in the queue are fed to individual members of the thread pool. When a member of the thread pool completes a task and the task queue still contains elements to be processed, the next element in the queue is sent to the thread that just became available.
This diagram further illustrates this setup:
Let's consider a quick example in Python to illustrate this point. Navigate to the Chapter03/example5.py file. In this example, we consider the problem of printing out all of the positive factors of each element in a given list of positive integers. We are still using the previous MyThread class, but with some adjustments:
# Chapter03/example5.py
import queue
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        print('Starting thread %s.' % self.name)
        process_queue()
        print('Exiting thread %s.' % self.name)

def process_queue():
    while True:
        try:
            x = my_queue.get(block=False)
        except queue.Empty:
            return
        else:
            print_factors(x)

        time.sleep(1)

def print_factors(x):
    result_string = 'Positive factors of %i are: ' % x
    for i in range(1, x + 1):
        if x % i == 0:
            result_string += str(i) + ' '
    result_string += '\n' + '_' * 20

    print(result_string)

# setting up variables
input_ = [1, 10, 4, 3]

# filling the queue
my_queue = queue.Queue()
for x in input_:
    my_queue.put(x)

# initializing and starting 3 threads
thread1 = MyThread('A')
thread2 = MyThread('B')
thread3 = MyThread('C')

thread1.start()
thread2.start()
thread3.start()

# joining all 3 threads
thread1.join()
thread2.join()
thread3.join()

print('Done.')
There is a lot going on, so let's break the program down into smaller pieces. First, let's look at our key function, as follows:
# Chapter03/example5.py
def print_factors(x):
    result_string = 'Positive factors of %i are: ' % x
    for i in range(1, x + 1):
        if x % i == 0:
            result_string += str(i) + ' '
    result_string += '\n' + '_' * 20

    print(result_string)
This function takes an argument, x, and iterates through all integers from 1 to x itself, checking whether each one is a factor of x. It then prints a formatted message containing all of the factors it accumulated during the loop.
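To see the factor check in isolation, here is a minimal sketch that returns the factors as a list instead of printing them. The positive_factors() name is a hypothetical helper introduced only for illustration; it is not part of example5.py.

```python
def positive_factors(x):
    """Return the positive factors of x in ascending order.

    Hypothetical helper: same divisibility test as print_factors(),
    but the result is returned rather than printed.
    """
    return [i for i in range(1, x + 1) if x % i == 0]


for n in [1, 10, 4, 3]:
    print(n, positive_factors(n))
```

Returning a value makes the logic easy to check on its own, while print_factors() keeps the printing that the threaded example relies on.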
In our new MyThread class, when an instance is initialized and started, its run() method calls the process_queue() function. Inside a loop, this function attempts to obtain the next element of the queue object held in the my_queue variable in a non-blocking manner by calling get(block=False). If a queue.Empty exception occurs (which indicates that the queue currently holds no value), the function returns and the thread finishes. Otherwise, we pass the element we just obtained to the print_factors() function, sleep for one second, and try again:
# Chapter03/example5.py
def process_queue():
    while True:
        try:
            x = my_queue.get(block=False)
        except queue.Empty:
            return
        else:
            print_factors(x)

        time.sleep(1)
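The non-blocking behavior that process_queue() depends on can be observed without any threads. The following is a small sketch, assuming a throwaway queue named q that holds a single item; both names and the item are made up for illustration.

```python
import queue

q = queue.Queue()
q.put('only item')

# The queue holds one item, so a non-blocking get() returns it immediately.
first = q.get(block=False)

# The queue is now empty. Instead of waiting for a new item,
# a non-blocking get() raises queue.Empty right away.
try:
    q.get(block=False)
    raised_empty = False
except queue.Empty:
    raised_empty = True

print(first, raised_empty)
```

With the default block=True, the second call would wait indefinitely for another item, which is why the worker threads in this example use the non-blocking form as their signal to stop.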
The my_queue variable is defined in the main program as a Queue object from the queue module, and it is filled with the elements of the input_ list:
# setting up variables
input_ = [1, 10, 4, 3]

# filling the queue
my_queue = queue.Queue()
for x in input_:
    my_queue.put(x)
For the rest of the main program, we simply initialize and start three separate threads, then wait until all of them finish executing. We create only three threads to simulate the design discussed earlier, in which a fixed number of threads processes a queue of input whose number of elements can change independently:
# initializing and starting 3 threads
thread1 = MyThread('A')
thread2 = MyThread('B')
thread3 = MyThread('C')
thread1.start()
thread2.start()
thread3.start()
# joining all 3 threads
thread1.join()
thread2.join()
thread3.join()
print('Done.')
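To make the independence between pool size and input size more concrete, the sketch below wraps the same pattern in a function. This is an illustrative variation, not the code from example5.py: the run_pool() helper, its parameters, and the lock-protected handled list are all assumptions added here so that the result can be inspected.

```python
import queue
import threading


def run_pool(items, num_threads):
    """Drain a queue of `items` with `num_threads` workers (hypothetical helper).

    Returns a list of (thread_name, item) pairs recording which
    thread processed which item.
    """
    task_queue = queue.Queue()
    for item in items:
        task_queue.put(item)

    handled = []                     # shared record of (thread name, item)
    handled_lock = threading.Lock()  # guards appends to `handled`

    def worker():
        # Same loop shape as process_queue(): stop once the queue is empty.
        while True:
            try:
                item = task_queue.get(block=False)
            except queue.Empty:
                return
            with handled_lock:
                handled.append((threading.current_thread().name, item))

    # Threads are named 'A', 'B', 'C', ... to mirror the example.
    threads = [threading.Thread(target=worker, name=chr(ord('A') + i))
               for i in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return handled


# Same pool size, different input sizes.
print(len(run_pool([1, 10, 4, 3], num_threads=3)))  # prints 4
print(len(run_pool(range(1, 21), num_threads=3)))   # prints 20
```

Every item is processed exactly once regardless of how many items there are, while the number of threads stays at three. Which thread handles which item is not fixed and can differ between runs.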
Run the program and you will see output similar to the following (the order in which the threads pick up elements and exit can vary between runs):
> python example5.py
Starting thread A.
Starting thread B.
Starting thread C.
Positive factors of 1 are: 1
____________________
Positive factors of 10 are: 1 2 5 10
____________________
Positive factors of 4 are: 1 2 4
____________________
Positive factors of 3 are: 1 3
____________________
Exiting thread C.
Exiting thread A.
Exiting thread B.
Done.
In this example, we have implemented the structure that we discussed earlier: a task queue that holds all the tasks to be executed and a thread pool (threads A, B, and C) that interacts with the queue to process its elements individually.
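For comparison, Python's standard library also provides a ready-made thread pool, concurrent.futures.ThreadPoolExecutor, which manages its own internal work queue. The sketch below is a different technique from the hand-built queue in example5.py, shown only to illustrate the same fixed-pool idea. The positive_factors() helper is hypothetical and returns the factors rather than printing them.

```python
from concurrent.futures import ThreadPoolExecutor


def positive_factors(x):
    """Return the positive factors of x in ascending order (hypothetical helper)."""
    return [i for i in range(1, x + 1) if x % i == 0]


input_ = [1, 10, 4, 3]

# Three worker threads share the four inputs. Executor.map() returns
# results in the same order as the inputs, whichever thread ran them.
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(positive_factors, input_))

for x, factors in zip(input_, results):
    print('Positive factors of %i are: %s' % (x, ' '.join(map(str, factors))))
```

Here the executor takes over the roles that my_queue and the MyThread instances play in the example above. Building the queue by hand, as this section does, makes each step of that hand-off visible.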