reference, declarationdefinition
definition → references, declarations, derived classes, virtual overrides
reference to multiple definitions → definitions
unreferenced
    1
    2
    3
    4
    5
    6
    7
    8
    9
   10
   11
   12
   13
   14
   15
   16
   17
   18
   19
   20
   21
   22
   23
   24
   25
   26
   27
   28
   29
   30
   31
   32
   33
   34
   35
   36
   37
   38
   39
   40
   41
   42
   43
   44
   45
   46
   47
   48
   49
   50
   51
   52
   53
   54
   55
   56
import sys
import multiprocessing


_current = None
_total = None


def _init(current, total):
    global _current
    global _total
    _current = current
    _total = total


def _wrapped_func(func_and_args):
    func, argument, should_print_progress, filter_ = func_and_args

    if should_print_progress:
        with _current.get_lock():
            _current.value += 1
        sys.stdout.write('\r\t{} of {}'.format(_current.value, _total.value))
        sys.stdout.flush()

    return func(argument, filter_)


def pmap(func, iterable, processes, should_print_progress, filter_=None, *args, **kwargs):
    """
    A parallel map function that reports on its progress.

    Applies `func` to every item of `iterable` and return a list of the
    results. If `processes` is greater than one, a process pool is used to run
    the functions in parallel. `should_print_progress` is a boolean value that
    indicates whether a string 'N of M' should be printed to indicate how many
    of the functions have finished being run.
    """
    global _current
    global _total
    _current = multiprocessing.Value('i', 0)
    _total = multiprocessing.Value('i', len(iterable))

    func_and_args = [(func, arg, should_print_progress, filter_) for arg in iterable]
    if processes == 1:
        result = list(map(_wrapped_func, func_and_args, *args, **kwargs))
    else:
        pool = multiprocessing.Pool(initializer=_init,
                                    initargs=(_current, _total,),
                                    processes=processes)
        result = pool.map(_wrapped_func, func_and_args, *args, **kwargs)
        pool.close()
        pool.join()

    if should_print_progress:
        sys.stdout.write('\r')
    return result