官术网_书友最值得收藏!

Queuing in concurrent programming

The concept of a queue is even more prevalent in the sub-field of concurrent programming, especially when we need to implement a fixed number of threads in our program to interact with a varying number of shared resources.

In the previous examples, we have learned to assign a specific task to a new thread. This means that the number of tasks that need to be processed will dictate the number of threads our program should spawn. (For example, in our Chapter03/example3.py file, we had five numbers as our input and we therefore created five threads—each took one input number and processed it.)

Sometimes it is undesirable to have as many threads as the tasks we have to process. Say we have a large number of tasks to be processed, then it will be quite inefficient to spawn the same large number of threads and have each thread execute only one task. It could be more beneficial to have a fixed number of threads (commonly known as a thread pool) that would work through the tasks in a cooperative manner.

Here is when the concept of a queue comes in. We can design a structure in which the pool of threads will not hold any information regarding the tasks they should each execute, instead the tasks are stored in a queue (in other words task queue), and the items in the queue will be fed to individual members of the thread pool. As a given task is completed by a member of the thread pool, if the task queue still contains elements to be processed, then the next element in the queue will be sent to the thread that just became available.

This diagram further illustrates this setup:

Queuing in threading

Let's consider a quick example in Python, in order to illustrate this point. Navigate to the Chapter03/example5.py file. In this example, we will be considering the problem of printing out all of the positive factors of an element in a given list of positive integers. We are still looking at the previous MyThread class, but with some adjustments:

# Chapter03/example5.py
import queue
import threading
import time


class MyThread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name

def run(self):
print('Starting thread %s.' % self.name)
process_queue()
print('Exiting thread %s.' % self.name)

def process_queue():
while True:
try:
x = my_queue.get(block=False)
except queue.Empty:
return
else:
print_factors(x)

time.sleep(1)

def print_factors(x):
result_string = 'Positive factors of %i are: ' % x
for i in range(1, x + 1):
if x % i == 0:
result_string += str(i) + ' '
result_string += '\n' + '_' * 20

print(result_string)


# setting up variables
input_ = [1, 10, 4, 3]

# filling the queue
my_queue = queue.Queue()
for x in input_:
my_queue.put(x)


# initializing and starting 3 threads
thread1 = MyThread('A')
thread2 = MyThread('B')
thread3 = MyThread('C')

thread1.start()
thread2.start()
thread3.start()

# joining all 3 threads
thread1.join()
thread2.join()
thread3.join()

print('Done.')

There is a lot going on, so let's break the program down into smaller pieces. First, let's look at our key function, as follows:

# Chapter03/example5.py

def print_factors(x):
result_string = 'Positive factors of %i are: ' % x
for i in range(1, x + 1):
if x % i == 0:
result_string += str(i) + ' '
result_string += '\n' + '_' * 20

print(result_string)

This function takes in an argument, x then iterates through all positive numbers between 1 and itself, to check whether a number is a factor of x. It finally prints out a formatted message that contains all of the information that it cumulates through the loop.

In our new MyThread class, when a new instance is initialized and started, the process_queue() function will be called. This function will first attempt to obtain the next element of the queue object that the my_queue variable holds in a non-blocking manner by calling the get(block=False) method. If a queue.Empty exception occurs (which indicates that the queue currently holds no value), then we will end the execution of the function. Otherwise we simply pass that element we just obtained to the print_factors() function.

# Chapter03/example5.py

def process_queue():
while True:
try:
x = my_queue.get(block=False)
except queue.Empty:
return
else:
print_factors(x)

time.sleep(1)

The my_queue variable is defined in our main function as a Queue object from the queue module that contains the elements in the input_ list:

# setting up variables
input_ = [1, 10, 4, 3]

# filling the queue
my_queue = queue.Queue(4)
for x in input_:
my_queue.put(x)

For the rest of the main program, we simply initiate and run three separate threads until all of them finish their respective execution. Here we choose to create only three threads to simulate the design that we discussed earlier—a fixed number of threads processing a queue of input whose number of elements can change independently:

# initializing and starting 3 threads
thread1 = MyThread('A')
thread2 = MyThread('B')
thread3 = MyThread('C')

thread1.start()
thread2.start()
thread3.start()

# joining all 3 threads
thread1.join()
thread2.join()
thread3.join()

print('Done.')

Run the program and you will see the following output:

> python example5.py
Starting thread A.
Starting thread B.
Starting thread C.
Positive factors of 1 are: 1
____________________
Positive factors of 10 are: 1 2 5 10
____________________
Positive factors of 4 are: 1 2 4
____________________
Positive factors of 3 are: 1 3
____________________
Exiting thread C.
Exiting thread A.
Exiting thread B.
Done.

In this example, we have implemented the structure that we discussed earlier: a task queue that holds all the tasks to be executed and a thread pool (threads A, B, and C) that interacts with the queue to process its elements individually.

主站蜘蛛池模板: 革吉县| 遂平县| 禹州市| 嘉黎县| 宝丰县| 甘德县| 砚山县| 武邑县| 稷山县| 夏邑县| 旺苍县| 桓仁| 张家口市| 伊春市| 邵东县| 美姑县| 那坡县| 娄底市| 启东市| 明水县| 舒兰市| 井研县| 柘荣县| 扬中市| 时尚| 中方县| 固镇县| 门源| 上饶县| 清徐县| 永川市| 永川市| 嘉义市| 龙南县| 赞皇县| 垫江县| 东乡县| 蓝田县| 德惠市| 新干县| 义马市|