2016-02-24 24 views
5

Czy istnieje kolejka oczekujących zadań użyta w połączeniu z Java 8's Executors.newWorkStealingPool()?W języku Java 8, czy Executors.newWorkStealingPool() również udostępnia kolejkę zadań?

Załóżmy na przykład, że # dostępne rdzenie to 2, a Executors.newWorkStealingPool() jest puste, ponieważ 2 zadania są już uruchomione. Co się stanie, jeśli trzecie zadanie zostanie przesłane do wykonawcy kradnącego pracę? Czy jest w kolejce? A jeśli tak, to jakie są granice, jeśli którekolwiek ze wspomnianej kolejki?

Z góry dziękuję.

+1

Nie mam konkretnej odpowiedzi i jestem zaskoczony, że nie jest to udokumentowane lepiej. Ale przynajmniej w OpenJDK 8, ta metoda produkuje ['ForkJoinPool'] (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html), który nie używa po prostu 'BlockingQueue', ponieważ inne implementacje ... powodują wiele konfliktów, co prowadzi do narzutów. Zadania, które nie mogą zostać natychmiast wykonane * są nadal w kolejce. Omówiono to (wraz z kolejnymi ograniczeniami) w innej odpowiedzi: http://stackoverflow.com/a/30045601/228171 –

Odpowiedz

4

Czy istnieje kolejka oczekujących zadań używanych w połączeniu z Executorami.newWorkStealingPool() Java 8?

Tak, każdy wątek ma swój własny deque. Kiedy jeden wątek jest wykonywany z jego zadań, wykonuje zadanie z kolejki innego wątku i wykonuje je.

A jeśli tak, to jakie są granice, jeśli są na tej kolejce?

Maksymalny rozmiar kolejek jest ograniczone przez liczbę: static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

Gdy kolejka jest pełny sposób niekontrolowany jest wyjątek: RejectedExecutionException("Queue capacity exceeded")

3

Z grepcode z Executors i ForkJoinPool

Executors. newWorkStealingPool powraca ForkJoinPool

eXecutors:

public static ExecutorService newWorkStealingPool() { 
     return new ForkJoinPool 
      (Runtime.getRuntime().availableProcessors(), 
      ForkJoinPool.defaultForkJoinWorkerThreadFactory, 
      null, true); 
    } 

ForkJoinPool:

public ForkJoinPool(int parallelism, 
         ForkJoinWorkerThreadFactory factory, 
         UncaughtExceptionHandler handler, 
         boolean asyncMode) { 
     this(checkParallelism(parallelism), 
      checkFactory(factory), 
      handler, 
      asyncMode ? FIFO_QUEUE : LIFO_QUEUE, 
      "ForkJoinPool-" + nextPoolId() + "-worker-"); 
     checkPermission(); 
    } 

Na execute():

public void execute(ForkJoinTask<?> task) { 
     if (task == null) 
      throw new NullPointerException(); 
     externalPush(task); 
    } 

externalPush dzwoni pod numer externalSubmit i można zobaczyć szczegóły tej implementacji w postaci WorkQueue.

externalSubmit:

// Operacje zewnętrzne

/** 
* Full version of externalPush, handling uncommon cases, as well 
* as performing secondary initialization upon the first 
* submission of the first task to the pool. It also detects 
* first submission by an external thread and creates a new shared 
* queue if the one at index if empty or contended. 
* 
* @param task the task. Caller must ensure non-null. 

*/ 

można znaleźć więcej szczegółów o rozmiarach kolejki w WorkQueue klasy

static final class WorkQueue { 

Documentation na WokrQueue:

/** 
    * Queues supporting work-stealing as well as external task 
    * submission. See above for descriptions and algorithms. 
    * Performance on most platforms is very sensitive to placement of 
    * instances of both WorkQueues and their arrays -- we absolutely 
    * do not want multiple WorkQueue instances or multiple queue 
    * arrays sharing cache lines. The @Contended annotation alerts 
    * JVMs to try to keep instances apart. 
    */ 
    @sun.misc.Contended 

/** 
    * Capacity of work-stealing queue array upon initialization. 
    * Must be a power of two; at least 4, but should be larger to 
    * reduce or eliminate cacheline sharing among queues. 
    * Currently, it is much larger, as a partial workaround for 
    * the fact that JVMs often place arrays in locations that 
    * share GC bookkeeping (especially cardmarks) such that 
    * per-write accesses encounter serious memory contention. 
    */ 
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13; 

    /** 
    * Maximum size for queue arrays. Must be a power of two less 
    * than or equal to 1 << (31 - width of array entry) to ensure 
    * lack of wraparound of index calculations, but defined to a 
    * value a bit less than this to help users trap runaway 
    * programs before saturating systems. 
    */ 
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M