2011-06-28 10 views
9

Próbuję uzyskać limity czasu pracy do pracy w python3.2 za pomocą modułu concurrent.futures. Jednak, kiedy nie ma limitu czasu, tak naprawdę nie zatrzymuje wykonania. Próbowałem z obu wątków i executorów puli procesowej, żaden z nich nie zatrzymał zadania i tylko do jego ukończenia został przekroczony limit czasu. Czy ktokolwiek wie, czy można go uruchomić?Jak korzystać z concurrent.futures z limitami czasu?

import concurrent.futures 
import time 
import datetime 

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000] 

def run_loop(max_number): 
    print("Started:", datetime.datetime.now(), max_number) 
    last_number = 0; 
    for i in range(1, max_number + 1): 
     last_number = i * i 
    return last_number 

def main(): 
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor: 
     try: 
      for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1): 
       print(future.result(timeout=1)) 
     except concurrent.futures._base.TimeoutError: 
      print("This took to long...") 

if __name__ == '__main__': 
    main() 

Odpowiedz

15

O ile mogę powiedzieć, TimeoutError jest rzeczywiście podnoszony, kiedy można się tego spodziewać, a nie po zakończeniu zadania.

Jednak Twój program będzie działał, dopóki wszystkie uruchomione zadania nie zostaną zakończone. Dzieje się tak dlatego, że aktualnie wykonywane zadania (w twoim przypadku prawdopodobnie wszystkie twoje przesłane zadania, ponieważ rozmiar twojej puli jest równa liczbie zadań), nie są faktycznie "zabijane".

Program TimeoutError zostaje podniesiony, dzięki czemu można zdecydować, aby nie czekać do zakończenia zadania (i zamiast tego zrobić coś innego), ale zadanie będzie kontynuowane aż do zakończenia. A python nie wyjdzie tak długo, jak istnieją niedokończone zadania w wątkach/podprocesach twojego Executora.

Z tego, co wiem, nie można po prostu "zatrzymać" obecnie wykonywania kontraktów Futures, można jedynie "anulować" zaplanowane zadania, które jeszcze nie zostały uruchomione. W twoim przypadku nie będzie żadnych, ale wyobraź sobie, że masz pulę 5 wątków/procesów i chcesz przetworzyć 100 przedmiotów. W pewnym momencie może być 20 ukończonych zadań, 5 uruchomionych zadań i 75 zaplanowanych zadań. W takim przypadku można anulować 76 zaplanowanych zadań, ale 4, które są uruchomione, będą kontynuowane aż do zakończenia, niezależnie od tego, czy oczekiwany jest wynik, czy nie.

Mimo że nie można tego zrobić w ten sposób, wydaje mi się, że powinny istnieć sposoby osiągnięcia pożądanego efektu końcowego. Może ta wersja może pomóc na drodze (nie wiem, czy to jest dokładnie to, czego chciał, ale to może się przydać):

import concurrent.futures 
import time 
import datetime 

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000] 

class Task: 
    def __init__(self, max_number): 
     self.max_number = max_number 
     self.interrupt_requested = False 

    def __call__(self): 
     print("Started:", datetime.datetime.now(), self.max_number) 
     last_number = 0; 
     for i in xrange(1, self.max_number + 1): 
      if self.interrupt_requested: 
       print("Interrupted at", i) 
       break 
      last_number = i * i 
     print("Reached the end") 
     return last_number 

    def interrupt(self): 
     self.interrupt_requested = True 

def main(): 
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor: 
     tasks = [Task(num) for num in max_numbers] 
     for task, future in [(i, executor.submit(i)) for i in tasks]: 
      try: 
       print(future.result(timeout=1)) 
      except concurrent.futures.TimeoutError: 
       print("this took too long...") 
       task.interrupt() 


if __name__ == '__main__': 
    main() 

Tworząc wpłacone obiekt dla każdego „zadania”, dając tym do executora zamiast zwykłej funkcji można zapewnić sposób "przerwania" zadania. Wskazówka: usunąć linię task.interrupt() i zobacz, co się dzieje, to może łatwiej zrozumieć moje długie wyjaśnienia powyższej ;-)

4

Ostatnio trafiłem również ten problem, i wreszcie wymyślić następującym roztworze przy użyciu ProcessPoolExecutor:


def main(): 
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor: 
     try: 
      for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1): 
       print(future.result(timeout=1)) 
     except concurrent.futures._base.TimeoutError: 
      print("This took to long...") 
      stop_process_pool(executor) 

def stop_process_pool(executor): 
    for pid, processes in executor._processes.items(): 
     process.terminate() 
    executor.shutdown() 
+0

txmc można po prostu zabić jednego z procesów? Czy musisz zabić wszystkich? –