2013-03-06 22 views
6

Pracuję nad projektem z poniższym workflow:Odczekaj Collection (kolejki) blokowanie, aby zmniejszyć rozmiary w C#

Część pierwsza:

  • zdarzenia przybywa asynchronicznie i jest w kolejce w blokowaniu kolejka, zadzwonimy że Q1
  • Temat podnosi następna dostępna pozycja z tej kolejki
  • przedmiot kończy się działa {N} liczbę zadań równolegle
  • Każde zadanie kolejkuje wynik w drugiej kolejce, nazwijmy to: Q2.
  • Po zakończeniu przetwarzania elementu następny element jest odczytywany z kolejki.

Część druga:

  • inny wątek czyta off Q2 jeden przedmiot na raz i działa na wynik

Więc problem tutaj jest każdy przedmiot w pierwszej kolejce kończy się równoległa duża liczba zadań, a każde zadanie kolejkuje swój wynik. Druga kolejka musi być przetwarzana seryjnie, po jednym elemencie, i robi się zalewana.


Moje pytanie

muszę mechanizm, który pozwoli na przetwarzanie wątek Q1 czekać aż liczba elementów w Q2 jest poniżej progu określonego. Jaki jest najlepszy sposób, aby to osiągnąć? Czy istnieje sposób na rozwiązanie oparte na zdarzeniach, a nie na rozwiązaniu odpytywania?

Odpowiedz

7

Zamiast używać Queue<T>, można użyć BlockingCollection<T> dla Q2. Jeśli ustawisz jego BoundedCapacity, połączenia z Q2.Add() zostaną zablokowane po osiągnięciu pojemności. Spowoduje to automatyczną dławienie przetwarzania Q1, ponieważ zadania N zaczną blokować, jeśli nie będą mogły dodać do ostatniej kolejki.

+0

Cool; Używam już blokujących kolekcji, więc to drobna zmiana :) –

+0

Wypróbuję to i zaakceptuję, jeśli zadziała. Może to być dobra poprawka, ponieważ zatrzymałoby przetwarzanie moich wątków {N}, dopóki nie mogliby umieścić w kolejce wyników, co zatrzymałoby więcej zdarzeń od przetworzenia w pierwszej kolejności :) –

2

Zakładam, że otrzymujesz dane podczas sporadycznych powodzi, z długimi okresami suszy, podczas których Q2 może nadrobić zaległości. Czy zastanawiałeś się po prostu ograniczając liczbę współbieżnych wątków zrodzonych z Q1, wykorzystując pulę wątków o ograniczonym dostępie do tych zadań?

Podejrzewam, że można czerpać korzyści z wielu pul wątków, jeśli wielkość zadania można łatwo określić po przybyciu. Możesz mieć niewielką liczbę wątków do przetwarzania dużych zadań i dużej liczby wątków gotowych do przetwarzania małych zadań. Nawet trzecia kolejka pośrednia może być korzystna.

+0

Używamy już puli wątków. Problem polega na tym, że trudno jest ocenić, ile wątków należy użyć. Czasem otrzymujemy 80k zdarzeń, które są małe i system jest w porządku (ostatnia kolejka się nie gromadzi, ponieważ zestawy wyników są małe/wczytują się szybko do naszej rozproszonej pamięci podręcznej). Innym razem otrzymamy 100 zdarzeń, które zawiesią się, ponieważ zestawy wyników są ogromne, a przesyłanie trwa wiecznie. Ograniczenie puli sprawia, że ​​pogarsza się sytuacja, gdy zestawy wyników są niewielkie, ale rozwiązuje problem powodzi, gdy są zbyt duże. +1, chociaż jest to dobre rozwiązanie pod warunkiem, że dostarczyłem :) –

1

Twój problem wydaje się być doskonałym przykładem do rozwiązania przez bibliotekę TPL Dataflow.Jeśli są chętni, aby spróbować, oto jak to może działać (jest to bardzo prosty przykład oczywiście):

TransformBlock<int, bool> transform = new TransformBlock<int, bool>(i => i > 5 ? true : false, 
      new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); 
ActionBlock<bool> resultBlock = new ActionBlock<bool>(b => Console.WriteLine("My result is : " + b), 
      new ExecutionDataflowBlockOptions { BoundedCapacity = 10 }); 
transform.LinkTo(resultBlock); 

definiowania transformacji blok które uczynią proces transformacji (działa jako Q1), ty może ustawić poziom równoległości na liczbę zadań, które chcesz wykorzystać.

Następnie tworzysz drugi blok (działający jako Q2), który będzie miał zestaw BoundedCapacity i będzie przetwarzał każdą wiadomość synchronicznie, wywołując akcję dla każdego elementu. Ten blok może być zastąpiony przez dowolny inny, na przykład BufferBlock, który pozwala na odpytywanie z niego na żądanie.

+0

Nie mam teraz czasu na zabawę, ale wydaje mi się to interesujące; Spróbuję jeszcze raz :) Dzięki stary. –