2009-02-07 13 views
7

Czy istnieje sposób wykonywania iteracji stylu na równoległych wyliczeniach w języku C#? W przypadku list pomocniczych wiem, że można użyć zwykłej pętli for iterującej wartość indeksu w stosunku do zakresu indeksu, ale naprawdę wolę foreach naz wielu powodów.Równoległa iteracja w języku C#?

punkty, jeśli to działa w C# 2.0

+0

Wouldn” t pętla for powinna być prostszym, krótszym, czytelnym rozwiązaniem zamiast odpowiedzi Combine poniżej? Jakie są twoje powody, dla których preferujesz foreach w tym przypadku? – Gishu

+0

Równoległa iteracja właśnie pojawiła się w Rubim 1.9, więc założyłbym się, że nie jest w C# od teraz ... LISP miał to jednak :) – Gishu

+0

Nie jestem pewien, czy rozumiem pytanie poprawnie. Czy próbujesz iterować równolegle do wielu wyliczanek, czy też próbujesz wykonać pętlę nad jednym przeliczalnym, przetwarzając różne elementy równolegle? –

Odpowiedz

9

krótka odpowiedź, nie. foreach działa tylko na jednym przeliczalnym naraz.

Jednakże, jeśli połączysz swoje równoległe przeliczniki w jedną, możesz połączyć ze sobą foreach. Nie jestem świadom jakiś prosty, zbudowany w metodzie robi to, ale dodaje powinien działać (choć nie testowałem go):

public IEnumerable<TSource[]> Combine<TSource>(params object[] sources) 
{ 
    foreach(var o in sources) 
    { 
     // Choose your own exception 
     if(!(o is IEnumerable<TSource>)) throw new Exception(); 
    } 

    var enums = 
     sources.Select(s => ((IEnumerable<TSource>)s).GetEnumerator()) 
     .ToArray(); 

    while(enums.All(e => e.MoveNext())) 
    { 
     yield return enums.Select(e => e.Current).ToArray(); 
    } 
} 

Następnie można foreach nad powrócił przeliczalny:

foreach(var v in Combine(en1, en2, en3)) 
{ 
    // Remembering that v is an array of the type contained in en1, 
    // en2 and en3. 
} 
+0

Dlaczego jako typ parametru wybrano obiekt [] zamiast IEnumerable ? Wyeliminowałoby to wyjątek, nie? – mafu

+0

Istnieje co najmniej jedna wersja języka C#, która nie skompiluje params z czymkolwiek innym niż object []. Biorąc pod uwagę, że ta odpowiedź ma pięć lat, domyślam się, że jedyną wersją, której użyłem w tym momencie, byłaby taka, która nie działałaby z 'parametrami IEnumerable [] sources'. W dzisiejszych czasach korzystałem oczywiście z bardziej wyraźnego pisania. Prawdopodobnie użyłbym również metody "IEnumerable .Zip' dla dwóch enumerables - dość pewny, że to również nie istniało pięć lat temu. – Zooba

0

Czy ta praca jest dla Ciebie?

public static class Parallel 
{ 
    public static void ForEach<T>(IEnumerable<T>[] sources, 
            Action<T> action) 
    { 
     foreach (var enumerable in sources) 
     { 
      ThreadPool.QueueUserWorkItem(source => { 
       foreach (var item in (IEnumerable<T>)source) 
        action(item); 
      }, enumerable); 
     } 
    } 
} 

// sample usage: 
static void Main() 
{ 
    string[] s1 = { "1", "2", "3" }; 
    string[] s2 = { "4", "5", "6" }; 
    IEnumerable<string>[] sources = { s1, s2 }; 
    Parallel.ForEach(sources, s => Console.WriteLine(s)); 
    Thread.Sleep(0); // allow background threads to work 
} 

Dla języka C# 2.0 należy przekonwertować wyrażenia lambda powyżej na delegatów.

Uwaga: Ta metoda narzędziowa wykorzystuje wątki w tle. Możesz go zmodyfikować, aby używał wątków pierwszoplanowych i prawdopodobnie będziesz chciał poczekać, aż wszystkie wątki zakończą się. Jeśli to zrobisz, proponuję utworzyć wątki sources.Length - 1 i użyć bieżącego wątku wykonawczego dla ostatniego (lub pierwszego) źródła.

(Chciałabym to czeka na wątków do końca w moim kodu, ale przykro mi, że nie wiem, jak to zrobić jeszcze to. Myślę, że należy użyć WaitHandleThread.Join()).

11

. BlockingCollection .NET 4 to całkiem proste. Utwórz BlockingCollection, zwróć jego .GetConsumingEnumerable() w metodzie przeliczalnej. Następnie foreach po prostu dodaje się do kolekcji blokującej.

E.g.

private BlockingCollection<T> m_data = new BlockingCollection<T>(); 

public IEnumerable<T> GetData(IEnumerable<IEnumerable<T>> sources) 
{ 
    Task.Factory.StartNew(() => ParallelGetData(sources)); 
    return m_data.GetConsumingEnumerable(); 
} 

private void ParallelGetData(IEnumerable<IEnumerable<T>> sources) 
{ 
    foreach(var source in sources) 
    { 
     foreach(var item in source) 
     { 
      m_data.Add(item); 
     }; 
    } 

    //Adding complete, the enumeration can stop now 
    m_data.CompleteAdding(); 
} 

Mam nadzieję, że to pomoże. BTW posted a blog about this ostatniej nocy

Andre

+1

To powinno być oznaczone jako poprawne teraz, gdy .net 4.0 jest obecnie niedostępny –

+0

Nie odpowiada to na pytanie, które dotyczy wyliczania wielu wyliczników łącznie. –

+0

Masz całkowitą rację, Arturze, naprawię to – Andre

3

pisałem implementację EachParallel() z biblioteki .NET4 Parallel. Jest kompatybilny z .NET 3.5: Parallel ForEach Loop in C# 3.5 Zastosowanie:

string[] names = { "cartman", "stan", "kenny", "kyle" }; 
names.EachParallel(name => 
{ 
    try 
    { 
     Console.WriteLine(name); 
    } 
    catch { /* handle exception */ } 
}); 

Realizacja:

/// <summary> 
/// Enumerates through each item in a list in parallel 
/// </summary> 
public static void EachParallel<T>(this IEnumerable<T> list, Action<T> action) 
{ 
    // enumerate the list so it can't change during execution 
    list = list.ToArray(); 
    var count = list.Count(); 

    if (count == 0) 
    { 
     return; 
    } 
    else if (count == 1) 
    { 
     // if there's only one element, just execute it 
     action(list.First()); 
    } 
    else 
    { 
     // Launch each method in it's own thread 
     const int MaxHandles = 64; 
     for (var offset = 0; offset < list.Count()/MaxHandles; offset++) 
     { 
      // break up the list into 64-item chunks because of a limitiation    // in WaitHandle 
      var chunk = list.Skip(offset * MaxHandles).Take(MaxHandles); 

      // Initialize the reset events to keep track of completed threads 
      var resetEvents = new ManualResetEvent[chunk.Count()]; 

      // spawn a thread for each item in the chunk 
      int i = 0; 
      foreach (var item in chunk) 
      { 
       resetEvents[i] = new ManualResetEvent(false); 
       ThreadPool.QueueUserWorkItem(new WaitCallback((object data) => 
       { 
        int methodIndex = (int)((object[])data)[0]; 

        // Execute the method and pass in the enumerated item 
        action((T)((object[])data)[1]); 

        // Tell the calling thread that we're done 
        resetEvents[methodIndex].Set(); 
       }), new object[] { i, item }); 
       i++; 
      } 

      // Wait for all threads to execute 
      WaitHandle.WaitAll(resetEvents); 
     } 
    } 
} 
1

Jeśli chcesz trzymać się podstaw - I przepisał obecnie akceptowana odpowiedź w prostszy sposób:

public static IEnumerable<TSource[]> Combine<TSource> (this IEnumerable<IEnumerable<TSource>> sources) 
    { 
     var enums = sources 
      .Select (s => s.GetEnumerator()) 
      .ToArray(); 

     while (enums.All (e => e.MoveNext())) { 
      yield return enums.Select (e => e.Current).ToArray(); 
     } 
    } 

    public static IEnumerable<TSource[]> Combine<TSource> (params IEnumerable<TSource>[] sources) 
    { 
     return sources.Combine(); 
    }