HEX
Server: Apache
System: Linux eisbus 6.8.12-9-pve #1 SMP PREEMPT_DYNAMIC PMX 6.8.12-9 (2025-03-16T19:18Z) x86_64
User: www-data (33)
PHP: 8.2.29
Disabled: NONE
Upload Files
File: //proc/400/root/lib/python2.7/dist-packages/multiprocessing_utils.py
# Copyright (c) 2011-2015 Liraz Siri <liraz@turnkeylinux.org>
#
# This file is part of turnkey-pylib.
#
# turnkey-pylib is open source software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.

import signal

import time
from multiprocessing import Process, Event, Condition, Value
from multiprocessing.queues import Queue, Empty

from threadloop import ThreadLoop

class WaitableQueue(Queue):
    """Queue that uses a semaphore to reliably count items in it"""
    class Vacuum(ThreadLoop):
        # Background thread that continuously drains queue `q`, appending
        # every item it gets into list `l`.
        def __init__(self, q, l):
            def callback():
                # Sleep up to 0.1s waiting for the queue to become
                # non-empty, then drain whatever is currently in it.
                q.wait_notempty(0.1)

                while True:
                    try:
                        val = q.get(False)
                        l.append(val)

                    except Empty:
                        break

            ThreadLoop.__init__(self, callback)

    def __init__(self, maxsize=0):
        # Conditions signal empty/not-empty transitions to waiters;
        # _put_counter counts every put() ever made (shared int so it is
        # visible across processes).
        self.cond_empty = Condition()
        self.cond_notempty = Condition()
        self._put_counter = Value('i', 0)

        Queue.__init__(self, maxsize)

    def put(self, obj, block=True, timeout=None):
        Queue.put(self, obj, block, timeout)
        self._put_counter.value += 1

        # Wake anyone blocked in wait_notempty() now that there is an item.
        if self.qsize() != 0:
            self.cond_notempty.acquire()
            try:
                self.cond_notempty.notify_all()
            finally:
                self.cond_notempty.release()

    @property
    def put_counter(self):
        # Total number of successful put() calls (monotonic).
        return self._put_counter.value

    def get(self, block=True, timeout=None):
        ret = Queue.get(self, block, timeout)
        # Wake anyone blocked in wait_empty() if we just took the last item.
        if self.qsize() == 0:
            self.cond_empty.acquire()
            try:
                self.cond_empty.notify_all()
            finally:
                self.cond_empty.release()

        return ret

    def wait_empty(self, timeout=None):
        """Wait for all items to be got"""
        self.cond_empty.acquire()
        try:
            if self.qsize():
                self.cond_empty.wait(timeout)
        finally:
            self.cond_empty.release()

    def wait_notempty(self, timeout=None):
        """Wait until at least one item is in the queue (or timeout)."""
        self.cond_notempty.acquire()
        try:
            if self.qsize() == 0:
                self.cond_notempty.wait(timeout)
        finally:
            self.cond_notempty.release()

class Deferred:
    """Bundle a callable together with arguments for a later invocation.

    Calling the instance performs the stored call and returns its result.
    Parallelize uses this to construct executors inside worker processes.
    """

    def __init__(self, callable, *args, **kwargs):
        # Capture the target plus positional and keyword arguments.
        self.callable = callable
        self.args = args
        self.kwargs = kwargs

    def __call__(self):
        # Replay the captured call exactly as it was specified.
        target = self.callable
        return target(*self.args, **self.kwargs)

class Parallelize:
    """
    Usage:

    1) You pass a sequence of callables to Parallelize. In the most simple
       usage case these will be functions, but they can also be callable
       instances.

    2) You get back a Parallelize instance which is another callable, which
       you call with input arguments to queue a parallelized call in one of
       the underlying processes.

    3) The .wait() method waits for all queued execution to finish.

    4) The .stop() method stops parallel execution.

       Returns an empty array if you call it after wait() has completed.

       Returns aborted inputs if you call it mid-execution (e.g., handling an
       exception such as Ctrl-C)

    5) The .results is an iterator that iterates over return values from workers.

       A background thread collects return values in real-time as soon as the
       underlying functions in various processes finish executing and return
       values.

       After .wait() the number of return values will be the same size as the number of calls
       to the Parallelize instance, unless parallelize was aborted.

    Note that you have to call either wait or stop or Parallelize to collect results.

    Exception handling:

        An uncaught exception inside a worker kills the worker, resubmits the
        input into the queue where it will be picked up by another worker.

    """
    class Error(Exception):
        pass

    class Worker(Process):
        # Raised by an executor to push its input back onto the queue so
        # another worker can retry it.
        class Retry(Exception):
            pass

        # Raised from the signal handlers below to unwind the worker loop
        # in-band so cleanup (finally blocks) runs.
        class Terminated(Exception):
            pass

        @classmethod
        def worker(cls, initialized, done, idle, q_input, q_output, executor):
            # Worker process main loop: pull inputs from q_input, call the
            # executor, push return values to q_output.
            def raise_exception(s, f):
                # Ignore further deliveries of this signal, then convert it
                # into a Terminated exception.
                signal.signal(s, signal.SIG_IGN)
                raise cls.Terminated

            signal.signal(signal.SIGTERM, raise_exception)
            signal.signal(signal.SIGINT, raise_exception)

            idle.clear()
            try:
                # A Deferred executor is constructed here, inside the worker
                # process, so per-process state is created after the fork.
                if isinstance(executor, Deferred):
                    executor = executor()
                    if not callable(executor):
                        raise Parallelize.Error("product of deferred executor %s is not callable" % `executor`)

                initialized.set()
            except cls.Terminated:
                return
            finally:
                idle.set()

            # Sentinel that distinguishes "executor never returned" from any
            # legitimate return value (including None).
            class UNDEFINED:
                pass

            try:
                while True:
                    if done.is_set():
                        return

                    retval = UNDEFINED
                    try:
                        # Short timeout so the done flag is re-checked often.
                        input = q_input.get(timeout=0.1)
                    except Empty:
                        continue

                    idle.clear()

                    try:
                        # A tuple input carries multiple positional arguments
                        # (see Parallelize.__call__).
                        if isinstance(input, tuple):
                            retval = executor(*input)
                        else:
                            retval = executor(input)
                        q_output.put(retval)

                    except cls.Retry:
                        q_input.put(input)

                    except: # uncaught exceptions destroy worker
                        # Requeue the unprocessed input so another worker can
                        # pick it up, then die by re-raising.
                        if retval is UNDEFINED:
                            q_input.put(input)

                        raise

                    finally:
                        idle.set()

            except cls.Terminated:
                pass # just exit peacefully

        def __init__(self, q_input, q_output, executor):
            # Events shared with the worker process:
            #   initialized - executor construction finished successfully
            #   idle        - worker is not currently executing an input
            #   done        - ask the worker to exit its loop
            self.initialized = Event()
            self.idle = Event()
            self.done = Event()

            self.idle.set()

            Process.__init__(self,
                             target=self.worker,
                             args=(self.initialized, self.done, self.idle, q_input, q_output, executor))

        def is_busy(self):
            return self.is_alive() and not self.idle.is_set()

        def is_initialized(self):
            return self.initialized.is_set()

        def is_stopped(self):
            return self.done.is_set()

        def wait(self, timeout=None):
            """wait until Worker is idle"""
            return self.idle.wait(timeout)

        def stop(self):
            """let worker finish what it was doing and join"""
            if not self.is_alive():
                return

            self.done.set()

    class IterResults:
        # Iterator over results collected so far; blocks in next() until a
        # new result arrives or the run is known to have finished.
        def __init__(self, parent):
            self.parent = parent
            self.yielded = 0

        def __iter__(self):
            # Restart iteration from the first collected result.
            self.yielded = 0
            return self

        def next(self):
            # Python 2 iterator protocol (no __next__ alias).
            finished = False

            while True:
                if len(self.parent._results) > self.yielded:
                    result = self.parent._results[self.yielded]
                    self.yielded += 1
                    return result

                if not finished:
                    # Non-blocking check whether all queued work is done.
                    finished = self.parent.wait(block=False)

                if finished and self.yielded == len(self.parent._results):
                    raise StopIteration

        def __len__(self):
            return len(self.parent._results)

        def __getitem__(self, key):
            return self.parent._results[key]

        def __repr__(self):
            return repr(self.parent._results)

    def __init__(self, executors):
        # Validate up-front so a bad executor fails fast in the parent.
        for executor in executors:
            if not callable(executor):
                raise self.Error("executor %s is not callable" % `executor`)

        q_input = WaitableQueue()
        q_output = WaitableQueue()

        # One worker process per executor.
        self.workers = []
        for executor in executors:
            worker = self.Worker(q_input, q_output, executor)
            worker.start()

            self.workers.append(worker)

        self.size = len(executors)

        self.q_input = q_input

        # Background thread drains worker output into _results in real time.
        self._results = []
        self._results_vacuum = WaitableQueue.Vacuum(q_output, self._results)

        self.results = self.IterResults(self)

    def any_alive(self):
        """Return True if any workers are alive, else False"""
        for worker in self.workers:
            if worker.is_alive():
                return True

        return False

    def _wait_nonblock(self, keepalive=True, keepalive_spares=0):
        # Single non-blocking probe; returns True only when the input queue
        # is empty AND every worker is idle (i.e. all work is finished).

        def find_busy_worker():
            for worker in self.workers:
                if not worker.is_alive() or worker.is_stopped():
                    continue

                if worker.is_initialized() and worker.is_busy():
                    return worker

        # gauntlet of checks to make sure the input queue is empty and all
        # workers are idle

        self.q_input.wait_empty(0.1)

        if not self.any_alive():
            return True

        # Snapshot so we can detect puts that race with the checks below.
        saved_put_counter = self.q_input.put_counter

        if not keepalive:

            # if input queue is empty and keepalive is False shutdown idle workers
            if self.q_input.qsize() != 0:
                return False

            idle_workers = [ worker for worker in self.workers
                                if worker.is_alive() and \
                                not worker.is_busy() and \
                                not worker.is_stopped() ]

            # Stop at most one idle worker per probe, keeping
            # keepalive_spares workers alive as spares.
            if len(idle_workers) > keepalive_spares:
                for worker in idle_workers:

                    # check is_busy() again just to make sure
                    if not worker.is_busy():
                        worker.stop()
                        break

                return False

        busy_worker = find_busy_worker()
        if busy_worker:
            return False

        # give puts to the input Queue a chance to make it through
        time.sleep(0.1)

        # workers may have written to the input Queue
        if self.q_input.put_counter != saved_put_counter:
            return False

        return True

    def wait(self, keepalive=True, keepalive_spares=0, block=True):
        """wait for all input to be processed by workers.

        Returns True if finished (always True when block=True) else False

        Arguments:

        If keepalive=False: stop idle workers once there's nothing left to do.
        If keepalive=False and keepalive_spares > 0: keep alive at least
        keepalive_spares spare workers.

        """

        while True:

            finished = self._wait_nonblock(keepalive, keepalive_spares)
            if finished or not block:
                return finished

            time.sleep(0.1)

    def stop(self, finish_timeout=None):
        """Stop workers and return any unprocessed input values"""

        if not self.workers:
            return

        # ignore SIGINT and SIGTERM for now (restore later)
        sigint_handler = signal.getsignal(signal.SIGINT)
        sigterm_handler = signal.getsignal(signal.SIGTERM)

        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        for worker in self.workers:
            worker.stop()

        # Drain unprocessed inputs into `aborted` while workers wind down.
        aborted = []
        inputs_vacuum = WaitableQueue.Vacuum(self.q_input, aborted)

        started = time.time()

        try:
            # Give workers until finish_timeout (or forever) to exit cleanly.
            while True:
                if not self.any_alive():
                    break

                if finish_timeout is not None and (time.time() - started) > finish_timeout:
                    break

                time.sleep(0.1)

            # Forcibly terminate and reap anything still alive.
            for worker in self.workers:
                if worker.is_alive():
                    worker.terminate()

            for worker in self.workers:
                if worker.is_alive():
                    worker.join()

            self.workers = []
        finally:
            # Let any final requeued inputs land before stopping the vacuum.
            time.sleep(0.1)
            inputs_vacuum.stop()

        self._results_vacuum.stop()

        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigterm_handler)

        return aborted

    def __call__(self, *args):
        # A single argument is queued as-is; multiple arguments are queued
        # as a tuple which the worker unpacks into positional arguments.
        if len(args) == 1:
            self.q_input.put(args[0])
        else:
            self.q_input.put(args)

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        # Finish outstanding work (retiring idle workers), then stop.
        self.wait(keepalive=False)
        self.stop()

    def __del__(self):
        self.stop()

def parallel_map(max_procs, f, sequence):
    """Apply ``f`` to every item of ``sequence``, yielding the results.

    Runs serially when one process suffices; otherwise fans the calls out
    over up to ``max_procs`` worker processes, in which case results may be
    yielded in any order.
    """
    items = list(sequence)
    if not items:
        return

    # Never spawn more workers than there are items to process.
    procs = min(max_procs, len(items))

    if procs == 1:
        # Serial fast path: no process pool needed.
        for item in items:
            yield f(item)
        return

    with Parallelize([f] * procs) as pool:
        for item in items:
            pool(item)

        for result in pool.results:
            yield result

def test():
    import time
    def sleeper(seconds):
        time.sleep(seconds)
        return seconds

    # pickle doesn't like embedded functions
    globals()[sleeper.__name__] = sleeper

    sleeper = Parallelize([ sleeper ] * 250)
    print "Allocated children"

    try:
        for i in range(1000):
            sleeper(1)

        print "Queued parallelized invocations. Ctrl-C to abort!"
        sleeper.wait()

        print "Finished waiting"

    finally:
        aborted = sleeper.stop()
        if aborted:
            print "len(aborted) = %d" % len(aborted)
            print "len(aborted) + len(results) = %d" % (len(aborted) + len(sleeper.results))

        print "len(pool.results) = %d" % len(sleeper.results)

def test2():
    class ExampleExecutor:
        def __init__(self, name):
            import os

            self.name = name
            self.pid = os.getpid()

            #import random
            # if we want to test what happens to failed initializations
            #if random.randint(0, 1):
            #    raise Exception

            print "%s.__init__: pid %d" % (self.name, self.pid)

        def __call__(self, *args):
            print "%s.__call__(%s)" % (self.name, `args`)
            return args

        def __del__(self):
            import os
            print "%s.__del__: self.pid=%d, os.getpid=%d" % (self.name, self.pid, os.getpid())

    # pickle doesn't like embedded classes
    globals()[ExampleExecutor.__name__] = ExampleExecutor

    deferred = []
    for i in range(2):
        deferred_executor = Deferred(ExampleExecutor, i)
        deferred.append(deferred_executor)

    p = Parallelize(deferred)
    try:
        print "len(p.workers) = %d" % len(p.workers)
        for i in range(2):
            p(i)

        p.wait()
        print "p.results: " + `p.results`
    finally:
        p.stop()
        print "after stop"

def test3():
    def square(i):
        import time
        time.sleep(1)
        return i * i

    # pickle doesn't like embedded functions
    globals()[square.__name__] = square

    square = Parallelize([ square ] * 10)
    print "Allocated children"

    try:
        for i in range(10):
            square(i)

        print "Queued parallelized invocations. Ctrl-C to abort!"
        for i, result in enumerate(square.results):
            print result

        print "len(iresults) == " + `i + 1`

    finally:
        aborted = square.stop()
        if aborted:
            print "len(aborted) = %d" % len(aborted)
            print "len(aborted) + len(results) = %d" % (len(aborted) + len(square.results))

        print "len(pool.results) = %d" % len(square.results)

def test4():
    def square(i):
        import time
        time.sleep(1)
        return i * i

    # pickle doesn't like embedded functions
    globals()[square.__name__] = square

    with Parallelize([ square ] * 10) as square:
        for i in range(10):
            square(i)

        print "Queued parallelized invocations. Ctrl-C to abort!"
        for i, result in enumerate(square.results):
            print result

def test5():

    def square(i):
        import time
        time.sleep(1)
        return i * i

    # pickle doesn't like embedded functions
    globals()[square.__name__] = square

    with Parallelize([ square ] * 10) as square:

        for i in range(10):
            square(i)

    print "results: " + `square.results`

if __name__ == "__main__":
    # Run the context-manager smoke test when executed as a script.
    test5()