O ile mogę powiedzieć, TimeoutError jest rzeczywiście podnoszony, kiedy można się tego spodziewać, a nie po zakończeniu zadania.
Jednak Twój program będzie działał, dopóki wszystkie uruchomione zadania nie zostaną zakończone. Dzieje się tak dlatego, że aktualnie wykonywane zadania (w twoim przypadku prawdopodobnie wszystkie twoje przesłane zadania, ponieważ rozmiar twojej puli jest równa liczbie zadań), nie są faktycznie "zabijane".
Program TimeoutError zostaje podniesiony, dzięki czemu można zdecydować, aby nie czekać do zakończenia zadania (i zamiast tego zrobić coś innego), ale zadanie będzie kontynuowane aż do zakończenia. A python nie wyjdzie tak długo, jak istnieją niedokończone zadania w wątkach/podprocesach twojego Executora.
Z tego, co wiem, nie można po prostu "zatrzymać" obecnie wykonywania kontraktów Futures, można jedynie "anulować" zaplanowane zadania, które jeszcze nie zostały uruchomione. W twoim przypadku nie będzie żadnych, ale wyobraź sobie, że masz pulę 5 wątków/procesów i chcesz przetworzyć 100 przedmiotów. W pewnym momencie może być 20 ukończonych zadań, 5 uruchomionych zadań i 75 zaplanowanych zadań. W takim przypadku można anulować 76 zaplanowanych zadań, ale 4, które są uruchomione, będą kontynuowane aż do zakończenia, niezależnie od tego, czy oczekiwany jest wynik, czy nie.
Mimo że nie można tego zrobić w ten sposób, wydaje mi się, że powinny istnieć sposoby osiągnięcia pożądanego efektu końcowego. Może ta wersja może pomóc na drodze (nie wiem, czy to jest dokładnie to, czego chciał, ale to może się przydać):
import concurrent.futures
import time
import datetime
max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]
class Task:
def __init__(self, max_number):
self.max_number = max_number
self.interrupt_requested = False
def __call__(self):
print("Started:", datetime.datetime.now(), self.max_number)
last_number = 0;
for i in xrange(1, self.max_number + 1):
if self.interrupt_requested:
print("Interrupted at", i)
break
last_number = i * i
print("Reached the end")
return last_number
def interrupt(self):
self.interrupt_requested = True
def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor:
tasks = [Task(num) for num in max_numbers]
for task, future in [(i, executor.submit(i)) for i in tasks]:
try:
print(future.result(timeout=1))
except concurrent.futures.TimeoutError:
print("this took too long...")
task.interrupt()
if __name__ == '__main__':
main()
Tworząc wpłacone obiekt dla każdego „zadania”, dając tym do executora zamiast zwykłej funkcji można zapewnić sposób "przerwania" zadania. Wskazówka: usunąć linię task.interrupt()
i zobacz, co się dzieje, to może łatwiej zrozumieć moje długie wyjaśnienia powyższej ;-)
txmc można po prostu zabić jednego z procesów? Czy musisz zabić wszystkich? –