2012-05-26 9 views
5

Zacząłem czytać więcej o ThreadPoolExecutor z Java Doc, ponieważ używam go w jednym z moich projektów. Więc czy ktoś może mi wyjaśnić, co właściwie oznacza ta linia? - Wiem, co oznacza każdy parametr, ale chciałem to zrozumieć w sposób bardziej ogólny/świecki od niektórych ekspertów tutaj.ThreadPoolExecutor with ArrayBlockingQueue

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L, 
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10, true), new 
ThreadPoolExecutor.CallerRunsPolicy()); 

Aktualizacja: - komunikat Problem jest: -

Każdy wątek wykorzystuje unikatowy identyfikator między 1 a 1000 a program ma trwać 60 minut lub dłużej, więc w tym 60 minut jest możliwe że wszystkie identyfikatory zostaną zakończone, więc muszę ponownie użyć tych identyfikatorów. Więc to jest poniższy program napisany przy użyciu powyższego executora.

class IdPool { 
    private final LinkedList<Integer> availableExistingIds = new LinkedList<Integer>(); 

    public IdPool() { 
     for (int i = 1; i <= 1000; i++) { 
      availableExistingIds.add(i); 
     } 
    } 

    public synchronized Integer getExistingId() { 
     return availableExistingIds.removeFirst(); 
    } 

    public synchronized void releaseExistingId(Integer id) { 
     availableExistingIds.add(id); 
    } 
} 


class ThreadNewTask implements Runnable { 
    private IdPool idPool; 

    public ThreadNewTask(IdPool idPool) { 
     this.idPool = idPool; 
    } 

    public void run() { 
     Integer id = idPool.getExistingId(); 
     someMethod(id); 
     idPool.releaseExistingId(id); 
    } 

// This method needs to be synchronized or not? 
    private synchronized void someMethod(Integer id) { 
     System.out.println("Task: " +id); 
// and do other calcuations whatever you need to do in your program 
    } 
} 

public class TestingPool { 
    public static void main(String[] args) throws InterruptedException { 
     int size = 10; 
     int durationOfRun = 60; 
     IdPool idPool = new IdPool(); 
     // create thread pool with given size 
     ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), new ThreadPoolExecutor.CallerRunsPolicy()); 

     // queue some tasks 
     long startTime = System.currentTimeMillis(); 
     long endTime = startTime + (durationOfRun * 60 * 1000L); 

     // Running it for 60 minutes 
     while(System.currentTimeMillis() <= endTime) { 
      service.submit(new ThreadNewTask(idPool)); 
     } 

     // wait for termination   
     service.shutdown(); 
     service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    } 
} 

Moje pytania są następujące: - Czy ten kod jest właściwy, jeśli chodzi o wydajność, czy nie? I co jeszcze mogę tu zrobić, aby był dokładniejszy? Każda pomoc zostanie doceniona.

+0

nie widzę niczego złego w kodzie, inne niż może posiadające try/finally w metodzie „Uruchom”, aby zapewnić, że id zawsze uwolnić (do kiedy mają bardziej skomplikowany kod w „metody someMethod "). – Matt

+0

@Matt, Dzięki za komentowanie, więc someMethod musi być zsynchronizowany? albo nie? Tak jak w moim przypadku dokonałem synchronizacji. – ferhan

+0

I "idPool.releaseExistingId (id);" powinien przyjść w końcu bloku w prawo? – ferhan

Odpowiedz

8

[Po pierwsze, przepraszam, to jest odpowiedź na poprzednią odpowiedź, ale chciałem sformatować].

Poza tym NIE blokujesz, gdy element zostanie przekazany do ThreadPoolExecutor z pełną kolejką. Powodem tego jest fakt, że ThreadPoolExecutor wywołuje metodę BlockingQueue.offer (T item), która z definicji jest metodą nieblokującą. Dodaje element i zwraca wartość true lub nie dodaje (gdy jest pełny) i zwraca wartość false. ThreadPoolExecutor następnie wywołuje zarejestrowany RejectedExecutionHandler, aby poradzić sobie z tą sytuacją.

Z javadoc:

Wykonuje daną czynność kiedyś w przyszłości. Zadanie może wykonać w nowym wątku lub w istniejącym połączonym wątku. Jeśli zadanie nie może zostać przesłane do wykonania, ponieważ wykonawca został wyłączony lub jego pojemność została osiągnięta, zadanie jest obsługiwane przez bieżący RejectedExecutionHandler.

Domyślnie ThreadPoolExecutor.AbortPolicy() służy która rzuca RejectedExecutionException z „submit” lub „execute” sposobu ThreadPoolExecutor.

try { 
    executorService.execute(new Runnable() { ... }); 
} 
catch (RejectedExecutionException e) { 
    // the queue is full, and you're using the AbortPolicy as the 
    // RejectedExecutionHandler 
} 

Można jednak stosować inne procedury obsługi robić coś innego, na przykład zignorować błąd (DiscardPolicy), lub uruchomić go w wątku, który nazywa się „Execute” lub „wyślij” metody (CallerRunsPolicy). Ten przykład zezwala na to, aby dowolny wątek wywoływał metodę "submit" lub "execute" uruchom żądane zadanie, gdy kolejka jest pełna. (To znaczy w danym momencie, można 1 dodatkowa rzecz działa się na górze, co w samej puli):

ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy()); 

Jeśli chcesz zablokować i czekać, można zaimplementować własną RejectedExecutionHandler które blokują aż tam dostępny w kolejce gniazdo (to oszacowanie, nie mam skompilowane lub uruchomić, ale powinien dostać ten pomysł):

public class BlockUntilAvailableSlot implements RejectedExecutionHandler { 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
    if (e.isTerminated() || e.isShutdown()) { 
     return; 
    } 

    boolean submitted = false; 
    while (! submitted) { 
     if (Thread.currentThread().isInterrupted()) { 
      // be a good citizen and do something nice if we were interrupted 
      // anywhere other than during the sleep method. 
     } 

     try { 
      e.execute(r); 
      submitted = true; 
     } 
     catch (RejectedExceptionException e) { 
     try { 
      // Sleep for a little bit, and try again. 
      Thread.sleep(100L); 
     } 
     catch (InterruptedException e) { 
      ; // do you care if someone called Thread.interrupt? 
      // if so, do something nice here, and maybe just silently return. 
     } 
     } 
    } 
    } 
} 
+0

Nie rozumiem tego, co właśnie powiedziałeś, ponieważ jestem nowy w rodzinie Executora, więc sposób, w jaki robię w moim pytaniu, jest poprawny, czy nie? Czy może nastąpić poprawa w kodzie? – ferhan

+0

Po prostu komentowałem czyjeś twierdzenie, że "Używa ArrayBlockingQueue do zarządzania żądaniami wykonawczymi z 10 gniazdami, więc kiedy kolejka jest pełna (po 10 wątkach zostały zakolejkowane), zablokuje ona dzwoniącego." Tak nie jest, jak wyjaśniono powyżej, chyba że celowo zrobisz jakiś kod, aby tak się stało. Używanie polityki uruchamiania wywołującego faktycznie uruchamia ją w bieżącym wątku, powyższa instrukcja zakłada, że ​​blokowałbyś, dopóki nie będzie miejsca w kolejce. – Matt

2

Tworzy on ExecutorService, który obsługuje wykonanie puli wątków. Zarówno początkowa, jak i maksymalna liczba wątków w puli wynosi 10 w tym przypadku. Kiedy wątek w puli stanie się bezczynny przez 1 sekundę (1000 ms), to go zabije (licznik czasu bezczynności), jednak ponieważ maksymalna i podstawowa liczba wątków jest taka sama, to nigdy się nie stanie (zawsze utrzymuje 10 wątków wokół i będzie nigdy nie uruchamiaj więcej niż 10 wątków).

Używa on ArrayBlockingQueue do zarządzania żądaniami wykonawczymi z 10 gniazdami, więc gdy kolejka jest pełna (po 10 wątkach zostały zakolejkowane), zablokuje ona dzwoniącego.

Jeśli nić zostanie odrzucony (który w tym przypadku byłoby powodu usługa zamknięcie, ponieważ wątki będą w kolejce lub zostanie zablokowany, gdy w kolejce wątku jeśli kolejka jest pełna), a następnie oferowane Runnable będą realizowane w wątku dzwoniącego.

+0

Dzięki za komentarz i co to oznacza w ArrayBlockingQueue? – ferhan

+0

Ma stałą liczbę slotów wspieranych przez tablicę i blokuje wywołującego podczas wprowadzania, jeśli są pełne. Istnieją inne rodzaje kolejek, które można wykorzystać. Popatrz na jego opis w Javadoc, aby uzyskać więcej informacji. –

+0

Zaktualizowałem moje pytanie, w którym napisałem swój kod, a także wspomniałem o moim problemie, więc czy jest jakiś problem w tym kodzie? jeśli tak, w jaki sposób mogę uczynić go dokładniejszym? Każda pomoc zostanie doceniona. – ferhan

0

Rozważmy semaforów. Są one przeznaczone do tego samego celu. Proszę sprawdzić poniżej kod za pomocą semafora. Nie jestem pewien, czy tego właśnie chcesz. Ale to zablokuje, jeśli nie ma już żadnych zezwoleń na nabycie. Czy identyfikacja jest dla Ciebie ważna?

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Semaphore; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

class ThreadNewTask implements Runnable { 
    private Semaphore idPool; 

    public ThreadNewTask(Semaphore idPool) { 
     this.idPool = idPool; 
    } 

    public void run() { 
//  Integer id = idPool.getExistingId(); 
     try { 
      idPool.acquire(); 
      someMethod(0); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } finally { 
      idPool.release(); 
     } 
//  idPool.releaseExistingId(id); 
    } 

    // This method needs to be synchronized or not? 
    private void someMethod(Integer id) { 
     System.out.println("Task: " + id); 
     // and do other calcuations whatever you need to do in your program 
    } 
} 

public class TestingPool { 
    public static void main(String[] args) throws InterruptedException { 
     int size = 10; 
     int durationOfRun = 60; 
     Semaphore idPool = new Semaphore(100); 
//  IdPool idPool = new IdPool(); 
     // create thread pool with given size 
     ExecutorService service = new ThreadPoolExecutor(size, size, 500L, 
       TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), 
       new ThreadPoolExecutor.CallerRunsPolicy()); 

     // queue some tasks 
     long startTime = System.currentTimeMillis(); 
     long endTime = startTime + (durationOfRun * 60 * 1000L); 

     // Running it for 60 minutes 
     while (System.currentTimeMillis() <= endTime) { 
      service.submit(new ThreadNewTask(idPool)); 
     } 

     // wait for termination 
     service.shutdown(); 
     service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    } 
} 
+0

Czy możesz podać przykład oparty na moim stwierdzeniu problemu? – ferhan

+0

Tak, ID jest dla mnie ważny i muszę przekazać identyfikator do metody, aby każdy wątek używał innego unikalnego identyfikatora. – ferhan

+0

OK. Twoje podejście wygląda wtedy dobrze? – Ravi