2015-09-15 9 views
11

chcę zrobić równoległą mapę nad wielkim listy. Kod wygląda nieco jak poniżej:Task.async w Elixir Stream

big_list 
|> Stream.map(&Task.async(Module, :do_something, [&1])) 
|> Stream.map(&Task.await(&1)) 
|> Enum.filter filter_fun 

Ale sprawdzał realizację Stream io ile rozumiem Stream.map łączy funkcje i zastosowanie funkcji w połączeniu z elementami w strumieniu, co oznaczałoby, że sekwencja jest tak:

  1. Weźmy pierwszy element
  2. Utwórz zadanie aSYNC
  3. czekać na to, aby zakończyć
  4. zająć drugie elelemnt ...

W tym przypadku, to nie robi tego równolegle. Czy mam rację, czy też czegoś brakuje?

Jeśli mam rację, a co z tym kodem?

Stream.map Task.async ... 
|> Enum.map Task.await ... 

Czy to będzie działać równolegle?

+2

przeczytać - http://www.theerlangelist.com/2015/07/beyond-taskasync.html – emaillenin

Odpowiedz

9

Drugi też nie robić, co chcesz. Widać to wyraźnie za pomocą tego kodu:

defmodule Test do 
    def test do 
    [1,2,3] 
    |> Stream.map(&Task.async(Test, :job, [&1])) 
    |> Enum.map(&Task.await(&1)) 
    end 

    def job(number) do 
    :timer.sleep 1000 
    IO.inspect(number) 
    end 
end 

Test.test 

Zobaczysz liczbę, a następnie 1 sekundę oczekiwania, kolejną liczbę i tak dalej. Kluczem tutaj jest to, że chcesz utworzyć zadania jak najszybciej, więc nie powinno się używać leniwe Stream.map w ogóle. Zamiast używać chętny Enum.map w tym punkcie:

|> Enum.map(&Task.async(Test, :job, [&1])) 
|> Enum.map(&Task.await(&1)) 

Z drugiej strony można użyć Stream.map gdy czekając tak długo, jak to zrobić jakiś chętny operację później, jak twój filter. W ten sposób oczekiwania będą przeplatane z jakimkolwiek przetwarzaniem, które możesz wykonać na wynikach.

4

Elixir 1.4 dostarcza nową funkcję Task.async_stream/5, która zwróci strumień, który uruchamia daną funkcję jednocześnie na każdym elemencie w przeliczalnym.

Dostępne są również opcje określania maksymalnej liczby pracowników i limitu czasu, przy użyciu parametrów opcji :max_concurrency i :timeout.


To sprawi, że przykład biec równolegle:

big_list 
|> Task.async_stream(Module, :do_something, [&1]) 
|> Enum.filter(filter_fun) 
0

Można spróbować Parallel Stream.

stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end) 
stream |> Enum.into([]) 
[2,4,6,8,10,12,14,16,18,20] 

UPD Albo lepsze wykorzystanie Flow