2009-11-05 28 views
25

Mam to dość proste pytanie o ThreadPoolExecutor. Mam następującą sytuację: muszę spożywać obiekty z kolejki, tworzyć odpowiednie zadania dla nich i przesyłać je do ThreadPoolExecutor. To całkiem proste. Ale w scenariuszu wyłączania wielu pracowników może zostać umieszczonych w kolejce do wykonania. Ponieważ jedno z tych zadań może być uruchomione przez godzinę, a ja chcę relatywnie szybkiego, pełnego wdzięku zamknięcia aplikacji, chcę odrzucić wszystkie zadania w kolejce od ThreadPoolExecutor, podczas gdy zadania przetwarzania powinny być zakończone normalnie.Usuwanie wszystkich zadań kolejkowania ThreadPoolExecutor

Dokumentacja ThreadPoolExecutor ma metodę remove(), ale pozwala tylko na usunięcie określonych zadań. purge() działa tylko w przypadku już anulowanych zadań na przyszłość. Moim pomysłem było oczyszczenie kolejki z wszystkimi kolejnymi zadaniami. ThreadPoolExecutor zapewnia dostęp do tej kolejki wewnętrznej ale dokumentacja stwierdza:

Method getQueue() pozwala na dostęp do kolejki pracować dla celów monitorowania i debugowanie. Zastosowanie tej metody zniechęca do jakichkolwiek innych celów.

Więc chwytanie tej kolejki i czyszczenie jej nie jest opcją. Również ten fragment dokumentacji mówi:

metod

Dwa dostarczone, remove (java.lang.Runnable) i czyszczenie() są dostępne, aby pomóc w przechowywaniu rekultywacji gdy duża liczba kolejce zadań stać odwołany .

Jak? Oczywiście, mogę zachować listę wszystkich zadań, które przesłałem do executora, i w przypadku zamknięcia, iteruję po wszystkich wpisach i usuwam je z ThreadPoolExecutor za pomocą metody remove() ... ale ... chodź, to jest marnowanie pamięci i kłopot z utrzymaniem tej listy. (Usuwanie wcześniej wykonanych zadań)

Doceniam wszelkie wskazówki i rozwiązania!

Odpowiedz

9

Czy rozważałeś zawijanie ExecutorService? Utwórz nowe, które deleguje wszystkie połączenia do innego Executora, ale zachowuje Futures na liście własnych. CleanShutdownExecutorService może wówczas mieć metodę cancelRemainingTasks(), która wywołuje shutdown(), a następnie wywołuje cancel (false) we wszystkich kontraktach Futures na swojej liście.

+0

To może być najczystsze podejście, dlatego akceptuję tę odpowiedź. :-) Nawet miałem nadzieję, że jest już coś takiego ... więc: pozwala nam to zakodować. :-) – Malax

4

Jak ExecutorService.shutdown() nie robi wystarczająco dużo i ExecutorService.shutdownNow() robi zbyt dużo Chyba trzeba napisać coś w środku: zapamiętaj wszystkich złożonych zadań i usunąć je ręcznie po (lub przed) wywołanie shutdown().

+0

"Uruchamia systematyczne wyłączanie, w którym wykonywane są wcześniej przesłane zadania, ale żadne nowe zadania nie zostaną zaakceptowane." Mam kilka zgłoszonych zadań, które nie powinny już być wykonywane. Ale zadania ** wykonujące ** nie powinny się kończyć. – Malax

+0

Edytowałeś swoją odpowiedź, więc mój komentarz nie jest już całkowicie poprawny. Niemniej jednak, dokumentacja shutdownNow() stwierdza: "Próba zatrzymania wszystkich aktywnie wykonujących zadań." Czego nie chcę. :) – Malax

+0

Och, racja, przeczuwam tę połowę zdania. Cóż, domyślam się, że jedyną opcją jest zapamiętanie wszystkich przesłanych 'Runnable'ów i usunięcie ich ręcznie. Odpowiednio edytuję. – Bombe

1

Odpowiedź Bombe jest dokładnie taka, jak chcesz. shutdownNow() zatrzymuje wszystko przy użyciu podejścia nuke i utorować. To najlepsza rzecz, jaką możesz zrobić, jeśli nie masz podklasy implementacji ThreadPoolExecutor, której używasz.

0

Szalony i nieczysty rozwiązanie, które może działać (nie prawdziwe przemyślane lub testowane) byłoby zastąpić interrupt() swojego WorkerTasks który tylko w przypadku niektórych globalna wartość jest ustawiona odmówić wyłączenia kiedy interrupt() nazywa się na nich przez shutdownNow ().

To powinno pozwolić Ci użyć numeru shutdownNow()?

0

Poinformuj pulę wątków o wyłączeniu, getQueue, dla każdego wyniku w poszczególnych Runnamach, usuń każdy Runnable za pomocą metody remove. W zależności od typu kolejki, możliwe jest wczesne zatrzymanie usuwania na podstawie wartości zwracanych.

Zasadniczo polega to na przechwytywaniu kolejki i czyszczeniu jej, a jedynie czyszczeniu za pomocą działających metod. Zamiast ręcznego zapamiętywania wszystkich zgłoszeń, wykorzystujesz fakt, że pula wątków musi już pamiętać wszystkie przesyłki. Jednak prawdopodobnie będziesz musiał wykonać obronną kopię kolejki, ponieważ uważam, że jest to podgląd na żywo, dlatego usunięcie prawdopodobnie spowodowałoby wyjątek modyfikacji współbieżnej, gdybyś wykonywał/przeglądał podgląd na żywo.

12

Kiedyś pracowałem nad aplikacją z długimi wątkami. Robimy to podczas wyłączania,

BlockingQueue<Runnable> queue = threadPool.getQueue(); 
List<Runnable> list = new ArrayList<Runnable>(); 
int tasks = queue.drainTo(list); 

Lista jest zapisywana do pliku. Podczas uruchamiania lista jest dodawana z powrotem do puli, więc nie tracimy żadnych zadań.

+2

Niezły dodatek do mojego pytania. :-) W moim przypadku uporczywość zadań nie stanowi problemu. Ale twój dodatek może pomóc innym, dzięki! :) – Malax

+0

Nie jestem pewien, czy to właściwe podejście. Jeśli zamkniesz system przy pomocy shutdown(), zaczekasz aż wszystko zostanie zakończone (nawet zadania w kolejce), jeśli zrobisz to za pomocą shutdownNow(), ta metoda już zwróci ci wyczerpaną listę oczekujących zadań. Tylko FYI w przypadku. – Whimusical

0

Czy po wyłączeniu nie działa awaitTermination(long timeout, TimeUnit unit)?

executor.shutdown(); executor.awaitTermination (60, TimeUnit.SECONDS)

1

Można spróbować allowCoreThreadTimeOut(true);

+0

Powinieneś rozwinąć swoją odpowiedź i wyjaśnić, że faktycznie przesłuchujesz OP. –

3

To stara sprawa, ale w przypadku tego pomaga kogoś innego: można ustawić wartość logiczną lotny podczas wywołania shutdown() i każde zakończone zadanie zostanie zakończone, jeśli ta wartość logiczna zostanie ustawiona przed rozpoczęciem. Umożliwi to zadanie, które rzeczywiście się zakończyło, ale uniemożliwi kolejkom wykonywanie rzeczywistych zadań.