2013-07-25 17 views
12

W Scali przed 2.10 mogę ustawić równoległość w defaultForkJoinPool (jak w tej odpowiedzi scala parallel collections degree of parallelism). W Scali 2.10 ten interfejs API już nie istnieje. Dobrze udokumentowano, że możemy ustawić równoległość w jednej kolekcji (http://docs.scala-lang.org/overviews/parallel-collections/configuration.html), przypisując jej właściwość taskSupport.Jak ustawić domyślną liczbę wątków dla równoległych kolekcji Scala 2.10?

Używam jednak równoległych kolekcji w całym moim kodzie źródłowym i nie chciałbym dodawać dodatkowych dwóch linii do każdej instancji kolekcji. Czy istnieje sposób konfigurowania globalnego domyślnego rozmiaru puli wątków, aby someCollection.par.map(f(_)) automatycznie używał domyślnej liczby wątków?

+1

To trochę nad głową, ale patrząc na kod źródłowy scala zauważyłem coś ... wydaje się, że równoległe kolekcje opierają swoje tworzenie na "defaultTaskSupport", którego nie widzę sposobu na przesłonięcie, ponieważ jest to obiekt Val. https://github.com/scala/scala/blob/v2.10.2/src/library/scala/collection/parallel/package.scala – LaloInDublin

Odpowiedz

14

Wiem, że pytanie ma ponad miesiąc, ale miałem właśnie to samo pytanie. Googling nie był pomocny i nie mogłem znaleźć niczego, co w nowym interfejsie API wyglądałoby w połowie zdrowego rozsądku.

Ustawianie -Dscala.concurrent.context.maxThreads = n jak sugeruje tutaj: Set the parallelism level for all collections in Scala 2.10? pozornie nie miały żadnego wpływu na wszystkich, ale nie jestem pewien, czy użyłem go poprawnie (I uruchomić mój wniosek z „Java” w środowisku bez zainstalowanego "scala", może to być przyczyną).

Nie wiem, dlaczego ludzie scala usunęli ten niezbędny seter z odpowiedniego obiektu paczki.

Jednakże, jest to często możliwe użycie odbicie obejść niekompletnego/dziwny interfejs:

def setParallelismGlobally(numThreads: Int): Unit = { 
    val parPkgObj = scala.collection.parallel.`package` 
    val defaultTaskSupportField = parPkgObj.getClass.getDeclaredFields.find{ 
    _.getName == "defaultTaskSupport" 
    }.get 

    defaultTaskSupportField.setAccessible(true) 
    defaultTaskSupportField.set(
    parPkgObj, 
    new scala.collection.parallel.ForkJoinTaskSupport(
     new scala.concurrent.forkjoin.ForkJoinPool(numThreads) 
    ) 
) 
} 

Dla tych, którzy nie znają bardziej niejasnych cech Scali, oto krótkie wyjaśnienie:

scala.collection.parallel.`package` 

uzyskuje dostęp do obiektu pakietu ze zmienną defaultTaskSupport (wygląda trochę jak zmienna statyczna Java, ale w rzeczywistości jest to zmienna składowa obiektu pakietu). Do identyfikatora wymagane są odciski, ponieważ package jest zastrzeżonym słowem kluczowym. Następnie otrzymujemy prywatne pole końcowe, które chcemy (getField ("defaultTaskSupport") nie działa z jakiegoś powodu? ...), powiedz, żeby był dostępny, aby móc go zmodyfikować, a następnie zastąp go wartością nasz własny ForkJoinTaskSupport.

Nie rozumiem jeszcze dokładnego mechanizmu tworzenia kolekcji równoległych, ale kod źródłowy funkcji Combiner sugeruje, że wartość defaultTaskSupport powinna jakoś przenikać do kolekcji równoległych.

Zauważ, że pytanie jest jakościowo tego samego rodzaju co znacznie starsze pytanie: "Mam Math.random() na całym moim codebase, w jaki sposób ustawić seed na stałą liczbę do celów debugowania?" (Patrz np.: Set seed on Math.random()). W obu przypadkach mamy jakąś globalną "statyczną" zmienną, którą domyślnie używamy w milionach różnych miejsc, chcemy ją zmienić, ale nie ma ustawników dla tej zmiennej => używamy refleksji.

Brzydko jak diabli, ale wydaje się działać dobrze. Jeśli chcesz ograniczyć całkowitą liczbę wątków, nie zapominaj, że garbage collector działa na osobnym wątku.

+0

Piękno! Szukałem sposobu, aby to dokładnie zrobić. Miałem nadzieję, że to nie będzie takie brzydkie, ale to nie jest twoja wina. Dziwnie nieprzydatny interfejs to wina. – itsbruce