
Multiprocessing to Parallelize and Speed Up

This guide will show you how to enable multiprocessing to speed up your Python program.

Problem

You have a Python program that takes a long time to run because it performs CPU-bound tasks (math, data processing, etc.) rather than I/O-bound tasks (network, disk, etc.). You want to speed it up by running the work in parallel.

Background

Note:

  1. Not all libraries support multiprocessing.
  2. This guide assumes you are familiar with Python and have a basic understanding of CPU and memory usage.
  3. CPU-bound tasks can be sped up with multiprocessing.
  4. I/O-bound tasks are usually better served by threading.
  5. The tasks must be independent of each other (the next task does not depend on the previous task's result).
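
To illustrate note 4, here is a minimal sketch of the threading alternative for I/O-bound work. It uses concurrent.futures.ThreadPoolExecutor from the standard library. The names fake_download and download_all are made up for this example, and time.sleep() only stands in for a real network or disk wait.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(item_id):
    # Hypothetical I/O-bound task: the sleep stands in for waiting on a network reply.
    time.sleep(0.2)
    return f"item-{item_id}"

def download_all(item_ids, max_workers=4):
    # Threads overlap their waiting time; executor.map keeps the input order.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fake_download, item_ids))

if __name__ == "__main__":
    start = time.perf_counter()
    print(download_all(range(4)))
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

Because the four waits overlap, the elapsed time should be close to a single wait rather than four of them. The same trick does not help CPU-bound work, which is what the rest of this guide is about.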

The idea of multiprocessing is to use multiple CPU cores (one process per core) to run multiple independent tasks at the same time. This is different from multithreading, which runs multiple threads inside one process. In standard CPython these threads take turns on the CPU, so they help mainly when independent tasks spend most of their time idle, waiting for external events (a network response, GPIO input, a timer).

Each newly created process has its own memory space and does not share memory with the parent process. Sharing data between a child process and its parent is therefore not as easy as sharing data between threads. Techniques such as passing input arguments when the process is created, or using multiprocessing.Queue, can be used.
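
The separate memory spaces can be seen with a small experiment. This is only a sketch: shared_list, append_item, and child are names invented for the demonstration.

```python
import multiprocessing as mp

shared_list = []  # module-level state; every process works on its own copy

def append_item(value):
    # Mutates the list that belongs to the process calling this function.
    shared_list.append(value)
    return list(shared_list)

def child():
    print("child sees:", append_item("from child"))

if __name__ == "__main__":
    p = mp.Process(target=child)
    p.start()
    p.join()
    # The child's append never reaches the parent's copy of the list.
    print("parent sees:", shared_list)
```

The child prints a list containing "from child", while the parent still prints an empty list. This is why the later steps pass data explicitly through arguments, a Manager, or a Queue.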

An example of using multiprocessing to speed up network packet capture (pcap) processing can be found on my GitHub.

Solution

Step 1: Make Sure You Have Sufficient Resources

  1. CPU: The more cores you have, the more tasks you can run in parallel.
    • If you are using Windows, you can check Task Manager to see how many cores you have.
    • If you are using Linux, you can use tools like htop, top, or echo $(nproc) to see how many cores you have.
    • If you want to check with Python code, you can use the following code:
      import os
      print(os.cpu_count())
      
  2. Memory: Make sure you have enough memory to run multiple tasks at the same time. You can approximate the memory usage by multiplying the memory usage of one task by the number of tasks you want to run in parallel.
    • If you are using Windows, you can check Task Manager to see how much memory you have.
    • If you are using Linux, you can use tools like htop, top, or free -h to see how much memory you have.
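
The two checks above can be combined into a rough rule for choosing the number of processes. The helper below is a sketch, not a definitive formula: pick_worker_count is a hypothetical name, and you have to supply the per-task and available memory figures yourself (for example, read them from Task Manager or free -h).

```python
import os

GIB = 1024 ** 3  # bytes in one gibibyte

def pick_worker_count(per_task_bytes, available_bytes, cores=None):
    # os.cpu_count() can return None when the count cannot be determined.
    if cores is None:
        cores = os.cpu_count() or 1
    # How many copies of one task fit into the available memory.
    fits_in_memory = available_bytes // per_task_bytes
    # Never exceed the core count, and always keep at least one worker.
    return max(1, min(cores, fits_in_memory))

if __name__ == "__main__":
    # Assumed figures: each task needs about 2 GiB and 8 GiB are free.
    print(pick_worker_count(per_task_bytes=2 * GIB, available_bytes=8 * GIB))
```

With 8 cores, 2 GiB per task, and 8 GiB free, memory is the limit and the helper returns 4.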

Step 2: Use the multiprocessing Library

  • Using multiprocessing.Pool:
    code
    import multiprocessing as mp
    import time
    import os
    
    def new_process(required_iterable_item):
        time.sleep(1)
        print(f"Time: {time.time():.2f} | Process ID: {os.getpid()} | Process Name: {mp.current_process().name}")
    
    if __name__ == '__main__':
        mp.freeze_support()
        cpu_count = 4 #os.cpu_count()
        with mp.Pool(processes=cpu_count) as p:
            p.map(func=new_process, iterable=range(cpu_count))
    
    code output
    Time: 1713118619.71 | Process ID: 16624 | Process Name: SpawnPoolWorker-1
    Time: 1713118619.71 | Process ID: 12120 | Process Name: SpawnPoolWorker-2
    Time: 1713118619.71 | Process ID: 7640 | Process Name: SpawnPoolWorker-3
    Time: 1713118619.72 | Process ID: 4232 | Process Name: SpawnPoolWorker-4
    
    This method is useful when you have a list of tasks (possibly more than the CPU core count) to run in parallel. The Pool keeps a fixed number of worker processes and hands each one the next task as soon as it becomes free.
  • Using multiprocessing.Process:
    code
    import multiprocessing as mp
    import time
    import os
    
    def new_process(process_id):
        time.sleep(1)
        print(f"Time: {time.time():.2f} | Process ID: {os.getpid()} | Process Name: {mp.current_process().name}")
    
    if __name__ == '__main__':
        mp.freeze_support()
        cpu_count = 4 #os.cpu_count()
        for i in range(cpu_count):
            p = mp.Process(target=new_process, args=(i,))
            p.start() # Control when the process starts
            # p.join() # Halts the main process until the child process is done
    
    code output
    Time: 1713118698.65 | Process ID: 8864 | Process Name: Process-1
    Time: 1713118698.66 | Process ID: 17096 | Process Name: Process-2
    Time: 1713118698.66 | Process ID: 5608 | Process Name: Process-3
    Time: 1713118698.66 | Process ID: 10104 | Process Name: Process-4
    
    This method is useful when you want to manually control the number of processes to run in parallel. After each process is created, you can call p.start() to start it and p.join() to wait for it to finish.
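
In the multiprocessing.Process example, calling p.join() inside the same loop as p.start() would run the processes one after another. A common pattern, sketched below, is to start every process first and join them afterwards. The names describe_worker and report are made up for this sketch.

```python
import multiprocessing as mp
import os

def describe_worker(task_id):
    # Build the message separately so it is easy to check on its own.
    return f"task {task_id} ran in process {os.getpid()}"

def report(task_id):
    print(describe_worker(task_id))

if __name__ == "__main__":
    processes = [mp.Process(target=report, args=(i,)) for i in range(4)]
    for p in processes:
        p.start()   # all four children are now running side by side
    for p in processes:
        p.join()    # wait for each one only after all have been started
    # An exit code of 0 means the child finished without an error.
    print("exit codes:", [p.exitcode for p in processes])
```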

Note:

  1. The if __name__ == '__main__': guard is required to prevent new processes from recursively creating more processes. With the spawn start method (the only one available on Windows), each new process imports the main module again, and the guard stops the children from re-running the code inside the block.
  2. multiprocessing.freeze_support() should be called straight after the if __name__ == '__main__': line so that the program still works when it is frozen into a Windows executable (for example with PyInstaller). In all other cases the call has no effect.
  3. Wrap whatever work you want to parallelize in a function for both methods. Here new_process is the function that runs in parallel, and you can see from the timestamps that all four calls finish at about the same time instead of executing one after another.
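
Whether the main module is imported again depends on the start method, so it can help to check which one your platform uses. The sketch below only calls multiprocessing.get_start_method() and multiprocessing.get_all_start_methods(); describe_start_methods is a hypothetical wrapper name.

```python
import multiprocessing as mp

def describe_start_methods():
    # Returns the start method in use and every method this platform offers.
    return mp.get_start_method(), mp.get_all_start_methods()

if __name__ == "__main__":
    current, available = describe_start_methods()
    print(f"start method in use: {current}")
    print(f"available on this platform: {available}")
```

The worker names in the outputs above (SpawnPoolWorker-1 and so on) suggest they were produced with the spawn method. Keeping the guard in place makes the same script safe under every start method.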

Step 3: Send Parent Process Data to Child Process

For the following example, we only use the multiprocessing.Pool method. The same idea works with the multiprocessing.Process method.

code
import multiprocessing as mp
import time

def new_process(input_data):
    time.sleep(1)
    idx_start, idx_end = input_data
    print(f"Time: {time.time():.2f} | Start: {idx_start} | End: {idx_end}")

if __name__ == '__main__':
    mp.freeze_support()
    cpu_count = 4 #os.cpu_count()

    # Create a list of input data
    increment = 1
    input_data = [(i*increment, (i+1)*increment) for i in range(cpu_count)]

    with mp.Pool(processes=cpu_count) as p:
        p.map(func=new_process, iterable=input_data)
code output
Time: 1713111614.02 | Start: 0 | End: 1
Time: 1713111614.02 | Start: 1 | End: 2
Time: 1713111614.02 | Start: 2 | End: 3
Time: 1713111614.02 | Start: 3 | End: 4

  • Lines 14-15: Create a list of tuples containing different arguments for the function.
  • Lines 17-18: Create a pool of processes and map each tuple to its own function call.
  • Line 6: Unpack the tuple to retrieve the arguments.

You can see from the output that the parent process sends different arguments to the child process, and the child process receives the arguments correctly and prints them out.
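
If you would rather not unpack a tuple inside the function, Pool.starmap() unpacks each tuple into separate arguments for you. The sketch below also shows one possible way to split a larger job into (start, end) ranges. make_ranges and sum_range are hypothetical helpers, not part of the original example.

```python
import multiprocessing as mp

def make_ranges(total, parts):
    # Split [0, total) into at most `parts` consecutive (start, end) ranges.
    step = max(1, -(-total // parts))  # ceiling division, at least 1
    return [(start, min(start + step, total)) for start in range(0, total, step)]

def sum_range(start, end):
    # Stand-in for real CPU-bound work on one slice of the data.
    return sum(range(start, end))

if __name__ == "__main__":
    ranges = make_ranges(total=10, parts=4)
    with mp.Pool(processes=4) as pool:
        # starmap calls sum_range(start, end) for every tuple in ranges.
        partial_sums = pool.starmap(sum_range, ranges)
    print(ranges)
    print(partial_sums, sum(partial_sums))
```

Here the ranges are (0, 3), (3, 6), (6, 9), and (9, 10), and the partial sums add up to the same total as sum(range(10)).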

Step 4: Return Child Process Data to Parent Process with multiprocessing.Manager

For the following example, we only use the multiprocessing.Pool method and use multiprocessing.Manager().dict() to create a dictionary that is shared between the parent and child processes.

With this method, child processes can both read from and write to the shared dictionary, which is useful when different child processes are responsible for different tasks and need to share data with each other.

code
import multiprocessing as mp
import time
import collections

def new_process(input_data):
    time.sleep(1)
    idx_start, idx_end, return_dict = input_data
    calculation_result = idx_start + idx_end
    print(f"Time: {time.time():.2f} | Start: {idx_start} | End: {idx_end} | Result: {calculation_result}")
    return_dict[idx_start] = calculation_result

if __name__ == '__main__':
    mp.freeze_support()
    cpu_count = 4 #os.cpu_count()
    manager = mp.Manager()
    return_dict = manager.dict()

    # Create a list of input data
    increment = 1
    input_data = [(i*increment, (i+1)*increment, return_dict) for i in range(cpu_count)]

    with mp.Pool(processes=cpu_count) as p:
        p.map(func=new_process, iterable=input_data)
    print(f"Return Dict: {return_dict}")

    return_dict = collections.OrderedDict(sorted(return_dict.items()))
    print(f"Ordered Dict: {return_dict}")

    final_arr = []
    for key, value in return_dict.items():
        #print(f"Key: {key} / Value: {value}")
        final_arr.append(value)
    print(f"Final Array: {final_arr}")
code output
Time: 1713112527.44 | Start: 1 | End: 2 | Result: 3
Time: 1713112527.44 | Start: 0 | End: 1 | Result: 1
Time: 1713112527.44 | Start: 2 | End: 3 | Result: 5
Time: 1713112527.44 | Start: 3 | End: 4 | Result: 7
Return Dict: {1: 3, 0: 1, 2: 5, 3: 7}
Ordered Dict: OrderedDict({0: 1, 1: 3, 2: 5, 3: 7})
Final Array: [1, 3, 5, 7]

  • Lines 15-16: Use multiprocessing.Manager().dict() to create a dictionary shared between the parent and child processes. In this case the child processes write to the dictionary and the parent process reads from it, but access works in both directions.
  • Line 20: Add the shared dictionary to the input arguments.
  • Line 8: Perform some kind of operation in the child process.
  • Line 10: Write the result to the shared dictionary with idx_start as the key. This helps to keep track of the order of the results.
  • Line 24: Read the shared dictionary populated by the child processes.
  • Lines 26-27: Order the dictionary by key and print the result. Child processes write to the dictionary as soon as they finish, so its order is not guaranteed. Depending on your use case, this step is optional.
  • Lines 29-33: Collect the values from the shared dictionary into a list and print the result.
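
When the children only need to hand one result each back to the parent, a simpler alternative to the Manager is to return the value from the function: Pool.map() collects the return values in the same order as the inputs. This is a sketch of that alternative, with add_bounds as a made-up function name.

```python
import multiprocessing as mp

def add_bounds(bounds):
    idx_start, idx_end = bounds
    # The return value is sent back to the parent automatically.
    return idx_start + idx_end

if __name__ == "__main__":
    input_data = [(i, i + 1) for i in range(4)]
    with mp.Pool(processes=4) as pool:
        # Results come back in input order, so no sorting is needed.
        results = pool.map(add_bounds, input_data)
    print(results)  # [1, 3, 5, 7]
```

The Manager approach is still the better fit when the children need to see each other's data while they are running.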

Step 5: Share Data Between Parent and Child Processes with multiprocessing.Queue

For the following example, we only use the multiprocessing.Pool method and use multiprocessing.Queue() to pass data between the parent and child processes.

With this method, child processes can both get items from and put items on the queues, which is useful when different child processes are responsible for different tasks and need to share data with each other.

Before using this, you need to know that a queue is a First-In-First-Out (FIFO) data structure, which means that the first element added to the queue will be the first element removed from the queue. The following are the common methods of a queue:

  • put(): Add an item to the queue. (at the end, last one to be removed)
  • get(): Remove and return an item from the queue. (from the front, the oldest one)
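
The FIFO behaviour of put() and get() can be checked in a single process before any child processes are involved. This is a small sketch. fifo_demo is a hypothetical name, and the timeout is only there so the call cannot block forever.

```python
import multiprocessing as mp

def fifo_demo():
    q = mp.Queue()
    for label in ("first", "second", "third"):
        q.put(label)  # added at the end of the queue
    # get() removes items from the front, so they come back in insertion order.
    return [q.get(timeout=5) for _ in range(3)]

if __name__ == "__main__":
    print(fifo_demo())  # ['first', 'second', 'third']
```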

code
import multiprocessing as mp
import time
import os

def worker_main(input_queue, output_queue):
    print(f"Time: {time.time():.2f} | Process ID: {os.getpid()} | working")
    while True:
        item = input_queue.get()
        print(f"Time: {time.time():.2f} | Process ID: {os.getpid()} | Received: {item}")
        time.sleep(1) # simulate a "long" operation
        calculation_result = item["idx_start"] + item["idx_end"]
        item["result"] = calculation_result
        item["status"] = "done"
        output_queue.put(item)
        print(f"Time: {time.time():.2f} | Process ID: {os.getpid()} | Result: {calculation_result}")

if __name__ == '__main__':
    mp.freeze_support()
    cpu_count = 4 #os.cpu_count()
    input_queue = mp.Queue()
    output_queue = mp.Queue()

    the_pool = mp.Pool(processes=cpu_count, initializer=worker_main, initargs=(input_queue, output_queue))

    increment = 1
    for i in range(cpu_count*2):
        data = {}
        data["status"] = "new"
        data["idx_start"] = i*increment
        data["idx_end"] = (i+1)*increment
        input_queue.put(data)

    while (output_queue.qsize() < cpu_count*2):
        time.sleep(1)

    results = []
    while not output_queue.empty():
        item = output_queue.get()
        results.append(item)
    print(f"Time: {time.time():.2f} | Results: {results}")
code output
Time: 1713117240.12 | Process ID: 16284 | working
Time: 1713117240.12 | Process ID: 16284 | Received: {'status': 'new', 'idx_start': 0, 'idx_end': 1}
Time: 1713117240.12 | Process ID: 3852 | working
Time: 1713117240.12 | Process ID: 3852 | Received: {'status': 'new', 'idx_start': 1, 'idx_end': 2}
Time: 1713117240.12 | Process ID: 16912 | working
Time: 1713117240.12 | Process ID: 16912 | Received: {'status': 'new', 'idx_start': 2, 'idx_end': 3}
Time: 1713117240.12 | Process ID: 884 | working
Time: 1713117240.12 | Process ID: 884 | Received: {'status': 'new', 'idx_start': 3, 'idx_end': 4}
Time: 1713117241.12 | Process ID: 16284 | Result: 1
Time: 1713117241.12 | Process ID: 16284 | Received: {'status': 'new', 'idx_start': 4, 'idx_end': 5}
Time: 1713117241.12 | Process ID: 3852 | Result: 3
Time: 1713117241.12 | Process ID: 3852 | Received: {'status': 'new', 'idx_start': 5, 'idx_end': 6}
Time: 1713117241.12 | Process ID: 16912 | Result: 5
Time: 1713117241.12 | Process ID: 16912 | Received: {'status': 'new', 'idx_start': 6, 'idx_end': 7}
Time: 1713117241.13 | Process ID: 884 | Result: 7
Time: 1713117241.13 | Process ID: 884 | Received: {'status': 'new', 'idx_start': 7, 'idx_end': 8}
Time: 1713117242.12 | Process ID: 16284 | Result: 9
Time: 1713117242.12 | Process ID: 3852 | Result: 11
Time: 1713117242.12 | Process ID: 16912 | Result: 13
Time: 1713117242.13 | Process ID: 884 | Result: 15
Time: 1713117242.96 | Results: [{'status': 'done', 'idx_start': 0, 'idx_end': 1, 'result': 1}, {'status': 'done', 'idx_start': 1, 'idx_end': 2, 'result': 3}, {'status': 'done', 'idx_start': 2, 'idx_end': 3, 'result': 5}, {'status': 'done', 'idx_start': 3, 'idx_end': 4, 'result': 7}, {'status': 'done', 'idx_start': 4, 'idx_end': 5, 'result': 9}, {'status': 'done', 'idx_start': 5, 'idx_end': 6, 'result': 11}, {'status': 'done', 'idx_start': 6, 'idx_end': 7, 'result': 13}, {'status': 'done', 'idx_start': 7, 'idx_end': 8, 'result': 15}]

  • Lines 20-21: Create one queue that lets the parent send data to the child processes, and another that lets the child processes send data back to the parent.
  • Line 23: Spawn a pool of processes and pass the queues to them through initializer and initargs. Note that these processes are reused for different tasks.
  • Line 31: Put the data into the input queue to send it to the child processes. The data can be any picklable Python object; I use a dictionary for this example.
  • Lines 7-8: Loop forever and get the next item from the input queue. get() blocks until an item is available.
  • Line 14: After processing the data, put the result into the output queue to send it back to the parent process.
  • Lines 33-34: Wait until every result has arrived in the output queue. Note that qsize() is only approximate and is not implemented on every platform (it raises NotImplementedError on macOS). The child processes are not terminated at this point and keep running until the end of the program. You can terminate them earlier if you want.
  • Lines 36-39: Get the results from the output queue.

This method is useful when each task takes a relatively small amount of time to process or you do not have sufficient memory to store several copies of the data. Each child process lives until all data is processed and dynamically accepts new tasks passed by the parent process.
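
A variation on the same idea, sketched here under my own assumptions rather than taken from the example above, uses multiprocessing.Process workers that stop when they receive a sentinel value. The parent then knows exactly how many results to collect and does not need qsize(). SENTINEL, process_item, and worker are names chosen for this sketch.

```python
import multiprocessing as mp

SENTINEL = None  # assumed marker meaning "no more work"

def process_item(item):
    # Same calculation as in the example above, done on a copy of the item.
    result = dict(item)
    result["result"] = item["idx_start"] + item["idx_end"]
    result["status"] = "done"
    return result

def worker(input_queue, output_queue):
    while True:
        item = input_queue.get()      # blocks until something arrives
        if item is SENTINEL:
            break                     # leave the loop so the process can exit
        output_queue.put(process_item(item))

if __name__ == "__main__":
    n_workers, n_tasks = 4, 8
    input_queue, output_queue = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=worker, args=(input_queue, output_queue))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for i in range(n_tasks):
        input_queue.put({"status": "new", "idx_start": i, "idx_end": i + 1})
    for _ in workers:
        input_queue.put(SENTINEL)     # one sentinel per worker
    # Collect exactly n_tasks results before joining the workers.
    results = [output_queue.get() for _ in range(n_tasks)]
    for w in workers:
        w.join()
    results.sort(key=lambda r: r["idx_start"])
    print([r["result"] for r in results])
```

The results are read from the output queue before join() is called, because a child that still has items buffered for a queue may not exit until they have been consumed.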

Reference

  1. https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing
  2. https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
  3. https://docs.python.org/3/library/queue.html
  4. https://stackoverflow.com/questions/44660676/python-using-multiprocessing
  5. https://stackoverflow.com/questions/17241663/filling-a-queue-and-managing-multiprocessing-in-python
  6. https://stackoverflow.com/questions/11515944/how-to-use-multiprocessing-queue-in-python
  7. https://stackoverflow.com/questions/1540822/dumping-a-multiprocessing-queue-into-a-list
