Próbowałem napisać opakowanie interaktywne (do użytku w ipython) dla biblioteki, która kontroluje niektóre urządzenia. Niektóre wywołania są bardzo obciążone w IO, więc sensowne jest równoległe wykonywanie zadań. Korzystanie z puli wątków (prawie) działa ładnie:Zatrzymywanie procesów w ThreadPool w Pythonie
from multiprocessing.pool import ThreadPool
class hardware():
def __init__(IPaddress):
connect_to_hardware(IPaddress)
def some_long_task_to_hardware(wtime):
wait(wtime)
result = 'blah'
return result
pool = ThreadPool(processes=4)
Threads=[]
h=[hardware(IP1),hardware(IP2),hardware(IP3),hardware(IP4)]
for tt in range(4):
task=pool.apply_async(h[tt].some_long_task_to_hardware,(1000))
threads.append(task)
alive = [True]*4
Try:
while any(alive) :
for tt in range(4): alive[tt] = not threads[tt].ready()
do_other_stuff_for_a_bit()
except:
#some command I cannot find that will stop the threads...
raise
for tt in range(4): print(threads[tt].get())
Problem pojawia się wtedy, gdy użytkownik chce zatrzymać proces lub wystąpi błąd IO w do_other_stuff_for_a_bit()
. Naciśnięcie klawisza Ctrl + C zatrzymuje proces główny, ale wątki robocze działają, dopóki ich bieżące zadanie nie zostanie zakończone.
Czy istnieje sposób na zatrzymanie tych wątków bez konieczności przepisywania biblioteki lub wyprowadzania pythona przez użytkownika? pool.terminate()
i pool.join()
, które widziałem używane w innych przykładach nie wydają się wykonywać tej pracy.
Rzeczywista procedura (zamiast uproszczonej wersji powyżej) korzysta z rejestrowania i chociaż wszystkie wątki robocze są w pewnym momencie wyłączone, widzę, że procesy, które rozpoczęły, są kontynuowane aż do ukończenia (a będąc sprzętem widzę ich efekt, patrząc przez pokój).
To jest w python 2.7.
UPDATE:
rozwiązanie wydaje się, aby przełączyć się z użyciem multiprocessing.Process zamiast puli gwintu. Kod testu starałem się biec foo_pulse:
class foo(object):
def foo_pulse(self,nPulse,name): #just one method of *many*
print('starting pulse for '+name)
result=[]
for ii in range(nPulse):
print('on for '+name)
time.sleep(2)
print('off for '+name)
time.sleep(2)
result.append(ii)
return result,name
Jeśli spróbujesz działa to przy użyciu puli wątków, a następnie Ctrl-C nie zatrzymuje foo_pulse z systemem (choć robi zabić wątki od razu, że sprawozdanie druku dalej nadchodzi:
from multiprocessing.pool import ThreadPool
import time
def test(nPulse):
a=foo()
pool=ThreadPool(processes=4)
threads=[]
for rn in range(4) :
r=pool.apply_async(a.foo_pulse,(nPulse,'loop '+str(rn)))
threads.append(r)
alive=[True]*4
try:
while any(alive) : #wait until all threads complete
for rn in range(4):
alive[rn] = not threads[rn].ready()
time.sleep(1)
except : #stop threads if user presses ctrl-c
print('trying to stop threads')
pool.terminate()
print('stopped threads') # this line prints but output from foo_pulse carried on.
raise
else :
for t in threads : print(t.get())
wersja użyciu multiprocessing.Process działa jednak zgodnie z oczekiwaniami:
import multiprocessing as mp
import time
def test_pro(nPulse):
pros=[]
ans=[]
a=foo()
for rn in range(4) :
q=mp.Queue()
ans.append(q)
r=mp.Process(target=wrapper,args=(a,"foo_pulse",q),kwargs={'args':(nPulse,'loop '+str(rn))})
r.start()
pros.append(r)
try:
for p in pros : p.join()
print('all done')
except : #stop threads if user stops findRes
print('trying to stop threads')
for p in pros : p.terminate()
print('stopped threads')
else :
print('output here')
for q in ans :
print(q.get())
print('exit time')
gdzie mam zdefiniowane otoki dla biblioteki foo (tak, aby nie nie trzeba ponownie pisać). Jeśli wartość nie jest potrzebny ani to wrapper:
def wrapper(a,target,q,args=(),kwargs={}):
'''Used when return value is wanted'''
q.put(getattr(a,target)(*args,**kwargs))
Z dokumentacji nie widzę powodu, dlaczego basen nie działa (innych niż bug).
Czy masz jakiś powód, aby korzystać z nieudokumentowanych zajęć? Prawdopodobnie miałbyś więcej szczęścia w module 'concurrent.futures'. – SuperSaiyan
Nie ma prawdziwego powodu, aby używać nieudokumentowanych klas - inne niż to zostało użyte w przykładowym kodzie, który napotkałem podczas badania, jak to zrobić. – SRD
@SuperSaiyan: Dokumentacja jest pod inną nazwą; "ThreadPool" jest ujawniony w udokumentowany sposób w ramach "multiprocessing.dummy.Pool", gdzie ['multiprocessing.dummy' jest bliską kopią API' multiprocessing' wspieranego wątkami zamiast procesów] (https: // docs. python.org/3/library/multiprocessing.html#module-multiprocessing.dummy). – ShadowRanger