2013-02-11 23 views
9

Chciałbym użyć iteratora .NET z równoległymi zadaniami/poczekać ?. Coś takiego:Jak uzyskiwać z zadań równoległych w .NET 4.5

IEnumerable<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source) 
{ 
    Parallel.ForEach(
     source, 
     s=> 
     { 
      // Ordering is NOT important 
      // items can be yielded as soon as they are done     
      yield return ExecuteOrDownloadSomething(s); 
     } 
} 

Niestety .NET nie może natywnie sobie z tym poradzić. Najlepsza odpowiedź do tej pory przez @svick - użyj AsParallel().

BONUS: Każdy prosty kod asynchroniczny/oczekujący, który implementuje wielu wydawców i jednego subskrybenta? Subskrybent wyda, a puby będą przetwarzać. (tylko biblioteki rdzeniowe)

Odpowiedz

11

To wydaje się zadanie dla PLINQ:

return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s)); 

Spowoduje to wykonanie delegata równolegle przy użyciu ograniczonej liczby wątków, zwracając każdy wynik zaraz po jego zakończeniu.

Jeśli metoda ExecuteOrDownloadSomething() IO jest związany (np faktycznie pobiera coś) i nie chcesz tracić wątki, a następnie za pomocą async - await może mieć sens, ale byłoby bardziej skomplikowane.

Jeśli chcesz w pełni skorzystać z async, nie powinieneś zwracać numeru IEnumerable, ponieważ jest on synchroniczny (tzn. Blokuje, jeśli nie są dostępne żadne elementy). Co jest potrzebne jest jakieś kolekcji asynchronicznym, a można użyć ISourceBlock (konkretnie TransformBlock) z OC dataflow za to:

ISourceBlock<TDst> Foo<TSrc, TDest>(IEnumerable<TSrc> source) 
{ 
    var block = new TransformBlock<TSrc, TDest>(
     async s => await ExecuteOrDownloadSomethingAsync(s), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
     }); 

    foreach (var item in source) 
     block.Post(item); 

    block.Complete(); 

    return block; 
} 

Jeśli źródłem jest „wolna” (tj chcesz rozpocząć przetwarzanie wyników Foo() przed zakończeniem powtarzania source), możesz chcieć przenieść wywołanie foreach i Complete() do osobnego Task. Jeszcze lepszym rozwiązaniem byłoby wprowadzenie source do ISourceBlock<TSrc>.

+0

Dzięki, ale czy mógłbyś podać przykład tego, jak można to rozwiązać za pomocą async/wait? Dzięki! – Yurik

+0

@Yurik Czy mógłbyś wyjaśnić, dlaczego tego chcesz? – svick

+0

Głównie dlatego, że uważam, że pomoże mi to zrozumieć nową, oczekującą składnię problemu, który nie jest "async 101", ale scenariusz z prawdziwego świata. – Yurik

0

W bibliotece asynchronicznej wykonanej przez zespół robotyki MS, posiadały prymitywy współbieżności, które pozwoliły na użycie iteratora do wygenerowania kodu asynchronicznego.

Biblioteka (CCR) jest bezpłatna (nie była wolna). Miły artykuł wprowadzający można znaleźć tutaj: Concurrent affairs

Być może można użyć tej biblioteki obok biblioteki .NET zadania, lub będzie ona inspiracją do „roll własne”

+0

mógłbyś wyjaśnić jak dokładnie należy użyć CCR tutaj? – svick

+0

Cytowany przeze mnie artykuł może wyjaśnić to lepiej niż ja.Jeśli spojrzysz na to i sprawdzisz liczbę: "Rysunek 6 SerialAsyncDemo" ma przykład kodu, który jest prawie dokładnie taki, o jaki pyta OP: Operacje asynchroniczne z wykorzystaniem iteratora .Net do uzyskania. Przyznam, że myślę, że ta składnia iteratora, choć sprytna jak na tamte czasy, jest teraz w większości zastąpiona składnią async/await – Toad

1

Wygląda na to, że naprawdę chcesz, aby zamówić sekwencję zadań w oparciu o czas ukończenia. To nie jest strasznie skomplikowane:

public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks) 
{ 
    var input = tasks.ToList(); 

    var output = input.Select(task => new TaskCompletionSource<T>()); 
    var collection = new BlockingCollection<TaskCompletionSource<T>>(); 
    foreach (var tcs in output) 
     collection.Add(tcs); 

    foreach (var task in input) 
    { 
     task.ContinueWith(t => 
     { 
      var tcs = collection.Take(); 
      switch (task.Status) 
      { 
       case TaskStatus.Canceled: 
        tcs.TrySetCanceled(); 
        break; 
       case TaskStatus.Faulted: 
        tcs.TrySetException(task.Exception.InnerExceptions); 
        break; 
       case TaskStatus.RanToCompletion: 
        tcs.TrySetResult(task.Result); 
        break; 
      } 
     } 
     , CancellationToken.None 
     , TaskContinuationOptions.ExecuteSynchronously 
     , TaskScheduler.Default); 
    } 

    return output.Select(tcs => tcs.Task); 
} 

Więc tworzymy TaskCompletionSource dla każdego zadania wejściowego, a następnie przejść przez każde z zadań i ustaw kontynuację który chwyta następną źródło uzupełniania z BlockingCollection i ustawia to wynik. Pierwsze ukończone zadanie pobiera pierwsze zwrócone tcs, drugie wykonane zadanie otrzymuje drugie zwrócone tcs, i tak dalej.

Teraz kod staje się dość prosta:

var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item)) 
    .Order(); 
foreach(var task in tasks) 
{ 
    var result = task.Result;//or you could `await` each result 
    //.... 
} 
+0

Dzięki, ale to, czego potrzebuję, to uzyskać strumień przetworzonych obiektów jako plony z metody. To, co oferujesz, to w zasadzie przeróbka Parallel.ForEach(). – Yurik

+0

@Yurik Jeśli nie musisz czekać na wszystkie elementy, możesz usunąć "WhenAll' /' WaitAll', ale poza tym nie widzę jak 'Select' nie robi tego, czego potrzebujesz sama w sobie. Masz sekwencję przedmiotów i chcesz przekształcić ją w sekwencję zadań, po jednym dla każdego przedmiotu. W jaki sposób 'Select (item => LongRunningOperation (item))' nie spełnia twoich potrzeb, ponieważ zwraca sekwencję zadań? – Servy

+0

W takim przypadku kolejność elementów będzie taka sama, jak oryginał, co może być nieefektywne. Nie mam nic przeciwko wydaniu itemów z porządku. – Yurik