2013-03-18 32 views
16

Moje ThreadPoolExecutor nie tworzy nowych wątków. W rzeczywistości napisałem nieco hacky LinkedBlockingQueue, który zaakceptuje dowolne zadanie (tj. Jest nieograniczony), ale wywoła dodatkowy moduł obsługi - który w moich aplikacjach wypluwa ostrzeżenie śledzące, że pula jest w tyle - co daje mi bardzo wyraźną informację, że TPE odmawia tworzyć nowe wątki, mimo że kolejka zawiera tysiące wpisów. Mój konstruktor wygląda następująco:ThreadPoolExecutor z nieograniczoną kolejką nie tworząc nowych wątków

private final ExecutorService s3UploadPool = 
new ThreadPoolExecutor(1, 40, 1, TimeUnit.HOURS, unboundedLoggingQueue); 

Dlaczego nie tworzy nowych wątków?

+0

Związane z http://stackoverflow.com/questions/19528304/how-to-get-tread-examplepoolexecutor-to-increase-threads-to-max-before-queueing/19528305#19528305 – Gray

Odpowiedz

16

Ten haczyka jest pokryta this blog post:

Ta budowa puli wątków po prostu nie działać zgodnie z oczekiwaniami. Wynika to z logiki wewnątrz ThreadPoolExecutor, w której dodawane są nowe wątki, jeśli nie można zaoferować zadania kolejce. W naszym przypadku używamy nieograniczonego LinkedBlockingQueue, w którym zawsze możemy zaproponować zadanie kolejce. Oznacza to, że nigdy nie przekroczymy podstawowego rozmiaru puli i maksymalnej wielkości puli.

Jeśli musisz oddzielić minimalne od maksymalnych rozmiarów basenu, będziesz musiał wykonać rozszerzone kodowanie. Nie jestem świadomy rozwiązania, które istnieje w bibliotekach Java lub Apache Commons. Rozwiązaniem jest utworzenie sprzężonego BlockingQueue, który jest świadomy TPE, i zrobi wszystko, aby odrzucić zadanie, jeśli wie, że TPE nie ma dostępnych wątków, a następnie ręcznie je zgłoś. Jest to bardziej szczegółowo omówione w postach z linkami. Ostatecznie Twoja konstrukcja będzie wyglądać następująco:

public static ExecutorService newScalingThreadPool(int min, int max, long keepAliveTime) { 
    ScalingQueue queue = new ScalingQueue(); 
    ThreadPoolExecutor executor = 
     new ScalingThreadPoolExecutor(min, max, keepAliveTime, TimeUnit.MILLISECONDS, queue); 
    executor.setRejectedExecutionHandler(new ForceQueuePolicy()); 
    queue.setThreadPoolExecutor(executor); 
    return executor; 
} 

jednak prościej ustawić corePoolSize do maxPoolSize i nie martw się o to nonsens.

+0

uwaga, nadal możesz uzyskać ograniczony efekt skalowania, pozwalając na przekroczenie limitu czasu dla wątków rdzeniowych (można skalować od 0 do maks. wątków). – jtahlborn

+0

Zgodnie z javadocs: Ustawiając corePoolSize i maximumPoolSize to samo, tworzysz pulę wątków o ustalonym rozmiarze. Więc jeśli podążasz za swoim ostatnim zdaniem, lepiej byłoby po prostu użyć Executors.newFixedThreadPool (poolSize). – darrickc

+0

@darrickc wygląda na to, że twoja edycja została już odrzucona, ale należy ją uwzględnić w komentarzu. Nie sądzę, że jest to poprawne, ponieważ w moich przypadkach użycia nie chcę, aby wątki wygasały. – djechlin

3

Jak wspomniano przez @dłamlin, jest to część (zaskakujące dla wielu) określonego zachowania ThreadPoolExecutor. Wierzę, że znalazłem trochę eleganckie rozwiązanie wokół tego zachowania, które pokazuję w moim odpowiedź tutaj:

How to get the ThreadPoolExecutor to increase threads to max before queueing?

Zasadniczo rozszerzyć LinkedBlockingQueue mieć zawsze return false dla queue.offer(...) który doda dodatkowe wątki do puli, jeśli to konieczne. Jeśli pula jest już w max wątków i wszystkie są zajęte, zostanie wywołany RejectedExecutionHandler. Jest to program obsługujący, który następnie wprowadza put(...) do kolejki.

Zobacz mój kod tam.

+1

Dzięki. Naprawiony. @kevinarpe. – Gray

3

Istnieje obejście tego problemu. Rozważmy następujące wykonania: Na

int corePoolSize = 40; 
int maximumPoolSize = 40; 
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 
    60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); 
threadPoolExecutor.allowCoreThreadTimeOut(true); 

przez ustawienie allowCoreThreadTimeOut() do true gwinty w basenie mogą zakończyć po upływie określonego czasu oczekiwania (60 sekund w tym przykładzie). W przypadku tego rozwiązania jest to argument konstruktora corePoolSize, który określa w praktyce maksymalny rozmiar puli, ponieważ pula wątków wyrośnie do corePoolSize, a następnie rozpocznie dodawanie zadań do kolejki. Prawdopodobnie pula nigdy nie będzie większa, ponieważ pula nie będzie tworzyć nowych wątków, dopóki kolejka nie będzie pełna (co, biorąc pod uwagę, że LinkedBlockingQueue ma pojemność Integer.MAX_VALUE, nigdy nie może się wydarzyć).W związku z tym nie ma sensu ustawianie wartości maximumPoolSize na wartość większą niż corePoolSize.

Uwaga: Pula wątków ma 0 bezczynnych wątków po upłynięciu limitu czasu, co oznacza, że ​​przed utworzeniem wątków będzie pewne opóźnienie (zwykle zawsze dostępne są wątki corePoolSize).

Więcej szczegółów można znaleźć w JavaDoc z ThreadPoolExecutor.