Python Multiprocessing in an API Backend

Multiprocessing in Python can significantly improve the performance of an API backend, especially for CPU-bound tasks, by leveraging multiple CPU cores. Unlike multithreading, multiprocessing creates separate Python processes, each with its own memory space, effectively bypassing the Global Interpreter Lock (GIL).

Understanding Multiprocessing

Multiprocessing creates a new process for each task, allowing true parallel execution on multi-core systems. This avoids the GIL limitation. However, inter-process communication (IPC) is more complex and expensive than inter-thread communication.
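
As a minimal, framework-free sketch of that model, the snippet below spawns a single worker process and waits for it to finish; the worker function and its argument are placeholders.

import multiprocessing

def worker(label):
    """Runs in a separate process with its own memory space."""
    print(f"Worker {label} running in its own process")

if __name__ == '__main__':
    # Each Process gets its own interpreter and memory, so CPU-bound work
    # in the child is not serialized by the parent's GIL.
    process = multiprocessing.Process(target=worker, args=("demo",))
    process.start()  # fork/spawn the child process
    process.join()   # wait for it to finish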

Use Cases

Here are some common use cases for multiprocessing in an API backend:

  • CPU-bound tasks: For computationally intensive tasks such as image/video processing, machine learning inference, or complex calculations, multiprocessing can provide a significant performance boost; a pool-sizing sketch follows this list.
  • Long-running tasks: Offloading long-running tasks to separate processes prevents them from blocking the main process and improves the responsiveness of the API.
  • Resource-intensive operations: When tasks consume significant system resources (e.g., memory or CPU), running them in separate processes can isolate them and prevent them from impacting the main process.
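
For CPU-bound work, a common rule of thumb is to size the worker pool to the number of available cores. The snippet below shows one way to do that with os.cpu_count(); heavy_computation is a placeholder task, not part of the examples that follow.

import os
import concurrent.futures

def heavy_computation(n):
    """Placeholder for a CPU-bound task."""
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    # For purely CPU-bound work, more workers than cores rarely helps.
    workers = os.cpu_count() or 1
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(heavy_computation, [1_000_000] * workers))
    print(results)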

Examples

1. Concurrent CPU-bound Tasks

This example demonstrates how to use multiprocessing to perform CPU-bound tasks concurrently using the concurrent.futures module.


import concurrent.futures
import time
from flask import Flask, jsonify

app = Flask(__name__)

def cpu_bound_task(n):
    """Simulates a CPU-bound task by summing squares."""
    total = 0  # avoid shadowing the built-in sum()
    for i in range(n):
        total += i * i
    return total

@app.route('/concurrent_cpu_tasks/')
def concurrent_cpu_tasks():
    """Executes CPU-bound tasks concurrently using processes."""
    tasks = [10000000, 8000000, 12000000, 9000000]  # Increased task size

    start_time = time.time()
    results = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        future_to_task = {executor.submit(cpu_bound_task, task): task for task in tasks}
        for future in concurrent.futures.as_completed(future_to_task):
            results.append(future.result())
    end_time = time.time()
    total_time = end_time - start_time
    return jsonify({'results': results, 'total_time': total_time})

if __name__ == '__main__':
    app.run(debug=True, processes=4)  # Dev server handles each request in a new process, up to 4 concurrently
    

In this example:

  • The cpu_bound_task function simulates a CPU-bound task by performing a calculation.
  • The concurrent_cpu_tasks route uses concurrent.futures.ProcessPoolExecutor to create a pool of worker processes.
  • The executor.submit method schedules the cpu_bound_task function to be executed by a process for each task.
  • The results are collected with as_completed (in completion order, not submission order) and returned as a JSON response along with the total time taken; a sketch using executor.map as a simpler, order-preserving alternative follows this list.
  • The Flask application is run with processes=4, which tells the built-in Werkzeug development server to handle each request in a separate process (up to 4 at a time); in production, a WSGI server such as Gunicorn with multiple workers is the usual choice.
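
If the results should come back in the same order as the inputs, executor.map is often simpler than submit/as_completed. The sketch below is a standalone variant of the example above, redefining the same cpu_bound_task so it can run on its own.

import concurrent.futures

def cpu_bound_task(n):
    """Same simulated CPU-bound task as in the example above."""
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == '__main__':
    tasks = [10000000, 8000000, 12000000, 9000000]
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # map() distributes the tasks across the worker processes and
        # yields results in the same order as the input list.
        results = list(executor.map(cpu_bound_task, tasks))
    print(results)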

2. Background Task with Process

This example demonstrates how to use a process to offload a long-running task to the background.


import multiprocessing
import time
from flask import Flask, jsonify, request

app = Flask(__name__)

def background_task(message, duration):
    """Simulates a long-running background task."""
    print(f"Background task (process) started with message: {message}, duration: {duration} seconds")
    time.sleep(duration)
    print(f"Background task (process) finished with message: {message}")

@app.route('/start_background_process/', methods=['POST'])
def start_background_process():
    """Starts a background task in a separate process."""
    data = request.get_json()
    if not data or 'message' not in data or 'duration' not in data:
        return jsonify({'error': 'Missing message or duration in request'}), 400
    message = data['message']
    duration = data['duration']

    # Create a process and start it
    process = multiprocessing.Process(target=background_task, args=(message, duration))
    process.start()

    return jsonify({'message': 'Background task started successfully'}), 202  # 202 Accepted

if __name__ == '__main__':
    app.run(debug=True, processes=4)
    

In this example:

  • The background_task function simulates a long-running task using time.sleep.
  • The start_background_process route receives a message and duration from the client.
  • A multiprocessing.Process object is created, specifying the background_task function as the target and the message and duration as arguments.
  • The process.start() method starts the process, which executes the background_task function in the background.
  • The API returns a 202 Accepted status code, indicating that the request has been accepted for processing; a sketch showing how a client could later poll for the task's status follows this list.
  • The Flask application is run with processes=4.
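
The example above gives the client no way to check whether the background task has finished. One common pattern is to hand out a job id and record progress in a Manager-backed dict; the sketch below is illustrative only (the JOBS registry, job ids, and routes are not part of the original example) and assumes the Unix fork start method with a single server process.

import multiprocessing
import time
import uuid
from flask import Flask, jsonify

app = Flask(__name__)
manager = multiprocessing.Manager()
JOBS = manager.dict()  # shared status registry (illustrative)

def background_task(jobs, job_id, duration):
    """Long-running task that records its status in the shared dict."""
    jobs[job_id] = 'running'
    time.sleep(duration)
    jobs[job_id] = 'finished'

@app.route('/start_tracked_task/<int:duration>', methods=['POST'])
def start_tracked_task(duration):
    job_id = str(uuid.uuid4())
    JOBS[job_id] = 'pending'
    multiprocessing.Process(target=background_task, args=(JOBS, job_id, duration)).start()
    return jsonify({'job_id': job_id}), 202

@app.route('/task_status/<job_id>')
def task_status(job_id):
    return jsonify({'status': JOBS.get(job_id, 'unknown')})

if __name__ == '__main__':
    app.run(debug=True)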

3. Process Pool for Parallel Processing

This example demonstrates how to use a process pool to parallelize a task across multiple input values.


import multiprocessing
from flask import Flask, jsonify
import time

app = Flask(__name__)

def process_data(item):
    """Simulates processing a data item."""
    time.sleep(0.5)  # Simulate some processing time
    return f"Processed: {item * 2}"

@app.route('/parallel_data_processing/')
def parallel_data_processing():
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    start_time = time.time()

    with multiprocessing.Pool(processes=5) as pool:
        results = pool.map(process_data, data)

    end_time = time.time()
    total_time = end_time - start_time
    return jsonify({'results': results, 'total_time': total_time})

if __name__ == '__main__':
    app.run(debug=True, processes=4)

In this example:

  • The process_data function simulates processing a single data item.
  • The /parallel_data_processing/ route defines a list of data and uses multiprocessing.Pool to create a pool of worker processes.
  • The pool.map() function applies the process_data function to each item in the data list, distributing the work across the processes in the pool; for task functions that take more than one argument, pool.starmap() can be used instead (see the sketch after this list).
  • The results are collected and returned as a JSON response.
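
pool.map() only handles task functions that take a single argument. When the task needs extra parameters, pool.starmap() unpacks argument tuples; the sketch below uses a hypothetical scale_item function for illustration.

import multiprocessing
import time

def scale_item(item, factor):
    """Processes one item with an extra parameter."""
    time.sleep(0.1)
    return item * factor

if __name__ == '__main__':
    data = [1, 2, 3, 4, 5]
    with multiprocessing.Pool(processes=5) as pool:
        # starmap() unpacks each tuple into positional arguments,
        # which map() cannot do on its own.
        results = pool.starmap(scale_item, [(item, 10) for item in data])
    print(results)  # [10, 20, 30, 40, 50]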

4. Sharing Data Between Processes using a Queue

This example demonstrates how to share data between processes using a multiprocessing.Queue.


import multiprocessing
import time
from flask import Flask, jsonify

app = Flask(__name__)

def producer(queue):
    """Sends data to the queue."""
    for i in range(5):
        time.sleep(0.2)
        item = f"Item {i}"
        queue.put(item)
        print(f"Producer sent: {item}")
    queue.put(None)  # Signal the consumer to stop

def consumer(queue, result_queue):
    """Receives data from the queue and reports the results back."""
    results = []
    while True:
        item = queue.get()
        if item is None:
            break
        result = f"Consumed: {item}"
        results.append(result)
        print(f"Consumer received: {item}")
    result_queue.put(results)  # A Process cannot return a value, so send results through a queue

@app.route('/process_queue_communication/')
def process_queue_communication():
    queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    producer_process = multiprocessing.Process(target=producer, args=(queue,))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue, result_queue))

    producer_process.start()
    consumer_process.start()
    producer_process.join()

    # Read the consumer's results from the result queue, then wait for it to exit
    results = result_queue.get()
    consumer_process.join()

    return jsonify({'results': results})

if __name__ == '__main__':
    app.run(debug=True, processes=4)

In this example:

  • The producer function puts items into a multiprocessing.Queue.
  • The consumer function gets items from the queue and processes them.
  • The /process_queue_communication/ route creates the two queues, starts the producer and consumer processes, and waits for them to finish.
  • A multiprocessing.Process has no way to return a value directly, so the consumer sends its collected results back to the parent process through a second queue (result_queue), which the route reads and returns as a JSON response; a Pipe-based sketch for two-process communication follows this list.
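
When exactly two processes need to exchange messages, multiprocessing.Pipe is a lighter-weight alternative to a Queue; the sketch below is a minimal, framework-free illustration.

import multiprocessing

def worker(conn):
    """Receives a request over the pipe and sends a reply back."""
    request = conn.recv()
    conn.send(f"Processed: {request}")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    process = multiprocessing.Process(target=worker, args=(child_conn,))
    process.start()
    parent_conn.send("Item 0")   # send a request to the worker
    print(parent_conn.recv())    # "Processed: Item 0"
    process.join()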

Important Considerations

  • GIL Bypass: Multiprocessing bypasses the GIL, allowing for true parallelism for CPU-bound tasks.
  • Increased Overhead: Creating and managing processes has higher overhead than threads in terms of memory and CPU usage.
  • IPC Complexity: Inter-process communication (IPC) is more complex than inter-thread communication. Data sharing between processes requires mechanisms like queues, pipes, or shared memory.
  • Memory Management: Each process has its own memory space, so sharing large data structures between processes can be inefficient and may require serialization/deserialization.
  • Flask Configuration: The processes parameter of app.run() only applies to the built-in Werkzeug development server; in production, run the application under a WSGI server such as Gunicorn or uWSGI with multiple worker processes.
  • Process Safety: Ensure that any external resources or libraries you use are process-safe.
  • Data Serialization: Data that is sent between processes must be serializable (picklable); see the sketch below.
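
As a quick illustration of the pickling constraint, the snippet below shows that a module-level function and plain data structures pickle cleanly while a lambda does not; anything passed to a Pool, Queue, or ProcessPoolExecutor is serialized the same way before it reaches a worker process.

import pickle

def double(x):
    """Module-level functions pickle cleanly, so they can be used as pool tasks."""
    return x * 2

# Arguments and return values travel between processes as pickled bytes.
pickle.dumps({'numbers': [1, 2, 3]})   # fine: plain data structures are picklable
pickle.dumps(double)                   # fine: module-level functions are picklable

try:
    pickle.dumps(lambda x: x * 2)      # lambdas cannot be pickled
except (pickle.PicklingError, AttributeError) as exc:
    print(f"Not picklable: {exc}")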
