2011-08-07 4 views
43

Czy byłoby możliwe utworzenie puli python, która nie jest demoniczna? Chcę, aby pula mogła wywoływać funkcję, która ma inną pulę w środku. Dzięki.Python Process Pool non-daemonic?

+0

AFAIK, nie to nie jest możliwe, wszystko pracownik w basenie są daemonized i to nie jest możliwe, aby __inject zależność__, BTW nie rozumiem drugiej części twojego pytania "Chcę, żeby pula mogła wywoływać funkcję, która ma inną pulę w środku" i jak to przeszkadza w tym, że pracownicy są demonizowani. – mouad

+1

Ponieważ jeśli funkcja a ma pulę, która uruchamia funkcję b, która ma pulę, która uruchamia funkcję c, istnieje problem w b, że jest uruchamiany w procesie demona, a procesy demona nie mogą tworzyć procesów. 'AssertionError: procesy daemoniczne nie mogą mieć dzieci ' – Max

Odpowiedz

68

Klasa multiprocessing.pool.Pool tworzy procesy robocze w swojej metodzie __init__, sprawia im demoniczny i uruchamia je, a to nie jest możliwe, aby ponownie ustawić swój atrybut daemon do False zanim zostaną uruchomione (i potem nie wolno już). Ale możesz stworzyć własną podklasę multiprocesing.pool.Pool (multiprocessing.Pool jest tylko funkcją opakowania) i zastąpić własną podklasę multiprocessing.Process, która jest zawsze nie demoniczna, do użycia w procesach roboczych.

Oto pełny przykład tego, jak to zrobić. Ważne części to dwie klasy: NoDaemonProcess i MyPool u góry i wywołanie pool.close() i pool.join() w instancji MyPool na końcu.

#!/usr/bin/env python 
# -*- coding: UTF-8 -*- 

import multiprocessing 
# We must import this explicitly, it is not imported by the top-level 
# multiprocessing module. 
import multiprocessing.pool 
import time 

from random import randint 


class NoDaemonProcess(multiprocessing.Process): 
    # make 'daemon' attribute always return False 
    def _get_daemon(self): 
     return False 
    def _set_daemon(self, value): 
     pass 
    daemon = property(_get_daemon, _set_daemon) 

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool 
# because the latter is only a wrapper function, not a proper class. 
class MyPool(multiprocessing.pool.Pool): 
    Process = NoDaemonProcess 

def sleepawhile(t): 
    print("Sleeping %i seconds..." % t) 
    time.sleep(t) 
    return t 

def work(num_procs): 
    print("Creating %i (daemon) workers and jobs in child." % num_procs) 
    pool = multiprocessing.Pool(num_procs) 

    result = pool.map(sleepawhile, 
     [randint(1, 5) for x in range(num_procs)]) 

    # The following is not really needed, since the (daemon) workers of the 
    # child's pool are killed when the child is terminated, but it's good 
    # practice to cleanup after ourselves anyway. 
    pool.close() 
    pool.join() 
    return result 

def test(): 
    print("Creating 5 (non-daemon) workers and jobs in main process.") 
    pool = MyPool(5) 

    result = pool.map(work, [randint(1, 5) for x in range(5)]) 

    pool.close() 
    pool.join() 
    print(result) 

if __name__ == '__main__': 
    test() 
+0

Powyższy kod wydaje mi się zawieszony. W szczególności wydaje się, że zawiesza się w pool.close() wewnątrz pracy(). Czy jest coś, czego mi brakuje? –

+1

Po raz kolejny przetestowałem mój kod w Pythonie 2.7/3.2 (po naprawieniu linii "print") w systemie Linux i Pythonie 2.6/2.7/3.2 OS X. Linux i Python 2.7/3.2 na OS X działają dobrze, ale kod naprawdę się zawiesił z Pythonem 2.6 na OS X (Lion). Wydaje się, że jest to błąd w module wieloprocesowym, który został naprawiony, ale tak naprawdę nie sprawdziłem modułu do śledzenia błędów. –

+0

Powinno to zostać naprawione w module wieloprocesowym (opcja dla nie-demonicznych pracowników powinna być dostępna). Czy ktoś wie, kto to utrzymuje? –

6

Moduł multiprocessing ma ładny interfejs do korzystania z basenów z procesów lub wątkach. W zależności od aktualnego przypadku użycia, możesz rozważyć użycie multiprocessing.pool.ThreadPool dla swojej zewnętrznej puli, co spowoduje wątki (które pozwalają odrodzić procesy od wewnątrz) w przeciwieństwie do procesów.

To może być ograniczona przez GIL, ale w moim przypadku (testowałem zarówno), czas uruchamiania procesów z zewnętrznej Pool jako stworzony here zdecydowanie przeważają rozwiązanie ThreadPool.


To naprawdę łatwe do wymiany Processes dla Threads. Więcej informacji na temat korzystania z rozwiązaniahere lub .

0

Wystąpił problem podczas próby importowania globali między modułami, powodując wielokrotne oszacowanie linii ProcessPool().

globals.py

from processing    import Manager, Lock 
from pathos.multiprocessing import ProcessPool 
from pathos.threading  import ThreadPool 

class SingletonMeta(type): 
    def __new__(cls, name, bases, dict): 
     dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self 
     return super(SingletonMeta, cls).__new__(cls, name, bases, dict) 

    def __init__(cls, name, bases, dict): 
     super(SingletonMeta, cls).__init__(name, bases, dict) 
     cls.instance = None 

    def __call__(cls,*args,**kw): 
     if cls.instance is None: 
      cls.instance = super(SingletonMeta, cls).__call__(*args, **kw) 
     return cls.instance 

    def __deepcopy__(self, item): 
     return item.__class__.instance 

class Globals(object): 
    __metaclass__ = SingletonMeta 
    """  
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children 

    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug  
    """ 
    def __init__(self): 
     print "%s::__init__()" % (self.__class__.__name__) 
     self.shared_manager  = Manager() 
     self.shared_process_pool = ProcessPool() 
     self.shared_thread_pool = ThreadPool() 
     self.shared_lock   = Lock()  # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin 

następnie zaimportować bezpiecznie od gdzie indziej w kodzie

from globals import Globals 
Globals().shared_manager  
Globals().shared_process_pool 
Globals().shared_thread_pool 
Globals().shared_lock