Czy byłoby możliwe utworzenie puli python, która nie jest demoniczna? Chcę, aby pula mogła wywoływać funkcję, która ma inną pulę w środku. Dzięki.Python Process Pool non-daemonic?
Odpowiedz
Klasa multiprocessing.pool.Pool
tworzy procesy robocze w swojej metodzie __init__
, sprawia im demoniczny i uruchamia je, a to nie jest możliwe, aby ponownie ustawić swój atrybut daemon
do False
zanim zostaną uruchomione (i potem nie wolno już). Ale możesz stworzyć własną podklasę multiprocesing.pool.Pool
(multiprocessing.Pool
jest tylko funkcją opakowania) i zastąpić własną podklasę multiprocessing.Process
, która jest zawsze nie demoniczna, do użycia w procesach roboczych.
Oto pełny przykład tego, jak to zrobić. Ważne części to dwie klasy: NoDaemonProcess
i MyPool
u góry i wywołanie pool.close()
i pool.join()
w instancji MyPool
na końcu.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time
from random import randint
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess
def sleepawhile(t):
print("Sleeping %i seconds..." % t)
time.sleep(t)
return t
def work(num_procs):
print("Creating %i (daemon) workers and jobs in child." % num_procs)
pool = multiprocessing.Pool(num_procs)
result = pool.map(sleepawhile,
[randint(1, 5) for x in range(num_procs)])
# The following is not really needed, since the (daemon) workers of the
# child's pool are killed when the child is terminated, but it's good
# practice to cleanup after ourselves anyway.
pool.close()
pool.join()
return result
def test():
print("Creating 5 (non-daemon) workers and jobs in main process.")
pool = MyPool(5)
result = pool.map(work, [randint(1, 5) for x in range(5)])
pool.close()
pool.join()
print(result)
if __name__ == '__main__':
test()
Powyższy kod wydaje mi się zawieszony. W szczególności wydaje się, że zawiesza się w pool.close() wewnątrz pracy(). Czy jest coś, czego mi brakuje? –
Po raz kolejny przetestowałem mój kod w Pythonie 2.7/3.2 (po naprawieniu linii "print") w systemie Linux i Pythonie 2.6/2.7/3.2 OS X. Linux i Python 2.7/3.2 na OS X działają dobrze, ale kod naprawdę się zawiesił z Pythonem 2.6 na OS X (Lion). Wydaje się, że jest to błąd w module wieloprocesowym, który został naprawiony, ale tak naprawdę nie sprawdziłem modułu do śledzenia błędów. –
Powinno to zostać naprawione w module wieloprocesowym (opcja dla nie-demonicznych pracowników powinna być dostępna). Czy ktoś wie, kto to utrzymuje? –
Moduł multiprocessing ma ładny interfejs do korzystania z basenów z procesów lub wątkach. W zależności od aktualnego przypadku użycia, możesz rozważyć użycie multiprocessing.pool.ThreadPool
dla swojej zewnętrznej puli, co spowoduje wątki (które pozwalają odrodzić procesy od wewnątrz) w przeciwieństwie do procesów.
To może być ograniczona przez GIL, ale w moim przypadku (testowałem zarówno), czas uruchamiania procesów z zewnętrznej Pool
jako stworzony here zdecydowanie przeważają rozwiązanie ThreadPool
.
To naprawdę łatwe do wymiany Processes
dla Threads
. Więcej informacji na temat korzystania z rozwiązaniahere lub .
Wystąpił problem podczas próby importowania globali między modułami, powodując wielokrotne oszacowanie linii ProcessPool().
globals.py
from processing import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading import ThreadPool
class SingletonMeta(type):
def __new__(cls, name, bases, dict):
dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
return super(SingletonMeta, cls).__new__(cls, name, bases, dict)
def __init__(cls, name, bases, dict):
super(SingletonMeta, cls).__init__(name, bases, dict)
cls.instance = None
def __call__(cls,*args,**kw):
if cls.instance is None:
cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
return cls.instance
def __deepcopy__(self, item):
return item.__class__.instance
class Globals(object):
__metaclass__ = SingletonMeta
"""
This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
The root cause is that importing this file from different modules causes this file to be reevalutated each time,
thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug
"""
def __init__(self):
print "%s::__init__()" % (self.__class__.__name__)
self.shared_manager = Manager()
self.shared_process_pool = ProcessPool()
self.shared_thread_pool = ThreadPool()
self.shared_lock = Lock() # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin
następnie zaimportować bezpiecznie od gdzie indziej w kodzie
from globals import Globals
Globals().shared_manager
Globals().shared_process_pool
Globals().shared_thread_pool
Globals().shared_lock
AFAIK, nie to nie jest możliwe, wszystko pracownik w basenie są daemonized i to nie jest możliwe, aby __inject zależność__, BTW nie rozumiem drugiej części twojego pytania "Chcę, żeby pula mogła wywoływać funkcję, która ma inną pulę w środku" i jak to przeszkadza w tym, że pracownicy są demonizowani. – mouad
Ponieważ jeśli funkcja a ma pulę, która uruchamia funkcję b, która ma pulę, która uruchamia funkcję c, istnieje problem w b, że jest uruchamiany w procesie demona, a procesy demona nie mogą tworzyć procesów. 'AssertionError: procesy daemoniczne nie mogą mieć dzieci ' – Max