2015-10-01 18 views
9

Zwykle, gdy używa się metody parallelStream języka Java 8, wynikiem jest wykonanie za pomocą domyślnej, wspólnej puli łączenia widżetów (to jest ForkJoinPool.commonPool()).Java parallelStream() z niestandardową pulą z kradzieżą rozmówcy?

Jest to jednak wyraźnie niepożądane, jeśli ma się pracę, która jest daleko od obciążenia procesora, np. może często czekać na IO. W takich przypadkach należy użyć oddzielnej puli, dostosowanej do innych kryteriów (np. Ile czasu prawdopodobnie będzie to faktycznie wykorzystywać procesor).

Nie ma oczywistego, oczywiste, sposób uzyskania parallelStream(), aby użyć innej puli, ale jest sposób, jak szczegółowe here.

Niestety, takie podejście pociąga za sobą wywołanie operacji terminalu w strumieniu równoległym z wątku puli połączeń wideł. Wadą tego jest to, że jeśli pula dołączania celu-widelca jest całkowicie zajęta istniejącą pracą, całe wykonanie będzie czekać na nią, nie robiąc absolutnie nic. W ten sposób pula może stać się wąskim gardłem gorszym niż wykonanie z pojedynczym gwintem. Natomiast gdy używasz parallelStream() w "normalny" sposób, ForkJoinPool.common.externalHelpComplete() lub ForkJoinPool.common.tryExternalUnpush() są używane, aby wywoływać wątek spoza puli, pomagając w przetwarzaniu.

Czy ktoś wie o sposób zarówno dostać parallelStream() do korzystania z innych niż domyślne widelec-join basen i mają gwint dzwonisz spoza puli pomocy widelca-przyłączyć się do przetworzenia tej pracy (ale nie reszta pracy związanej z basenem widłowym)?

+3

Nie rozumiem twojego problemu. Wadą tego jest to, że jeśli pula dołączenia celu-widelca jest całkowicie zajęta istniejącym work_. Czy nie tworzysz nowej puli tylko dla tego równoległego wywoływania strumienia? –

+1

Jest jeszcze gorzej. Kiedy wywołujesz 'get' na swoich zadaniach, które nie znajduje się we wspólnej puli, nadal będzie wywoływać' ForkJoinPool.common.tryExternalUnpush() ', ale oczywiście nie znajdzie zadania w kolejce wspólnej puli. – Holger

+0

Aby odpowiedzieć na pytanie, nie, nie tworzyłem nowej puli wątków tylko dla tego wywołania. Raczej udostępniam tę pulę wątków w wielu podobnych wywołaniach, z których niektóre mogą się nakładać, niektóre z nich mogą mieć znacznie dłuższe/większe zadania niż inne. –

Odpowiedz

2

Możesz użyć awaitQuiescence na basenie, aby pomóc. Jednak nie możesz wybrać zadań, które pomożemy, a tylko wezmą kolejne oczekujące z puli, a więc, jeśli będzie więcej oczekujących zadań, możesz skończyć wykonywanie ich przed uzyskaniem własnych.

ForkJoinPool forkJoinPool = new ForkJoinPool(1); 
// make all threads busy: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// submit our task (may contain your stream operation) 
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread()); 
// help out 
while(!task.isDone()) // use zero timeout to execute one task only 
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS); 
System.out.println(Thread.currentThread()==task.get()); 

wydrukuje true.

natomiast

ForkJoinPool forkJoinPool = new ForkJoinPool(1); 
// make all threads busy: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// overload: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// submit our task (may contain your stream operation) 
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread()); 
// help out 
while(!task.isDone()) 
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS); 
System.out.println(Thread.currentThread()==task.get()); 

zawiśnie na zawsze, jak to próbuje wykonać drugie zadanie blokowania.

Mimo to pozwoli wątkowi inicjującemu przetworzyć przetwarzane zadania puli, które zwiększą szansę na wykonanie jego własnego zadania, o ile nie ma nieskończonych zadań (powyższy przykład jest ekstremalny i wybrany tylko do demonstracji).


Należy jednak pamiętać, że cała relacja między Fork/Dołącz ramy i Stream API jest szczegółów wdrażania tak.

+0

Doszedłem do wniosku, że mogę to zrobić, ale jak zauważyłeś, może to oznaczać, że pomagam w innym wątku ze swoimi zadaniami, zamiast pomagać w pracy własnej. To rodzaj nie-rozrusznika. Ponadto, widzę, że przyłączenie widełek jest szczegółem implementacji, ale musi być lepsza kontrola nad metodą parallelStream(), np. coś tak prostego jak parallelStream (forkJoinPool). –

+1

Cóż, z metodą taką jak 'parallelStream (ForkJoinPool)', to nie byłby już szczegół implementacji ... – Holger

+0

Nie, ale to, co parallelStream() z no-argumentami nadal byłoby szczegółem implementacji. To po prostu dałoby ci możliwość odrobiny kontroli w przypadkach, gdy jest to potrzebne. –