10 Matching Annotations
  1. Dec 2023
    1. When should you use multiprocessing vs asyncio or threading?
      1. Use multiprocessing when you need to do many heavy calculations and you can split them up.
      2. Use asyncio or threading when you're performing I/O operations -- communicating with external resources or reading/writing from/to files.
      3. Multiprocessing and asyncio can be used together, but a good rule of thumb is to fork a process before you thread/use asyncio instead of the other way around -- threads are relatively cheap compared to processes.
    2. Is it possible to combine asyncio with multiprocessing?

      We can do that too.
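One common way to combine the two is to hand CPU-bound calls to a process pool from inside a coroutine with `loop.run_in_executor()`. The sketch below is illustrative only: `count_primes` is a made-up stand-in for heavy work, and the pool size and "spawn" context are assumptions, not something the annotated source prescribes.

```python
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit: int) -> int:
    """Hypothetical CPU-bound task: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


async def main() -> list:
    loop = asyncio.get_running_loop()
    # Assumption: use "spawn" so every worker starts as a fresh interpreter.
    context = multiprocessing.get_context("spawn")
    with ProcessPoolExecutor(max_workers=2, mp_context=context) as pool:
        # Each call runs in a worker process; the event loop stays free
        # to run other coroutines while it awaits the results.
        futures = [
            loop.run_in_executor(pool, count_primes, limit)
            for limit in (10_000, 20_000, 30_000)
        ]
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The arguments and return values still cross the process boundary via pickle, so this pays off only when each call does enough work to outweigh that cost.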

    1. Running the code in a subprocess is much slower than running a thread, not because the computation is slower, but because of the overhead of copying and (de)serializing the data. So how do you avoid this overhead?

      Reducing the performance hit of copying data between processes:

      Option #1: Just use threads

      Processes have overhead, threads do not. And while it’s true that generic Python code won’t parallelize well when using multiple threads, that’s not necessarily true for your Python code. For example, NumPy releases the GIL for many of its operations, which means you can use multiple CPU cores even with threads.

      ```
      # numpy_gil.py
      import numpy as np
      from time import time
      from multiprocessing.pool import ThreadPool

      arr = np.ones((1024, 1024, 1024))

      start = time()
      for i in range(10):
          arr.sum()
      print("Sequential:", time() - start)

      expected = arr.sum()

      start = time()
      with ThreadPool(4) as pool:
          result = pool.map(np.sum, [arr] * 10)
          assert result == [expected] * 10
      print("4 threads:", time() - start)
      ```

      When run, we see that NumPy uses multiple cores just fine when using threads, at least for this operation:

      $ python numpy_gil.py
      Sequential: 4.253053188323975
      4 threads: 1.3854241371154785

      Pandas is built on NumPy, so many numeric operations will likely release the GIL as well. However, anything involving strings, or Python objects in general, will not. So another approach is to use a library like Polars, which is designed from the ground up for parallelism, to the point where you don’t have to think about it at all: it has an internal thread pool.

      Option #2: Live with it

      If you’re stuck with using processes, you might just decide to live with the overhead of pickling. In particular, if you minimize how much data gets passed back and forth between processes, and the computation in each process is significant enough, the cost of copying and serializing data might not significantly impact your program’s runtime. Spending a few seconds on pickling doesn’t really matter if your subsequent computation takes 10 minutes.
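To judge whether the overhead is worth living with, it can help to time a pickle round trip on a representative object. This is a rough sketch, not a benchmark: `pickle_round_trip` is a hypothetical helper, and the dictionary of integers is just sample data.

```python
import pickle
from time import perf_counter


def pickle_round_trip(obj):
    """Return (copy, seconds, payload_bytes) for one pickle round trip."""
    start = perf_counter()
    payload = pickle.dumps(obj)    # roughly what is sent to a worker
    copy = pickle.loads(payload)   # roughly what the worker does on receipt
    return copy, perf_counter() - start, len(payload)


if __name__ == "__main__":
    data = {"column": list(range(1_000_000))}  # sample data, an assumption
    copy, seconds, size = pickle_round_trip(data)
    print(f"{size / 1e6:.1f} MB pickled and unpickled in {seconds:.3f} s")
```

If that time is small next to the computation each worker performs, the copying cost is unlikely to dominate.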

      Option #3: Write the data to disk

      Instead of passing data directly, you can write the data to disk, and then pass the path to this file:

      • to the subprocess (as an argument)
      • to the parent process (as the return value of the function running in the worker process).

      The recipient process can then parse the file.

      ```
      import pandas as pd
      import multiprocessing as mp
      from pathlib import Path
      from tempfile import mkdtemp
      from time import time


      def noop(df: pd.DataFrame):
          # real code would process the dataframe here
          pass


      def noop_from_path(path: Path):
          df = pd.read_parquet(path, engine="fastparquet")
          # real code would process the dataframe here
          pass


      def main():
          df = pd.DataFrame({"column": list(range(10_000_000))})

          with mp.get_context("spawn").Pool(1) as pool:
              # Pass the DataFrame to the worker process
              # directly, via pickling:
              start = time()
              pool.apply(noop, (df,))
              print("Pickling-based:", time() - start)

              # Write the DataFrame to a file, pass the path to
              # the file to the worker process:
              start = time()
              path = Path(mkdtemp()) / "temp.parquet"
              df.to_parquet(
                  path,
                  engine="fastparquet",
                  # Run faster by skipping compression:
                  compression="uncompressed",
              )
              pool.apply(noop_from_path, (path,))
              print("Parquet-based:", time() - start)


      if __name__ == "__main__":
          main()
      ```

      Option #4: multiprocessing.shared_memory

      Because processes sometimes do want to share memory, operating systems typically provide facilities for explicitly creating shared memory between processes. Python wraps these facilities in the multiprocessing.shared_memory module.

      However, unlike threads, where the same memory address space allows trivially sharing Python objects, in this case you’re mostly limited to sharing arrays. And as we’ve seen, NumPy releases the GIL for expensive operations, which means you can just use threads, which is much simpler. Still, in case you ever need it, it’s worth knowing this module exists.

      Note: The module also includes ShareableList, which is a bit like a Python list but limited to int, float, bool, small str and bytes, and None. But this doesn’t help you cheaply share an arbitrary Python object.
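As a minimal sketch of the module in use, the parent below writes integers into a shared block and a worker attaches to it by name. The helper `sum_shared`, the constant `COUNT`, and the choice of `struct` for packing are illustrative assumptions; with NumPy you would more likely wrap `shm.buf` in an array.

```python
import struct
from multiprocessing import get_context, shared_memory

COUNT = 1_000  # number of 64-bit integers stored in the block (assumption)


def sum_shared(name: str, count: int) -> int:
    """Attach to an existing block by name and sum its integers."""
    shm = shared_memory.SharedMemory(name=name)
    try:
        return sum(struct.unpack_from(f"<{count}q", shm.buf, 0))
    finally:
        shm.close()  # detach only; the creator is responsible for unlink()


def main() -> None:
    shm = shared_memory.SharedMemory(create=True, size=COUNT * 8)
    try:
        struct.pack_into(f"<{COUNT}q", shm.buf, 0, *range(COUNT))
        with get_context("spawn").Pool(1) as pool:
            # Only the short name string is pickled, not the data itself.
            print(pool.apply(sum_shared, (shm.name, COUNT)))
    finally:
        shm.close()
        shm.unlink()  # free the block once every process is done with it


if __name__ == "__main__":
    main()
```

The creating process has to call `unlink()` exactly once, after the workers have finished, or the block can outlive the program.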

      A bad option for Linux: the "fork" context

      You may have noticed we did multiprocessing.get_context("spawn").Pool() to create a process pool. This is because Python has multiple implementations of multiprocessing on some OSes. "spawn" is the only option on Windows, the only non-broken option on macOS, and available on Linux. When using "spawn", a completely new process is created, so you always have to copy data across.

      On Linux, the default is "fork": the new child process has a complete copy of the memory of the parent process at the time of the child process’ creation. This means any objects in the parent (arrays, giant dicts, whatever) that were created before the child process was created, and were stored somewhere helpful like a module, are accessible to the child. Which means you don’t need to pickle/unpickle to access them.

      Sounds useful, right? There’s only one problem: the "fork" context is super-broken, which is why it will stop being the default in Python 3.14.

      Consider the following program:

      ```
      import threading
      import sys
      from multiprocessing import Process


      def thread1():
          for i in range(1000):
              print("hello", file=sys.stderr)


      threading.Thread(target=thread1).start()


      def foo():
          pass


      Process(target=foo).start()
      ```

      On my computer, this program consistently deadlocks: it freezes and never exits. Any time you have threads in the parent process, the "fork" context can cause deadlocks, or even corrupted memory, in the child process.

      You might think that you’re fine because you don’t start any threads. But many Python libraries start a thread pool on import, for example NumPy. If you’re using NumPy, Pandas, or any other library that depends on NumPy, you are running a threaded program, and therefore at risk of deadlocks, segfaults, or data corruption when using the "fork" multiprocessing context. For more details see this article on why multiprocessing’s default is broken on Linux.

      You’re just shooting yourself in the foot if you take this approach.
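A safer habit is to request a start method explicitly rather than relying on the platform default. The sketch below lists the methods available on the current platform and builds a pool from a "spawn" context; `square` is a placeholder task, and the pool size is an arbitrary assumption.

```python
import multiprocessing as mp


def square(x: int) -> int:
    """Placeholder task for the worker processes."""
    return x * x


def main() -> None:
    # The list depends on the OS, for example "fork" is absent on Windows.
    print("Available start methods:", mp.get_all_start_methods())

    # A context is scoped to this call site and leaves the global default alone.
    context = mp.get_context("spawn")
    with context.Pool(2) as pool:
        print(pool.map(square, range(5)))


if __name__ == "__main__":
    main()
```

Using `get_context()` instead of `set_start_method()` keeps the choice local, so a library can make it without changing behavior for the rest of the program.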

    1. In sync code, you might use a thread pool and imap_unordered():

      ```
      import multiprocessing.dummy

      pool = multiprocessing.dummy.Pool(2)

      for result in pool.imap_unordered(do_stuff, things_to_do):
          print(result)
      ```

      Here, concurrency is limited by the fixed number of threads.

    1. Tips
      • if __name__ == "__main__" is important for multiprocessing because it will spawn a new Python that imports the module. You don't want this module to spawn a new Python that imports the module that will spawn a new Python...
      • If the function to submit to the executor has complicated arguments to be passed to it, use a lambda or functools.partial. (A lambda cannot be pickled, so with a process pool prefer functools.partial around a module-level function.)
      • max_workers=1 is a very nice way to get a poor man’s task queue.
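The last two tips can be sketched together. The task `resize` and its arguments are hypothetical, chosen only to show `functools.partial` pinning arguments and a single-worker executor acting as a queue.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def resize(path: str, width: int, height: int, *, keep_ratio: bool = True) -> str:
    """Hypothetical task; a real version would do actual work on the file."""
    return f"{path} -> {width}x{height} (keep_ratio={keep_ratio})"


def run_queue(paths):
    # partial() pins the shared arguments, leaving only `path` to supply.
    task = partial(resize, width=640, height=480, keep_ratio=False)
    # With one worker, submitted tasks run one at a time in submission order.
    with ThreadPoolExecutor(max_workers=1) as queue:
        futures = [queue.submit(task, path) for path in paths]
        return [future.result() for future in futures]


if __name__ == "__main__":
    for line in run_queue(["a.png", "b.png"]):
        print(line)
```

Swapping in `ProcessPoolExecutor` would work with the same `partial`, because a partial over a module-level function can be pickled.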
    1. Inter-Worker communication

      Whether using sub interpreters or multiprocessing you cannot simply send existing Python objects to worker processes.

      Multiprocessing uses pickle by default. When you start a process or use a process pool, you can use pipes, queues and shared memory as mechanisms for sending data between the workers and the main process. These mechanisms revolve around pickling. Pickle is the built-in serialization library for Python that can convert most Python objects into a byte string and back into a Python object.

      Pickle is very flexible. You can serialize a lot of different types of Python objects (but not all) and Python objects can even define a method for how they can be serialized. It also handles nested objects and properties. However, with that flexibility comes a performance hit. Pickle is slow. So if you have a worker model that relies upon continuous inter-worker communication of complex pickled data you’ll likely see a bottleneck.
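As an illustration of an object defining how it is serialized, the class below drops a lock before pickling and recreates it afterwards. `Counter` is a made-up example class; `__getstate__` and `__setstate__` are the standard pickle hooks.

```python
import pickle
import threading


class Counter:
    """Made-up example: holds a value plus a lock that cannot be pickled."""

    def __init__(self, value: int = 0):
        self.value = value
        self._lock = threading.Lock()

    def __getstate__(self):
        # Copy the instance dict and leave out the unpicklable lock.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Restore the plain attributes, then build a fresh lock.
        self.__dict__.update(state)
        self._lock = threading.Lock()


if __name__ == "__main__":
    restored = pickle.loads(pickle.dumps(Counter(5)))
    print(restored.value)
```

Without these hooks, pickling a `Counter` fails with a `TypeError`, because lock objects are among the types pickle cannot handle.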

      Sub interpreters can accept pickled data. They also have a second mechanism called shared data. Shared data is a high-speed shared memory space that interpreters can write to and share data with other interpreters. It supports only immutable types:

      • Strings
      • Byte Strings
      • Integers and Floats
      • Boolean and None
      • Tuples (and tuples of tuples)

      To share data with an interpreter, you can either set it as initialization data or you can send it through a channel.

    2. The next point when using a parallel execution model like multiprocessing or sub interpreters is how you share data.

      Once you get over the hurdle of starting one, this quickly becomes the most important point. You have two questions to answer:

      • How do we communicate between workers?
      • How do we manage the state of workers?
    3. Both multiprocessing processes and interpreters have their own import state. This is drastically different to threads and coroutines. When you await an async function, you don’t need to worry about whether that coroutine has imported the required modules. The same applies for threads.

      For example, you can import something in your module and reference it from inside the thread function:

      ```python
      from threading import Thread
      from super.duper.module import cool_function


      def worker(info):
          # This already exists in the interpreter state
          cool_function()


      info = {'a': 1}
      thread = Thread(target=worker, args=(info, ))
      ```

    4. Another important point is that multiprocessing is often used in a model where the processes are long-running and handed lots of tasks instead of being spawned and destroyed for a single workload. One great example is Gunicorn, the popular Python web server. Gunicorn will spawn “workers” using multiprocessing and those workers will live for the lifetime of the main process. The time to start a process or a sub interpreter then becomes irrelevant (at 89 ms or 1 second) when the web worker can be running for weeks, months or years. The ideal way to use these parallel workers for small tasks (like handling a single web request) is to keep them running and use a main process to coordinate and distribute the workload.
    5. What is the difference between threading, multiprocessing, and sub interpreters?

      The Python standard library has a few options for concurrent programming, depending on some factors:

      • Is the task you’re completing IO-bound (e.g. reading from a network, writing to disk)?
      • Does the task require CPU-heavy work, e.g. computation?
      • Can the tasks be broken into small chunks or are they large pieces of work?

      Here are the models:

      • Threads are fast to create, have a small overhead, and can share any Python objects between them. Their drawback is that Python threads are bound to the GIL of the process, so if the workload is CPU-intensive then you won’t see any performance gains. Threading is very useful for background polling tasks, like a function that waits and listens for a message on a queue.
      • Coroutines are extremely fast to create, have a minuscule overhead, and can share any Python objects between them. Coroutines are ideal for IO-based activity that has an underlying API that supports async/await.
      • Multiprocessing is a Python wrapper that creates Python processes and links them together. These processes are slow to start, so the workload that you give them needs to be large enough to see the benefit of parallelising the workload. However, they are truly parallel since each one has its own GIL.
      • Sub interpreters have the parallelism of multiprocessing, but with a much faster startup time.
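For the IO-bound case, the sketch below runs the same simulated wait through threads and through coroutines. The sleep calls stand in for real network or disk waits, and `DELAY` and the task count are arbitrary assumptions.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.05  # seconds of simulated I/O wait per task (assumption)


def blocking_io(n: int) -> int:
    time.sleep(DELAY)  # stands in for a blocking network or disk call
    return n


async def async_io(n: int) -> int:
    await asyncio.sleep(DELAY)  # stands in for an awaitable I/O call
    return n


def run_threads(count: int) -> list:
    with ThreadPoolExecutor(max_workers=count) as pool:
        return list(pool.map(blocking_io, range(count)))


async def _gather(count: int) -> list:
    return await asyncio.gather(*(async_io(n) for n in range(count)))


def run_coroutines(count: int) -> list:
    return asyncio.run(_gather(count))


if __name__ == "__main__":
    for runner in (run_threads, run_coroutines):
        start = time.perf_counter()
        runner(20)
        print(runner.__name__, f"{time.perf_counter() - start:.2f} s")
```

Both versions should finish in roughly one delay rather than twenty, because the waits overlap. A CPU-bound function would not show the same gain under threads, which is where multiprocessing or sub interpreters come in.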