2012-03-28 27 views
9

Mam do czynienia z różnymi strategiami łączenia wątków za pomocą ThreadPoolExecutor z JDK6. Mam kolejkę Priority działającą, ale nie byłem pewien, czy podoba mi się, że pula nie ma rozmiaru po keepAliveTime (co otrzymujesz w nieograniczonej kolejce). Tak, patrzę na ThreadPoolExecutor przy użyciu zasady LinkedBlockingQueue i CallerRuns.Dlaczego ThreadPoolExecutor redukuje wątki poniżej corePoolSize po keepAliveTime?

Problem, który mam teraz, polega na tym, że pula wzrasta, ponieważ dokumentacja wyjaśnia, że ​​powinna, ale po zakończeniu zadań i uruchomieniu keepAliveTime, funkcja getPoolSize pokazuje, że pula zostaje zmniejszona do zera. Przykładowy kod poniżej powinny pozwolić widać podstawę do mojego pytania:

public class ThreadPoolingDemo { 
    private final static Logger LOGGER = 
     Logger.getLogger(ThreadPoolingDemo.class.getName()); 

    public static void main(String[] args) throws Exception { 
     LOGGER.info("MAIN THREAD:starting"); 
     runCallerTestPlain(); 
    } 

    private static void runCallerTestPlain() throws InterruptedException { 
     //10 core threads, 
     //50 max pool size, 
     //100 tasks in queue, 
     //at max pool and full queue - caller runs task 
     ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50, 
      5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), 
      new ThreadPoolExecutor.CallerRunsPolicy()); 

     //dump 5000 tasks on the queue 
     for (int i = 0; i < 5000; i++) { 
      tpe.submit(new Runnable() { 
       @Override 
       public void run() { 
        //just to eat some time and give a little feedback 
        for (int j = 0; j < 20; j++) { 
         LOGGER.info("First-batch Task, looping:" + j + "[" 
           + Thread.currentThread().getId() + "]"); 
        } 
       } 
      }, null); 
     } 
     LOGGER.info("MAIN THREAD:!!Done queueing!!"); 

     //check tpe statistics forever 
     while (true) { 
      LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: " 
       + tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize()); 
      Thread.sleep(1000); 
     } 
    } 
} 

znalazłem starą błąd, który wydaje się być to problem, ale było zamknięte: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6458662. Czy to może być nadal obecne w wersji 1.6 czy coś mi brakuje?

Wygląda na to, że I Rubber Ducked this (http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html). Błąd, który podałem powyżej, jest związany z tym: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792, gdzie problem wydaje się być rozwiązany w 1.7 (załadowałem 1.7 i zweryfikowałem - naprawiono ...). Myślę, że moim głównym problemem było to, że błąd ten fundamentalny pozostał przez prawie dziesięć lat. Spędziłem zbyt dużo czasu, pisząc to, aby nie opublikować go teraz, mam nadzieję, że to pomoże komuś.

+0

+1 Miłe znalezisko, byłem zaskoczony, widząc to zachowanie. –

+1

Być może lepiej byłoby ustrukturyzować swój post jako pytanie, a następnie podać to, czego nauczyłeś się jako odpowiedź? –

Odpowiedz

6

... po zakończeniu zadań i uruchomieniu keepAliveTime, funkcja getPoolSize pokazuje, że pula została zmniejszona do zera.

Wygląda więc na to, że wyścig jest w stanie ThreadPoolExecutor. Wydaje mi się, że działa zgodnie z projektem, choć nieoczekiwane. W metodzie getTask() której pętla wątków roboczych przez uzyskać zadania z kolejki blokującej, widzisz ten kod:

if (state == SHUTDOWN) // Help drain queue 
    r = workQueue.poll(); 
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 
    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); 
else 
    r = workQueue.take(); 
if (r != null) 
    return r; 
if (workerCanExit()) { 
    if (runState >= SHUTDOWN) // Wake up others 
     interruptIdleWorkers(); 
    return null; 
} 

Jeśli poolSize rośnie powyżej corePoolSize następnie, jeśli czasy Ankieta odchodzący po keepAliveTime kod spada do workerCanExit() od r jest null. Wszystkie nici może powrócić true z tej metody, ponieważ jest tylko sprawdzenie stanu z poolSize:

mainLock.lock(); 
    boolean canExit; 
    try { 
     canExit = runState >= STOP || 
      workQueue.isEmpty() || 
      (allowCoreThreadTimeOut && 
      poolSize > Math.max(1, corePoolSize)); << test poolSize here 
    } finally { 
     mainLock.unlock();       << race to workerDone() begins 
    } 

Raz, że zwraca true następnie wyjść wątku roboczego i następniepoolSize jest zmniejszany. Jeśli wszystkie wątki robocze wykonają ten test w tym samym czasie, wszystkie zostaną zakończone z powodu wyścigu między testowaniem poolSize a zatrzymaniem pracownika, gdy wystąpi --poolSize.

Co mnie zaskakuje, to jak konsekwentny jest ten stan wyścigu. Jeśli dodasz trochę randomizacji do sleep() poniżej run() poniżej, możesz uzyskać kilka rdzennych wątków, aby nie rezygnować, ale myślałem, że warunki wyścigu byłyby trudniejsze do trafienia.


Widać to zachowanie w poniższym teście:

@Test 
public void test() throws Exception { 
    int before = Thread.activeCount(); 
    int core = 10; 
    int max = 50; 
    int queueSize = 100; 
    ThreadPoolExecutor tpe = 
      new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(queueSize), 
        new ThreadPoolExecutor.CallerRunsPolicy()); 
    tpe.allowCoreThreadTimeOut(false); 
    assertEquals(0, tpe.getActiveCount()); 
    // if we start 1 more than can go into core or queue, poolSize goes to 0 
    int startN = core + queueSize + 1; 
    // if we only start jobs the core can take care of, then it won't go to 0 
    // int startN = core + queueSize; 
    for (int i = 0; i < startN; i++) { 
     tpe.submit(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 
    } 
    while (true) { 
     System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize() 
       + ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before)); 
     Thread.sleep(1000); 
    } 
} 

Jeśli zmienić linię wewnątrz metody run()sleep aby coś takiego:

private final Random random = new Random(); 
... 
    Thread.sleep(100 + random.nextInt(100)); 

uczyni stan wyścigu trudniejszy do trafienia, więc niektóre wątki rdzenia będą nadal dostępne.

+0

OP jest bardziej zainteresowany 'getPoolSize()', a nie 'getActiveCount()' Po zakończeniu wszystkich zadań i ważności keepAliveTime wszystkie wątki faktycznie kończą się, chociaż TPE tego nie robi. –

+0

@Gray moje parametry TPE są ważne. To nie zawsze kończy się niepowodzeniem. Używając użytych parametrów, zobaczymy, że działa zgodnie z oczekiwaniami. Podane przeze mnie parametry są zgodne z regułami TPE w unikalny, ale także realistyczny sposób (tj. Ogromne zadanie wsadowe jest zrzucane w kolejce). Głównym problemem z twoimi parametrami jest to, że nigdy nie uruchamiasz sytuacji, która powoduje dodanie wątków do puli powyżej i poza rozmiar rdzenia. Aby przekroczyć/dodać poza rozmiar rdzenia, pula musi mieć rozmiar rdzenia (10), a kolejka musi przekroczyć jej limit (100). Spróbuj przykładu z 'for (int i = 0; i andematt

+0

@andematt Interesująca podróż. To jest wyścig w "ThreadPoolExecutor". Edytowałem swoją odpowiedź. – Gray