2016-08-09 28 views
6

Próbowałem napisać opakowanie interaktywne (do użytku w ipython) dla biblioteki, która kontroluje niektóre urządzenia. Niektóre wywołania są bardzo obciążone w IO, więc sensowne jest równoległe wykonywanie zadań. Korzystanie z puli wątków (prawie) działa ładnie:Zatrzymywanie procesów w ThreadPool w Pythonie

from multiprocessing.pool import ThreadPool 

class hardware(): 
    def __init__(IPaddress): 
     connect_to_hardware(IPaddress) 

    def some_long_task_to_hardware(wtime): 
     wait(wtime) 
     result = 'blah' 
     return result 

pool = ThreadPool(processes=4) 
Threads=[] 
h=[hardware(IP1),hardware(IP2),hardware(IP3),hardware(IP4)] 
for tt in range(4): 
    task=pool.apply_async(h[tt].some_long_task_to_hardware,(1000)) 
    threads.append(task) 
alive = [True]*4 
Try: 
    while any(alive) : 
     for tt in range(4): alive[tt] = not threads[tt].ready() 
     do_other_stuff_for_a_bit() 
except: 
    #some command I cannot find that will stop the threads... 
    raise 
for tt in range(4): print(threads[tt].get()) 

Problem pojawia się wtedy, gdy użytkownik chce zatrzymać proces lub wystąpi błąd IO w do_other_stuff_for_a_bit(). Naciśnięcie klawisza Ctrl + C zatrzymuje proces główny, ale wątki robocze działają, dopóki ich bieżące zadanie nie zostanie zakończone.
Czy istnieje sposób na zatrzymanie tych wątków bez konieczności przepisywania biblioteki lub wyprowadzania pythona przez użytkownika? pool.terminate() i pool.join(), które widziałem używane w innych przykładach nie wydają się wykonywać tej pracy.

Rzeczywista procedura (zamiast uproszczonej wersji powyżej) korzysta z rejestrowania i chociaż wszystkie wątki robocze są w pewnym momencie wyłączone, widzę, że procesy, które rozpoczęły, są kontynuowane aż do ukończenia (a będąc sprzętem widzę ich efekt, patrząc przez pokój).

To jest w python 2.7.

UPDATE:

rozwiązanie wydaje się, aby przełączyć się z użyciem multiprocessing.Process zamiast puli gwintu. Kod testu starałem się biec foo_pulse:

class foo(object): 
    def foo_pulse(self,nPulse,name): #just one method of *many* 
     print('starting pulse for '+name) 
     result=[] 
     for ii in range(nPulse): 
      print('on for '+name) 
      time.sleep(2) 
      print('off for '+name) 
      time.sleep(2) 
      result.append(ii) 
     return result,name 

Jeśli spróbujesz działa to przy użyciu puli wątków, a następnie Ctrl-C nie zatrzymuje foo_pulse z systemem (choć robi zabić wątki od razu, że sprawozdanie druku dalej nadchodzi:

from multiprocessing.pool import ThreadPool 
import time 
def test(nPulse): 
    a=foo() 
    pool=ThreadPool(processes=4) 
    threads=[] 
    for rn in range(4) : 
     r=pool.apply_async(a.foo_pulse,(nPulse,'loop '+str(rn))) 
     threads.append(r) 
    alive=[True]*4 
    try: 
     while any(alive) : #wait until all threads complete 
      for rn in range(4): 
       alive[rn] = not threads[rn].ready() 
       time.sleep(1) 
    except : #stop threads if user presses ctrl-c 
     print('trying to stop threads') 
     pool.terminate() 
     print('stopped threads') # this line prints but output from foo_pulse carried on. 
     raise 
    else : 
     for t in threads : print(t.get()) 

wersja użyciu multiprocessing.Process działa jednak zgodnie z oczekiwaniami:

import multiprocessing as mp 
import time 
def test_pro(nPulse): 
    pros=[] 
    ans=[] 
    a=foo() 
    for rn in range(4) : 
     q=mp.Queue() 
     ans.append(q) 
     r=mp.Process(target=wrapper,args=(a,"foo_pulse",q),kwargs={'args':(nPulse,'loop '+str(rn))}) 
     r.start() 
     pros.append(r) 
    try: 
     for p in pros : p.join() 
     print('all done') 
    except : #stop threads if user stops findRes 
     print('trying to stop threads') 
     for p in pros : p.terminate() 
     print('stopped threads') 
    else : 
     print('output here') 
     for q in ans : 
      print(q.get()) 
    print('exit time') 

gdzie mam zdefiniowane otoki dla biblioteki foo (tak, aby nie nie trzeba ponownie pisać). Jeśli wartość nie jest potrzebny ani to wrapper:

def wrapper(a,target,q,args=(),kwargs={}): 
    '''Used when return value is wanted''' 
    q.put(getattr(a,target)(*args,**kwargs)) 

Z dokumentacji nie widzę powodu, dlaczego basen nie działa (innych niż bug).

+1

Czy masz jakiś powód, aby korzystać z nieudokumentowanych zajęć? Prawdopodobnie miałbyś więcej szczęścia w module 'concurrent.futures'. – SuperSaiyan

+0

Nie ma prawdziwego powodu, aby używać nieudokumentowanych klas - inne niż to zostało użyte w przykładowym kodzie, który napotkałem podczas badania, jak to zrobić. – SRD

+0

@SuperSaiyan: Dokumentacja jest pod inną nazwą; "ThreadPool" jest ujawniony w udokumentowany sposób w ramach "multiprocessing.dummy.Pool", gdzie ['multiprocessing.dummy' jest bliską kopią API' multiprocessing' wspieranego wątkami zamiast procesów] (https: // docs. python.org/3/library/multiprocessing.html#module-multiprocessing.dummy). – ShadowRanger

Odpowiedz

1

Jest to bardzo interesujące wykorzystanie równoległości.

Jednak jeśli używasz multiprocessing, celem jest równoległe działanie wielu procesów, w przeciwieństwie do jednego procesu z wieloma wątkami.

Rozważ te kilka zmian do wdrożenia go za pomocą multiprocessing:

Trzeba te funkcje, które będą biegły równolegle:

import time 
import multiprocessing as mp 


def some_long_task_from_library(wtime): 
    time.sleep(wtime) 


class MyException(Exception): pass 

def do_other_stuff_for_a_bit(): 
    time.sleep(5) 
    raise MyException("Something Happened...") 

Stwórzmy i uruchomić procesy, powiedzmy 4:

procs = [] # this is not a Pool, it is just a way to handle the 
      # processes instead of calling them p1, p2, p3, p4... 
for _ in range(4): 
    p = mp.Process(target=some_long_task_from_library, args=(1000,)) 
    p.start() 
    procs.append(p) 
mp.active_children() # this joins all the started processes, and runs them. 

Procesy działają równolegle, prawdopodobnie w oddzielnym rdzeniu procesora, ale to system operacyjny decyduje.Możesz sprawdzić swój monitor systemu.

W międzyczasie uruchomić proces, który złamie, i chcesz zatrzymać procesy uruchomione, nie pozostawiając im sierotę:

try: 
    do_other_stuff_for_a_bit() 
except MyException as exc: 
    print(exc) 
    print("Now stopping all processes...") 
    for p in procs: 
     p.terminate() 
print("The rest of the process will continue") 

Jeśli to nie ma sensu kontynuować procesu głównego, gdy jedno lub wszystkie podprocesy zostały zakończone, powinieneś obsługiwać wyjście z głównego programu.

Mam nadzieję, że to pomoże, i możesz dostosować to do swojej biblioteki.

+0

W moim przypadku nie miało znaczenia, czy wszystko działa na tym samym procesorze, powodem równoległego działania jest to, że na IO czekają masowe oczekiwania. Jednak ta metoda działa z jedną wadą, że trudno jest zwrócić wartości z połączeń. Na razie rozwiązałem to za pomocą funkcji wrappera - zobacz mój zaktualizowany post. – SRD

+0

W zależności od rodzaju wartości, które należy zwrócić z połączeń, można użyć 'Kolejka',' Rura', pamięć współdzielona 'Wartość' lub' Tablica', a nawet plik dysku. W niektórych z tych przypadków może być konieczne użycie 'Zablokuj'. – chapelo

0

W odpowiedzi na pytanie, dlaczego basen nie działa to jest spowodowane (cytowany w Documentation) Następnie głównych musi być importable przez procesów potomnych i ze względu na charakter tego projektu interaktywny Pythona używane.

W tym samym czasie nie było jasne, dlaczego wątek będzie - choć wskazówka jest tam w nazwie. ThreadPool tworzy pulę procesów roboczych za pomocą metody wieloprocesorowej.dummy, która jak wspomniano here jest tylko opakowaniem wokół modułu wątków. Pula używa przetwarzania wieloprocesowego. Może to być postrzegane przez ten test:

p=ThreadPool(processes=3) 
p._pool[0] 
<DummyProcess(Thread23, started daemon 12345)> #no terminate() method 

p=Pool(processes=3) 
p._pool[0] 
<Process(PoolWorker-1, started daemon)> #has handy terminate() method if needed 

Jak wątki nie mają sposobu zakończenia wątków roboczych prowadzili działa do momentu zakończenia ich aktualnego zadania. Zabijanie wątków jest nieporządne (dlatego próbowałem użyć modułu wieloprocesorowego), ale rozwiązania są here.

Jedno ostrzeżenie o rozwiązaniu za pomocą powyższego:

def wrapper(a,target,q,args=(),kwargs={}): 
    '''Used when return value is wanted''' 
    q.put(getattr(a,target)(*args,**kwargs)) 

jest to, że zmiany atrybutów wewnątrz instancji obiektu nie są przekazywane z powrotem do programu głównego. Na przykład klasa foo powyżej może również mieć takie metody, jak: def addIP (newIP): self.hardwareIP = newIP Połączenie z r=mp.Process(target=a.addIP,args=(127.0.0.1)) nie aktualizuje a.

Jedynym sposobem obejścia tego złożonego obiektu wydaje się być pamięć współdzielona za pomocą niestandardowego manager, który może zapewnić dostęp zarówno do metod, jak i atrybutów obiektu. a Dla bardzo dużego obiektu złożonego opartego na bibliotece może to być najlepiej zrobione użycie dir(foo) do zapełnienia menedżera. Jeśli mogę wymyślić, jak zaktualizuję tę odpowiedź na przykładzie (dla mojego przyszłego ja tak samo jak innych).