#StackBounty: #python #multiprocessing Class with multiple workers

Bounty: 200

I am building a tool that interacts with a batched stream of incoming data. This data needs to be processed and the result returned. To split up the work I have created a class that has inbound (_in) and outbound (out) queues and workers that are getting, processing, and depositing the work.

This example takes an iterable of numbers (in pass_data) and multiplies them by f.

import queue, random, time
from multiprocessing import Process, Queue

def _worker(_in, out, f):
    """Get work from _in, process it, and put the result on out.

    Each work item is multiplied by the constant factor ``f``.  The loop
    runs forever; it is intended as the target of a worker ``Process``
    and is stopped externally via ``Process.terminate()``.
    """
    while True:
        try:
            # Block until a work item is available.
            work = _in.get()
        except queue.Empty:
            # NOTE(review): a plain blocking get() never raises Empty,
            # so this branch is effectively unreachable — kept to match
            # the original structure.
            continue
        # simulate blocking for some time
        time.sleep(random.uniform(0.01, 0.5))
        out.put(work * f)

class C:
    """Fan batched work out to a pool of worker processes.

    Work items are put on the inbound queue (``_in``), processed by the
    workers, and the results collected from the outbound queue (``out``).
    """

    def __init__(self, f, threads=2):
        """Create (but do not start) the worker pool.

        f: factor each work item is multiplied by.
        threads: number of workers.  NOTE(review): despite the name,
            these are ``multiprocessing.Process`` workers, not threads.
        """
        self.f = f
        self.threads = threads
        self._in, self.out = Queue(), Queue()
        self.args = (self._in, self.out, self.f)
        self.workers = [
            Process(target=_worker, args=self.args) for _ in range(self.threads)
        ]

    def __repr__(self):
        return f"{self.__class__.__name__}(threads={self.threads})"

    def start(self):
        """Start all workers"""
        for worker in self.workers:
            worker.start()

    def terminate(self):
        """Terminate all workers"""
        for worker in self.workers:
            worker.terminate()

    def pass_data(self, data):
        """Pass data to the queue to be processed"""
        for rec in data:
            self._in.put(rec)

    def get_completed(self):
        """Return a list of processed data.

        Drains whatever results are currently available without blocking;
        returns an empty list when the outbound queue is empty.
        """
        items = []
        while True:
            try:
                items.append(self.out.get_nowait())
            except queue.Empty:
                break
        return items

if __name__ == "__main__":
    # Demo: send 5 random-sized batches through the pool and poll for
    # results until every item in the batch has come back.
    c = C(f=12, threads=2)
    c.start()

    for i in range(5):
        s = 0
        n = random.randint(1, 20)
        # Feed n items into the pool; without this (and c.start() above)
        # the polling loop below would spin forever.
        c.pass_data(range(n))
        print(f"sent: {n}")
        while s < n:
            r = c.get_completed()
            s += len(r)
            if r:
                print(len(r), end=", ")
            time.sleep(random.uniform(0.01, 0.4))

    # Stop the (otherwise infinite) workers so the script can exit.
    c.terminate()

This is, at the moment, a proof of concept. Are there any pitfalls to this method? Is there already a better way to do this?

Aspects that I intend to address:

  • queue size limits
  • thread number limits

Get this bounty!!!

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.