2012-04-11 6 views
7

muszę jednego producenta, zapytania FIFO pojedynczego konsumenta, ponieważczy warto używać BlockingCollection <T> jako pojedynczego producenta, pojedynczego klienta FIFO?

  • muszę przetwarzać komunikaty w kolejności ich otrzymania.
  • Muszę to zrobić asynchronicznie, ponieważ program wywołujący nie powinien czekać na przetwarzanie wiadomości.
  • Kolejne przetwarzanie komunikatów powinno zostać uruchomione tylko po zakończeniu przetwarzania poprzedniego komunikatu. Czasami częstotliwość "odbierania" wiadomości jest wyższa niż częstotliwość "przetwarzania" wiadomości. Ale średnio powinienem być w stanie przetworzyć wszystkie wiadomości, tylko czasami muszę "umieścić w kolejce" ich paczkę.

Więc to całkiem jak TCP/IP, jak sądzę, w którym masz jednego producenta i jednego odbiornika, czasami można odbierać wiadomości szybciej niż można przetworzyć, więc trzeba ich zapytać. Gdzie zamówienie JEST ważne i gdzie dzwoniący absolutnie nie interesuje się tym, co robisz z tymi rzeczami.

Brzmi to dość proste i prawdopodobnie można używać ogólnego Queue za to, ale chcę używać BlockingCollection za to, bo nie chcę pisać żadnego kodu z ManualResetEvent itp

Jak nadaje BlockingCollection dla mojego zadania i prawdopodobnie możesz zaproponować coś innego?

Odpowiedz

11

BlockingCollection klasy implementuje interfejs IProducerConsumerCollection, więc idealnie pasuje do Twoich wymagań.

Można utworzyć dwa zadania: jeden dla asynchronicznego producenta i drugi jako konsumenta. Dawni dodawali przedmioty do BlockingCollection, a te ostatnie zużywają, gdy tylko nowe będą dostępne w kolejności FIFO.

zastosowanie przez producentów konsument Przykład zastosowania TPL Tasks i BlockingCollection:

class ProducerConsumer 
{ 
    private static BlockingCollection<string> queue = new BlockingCollection<string>(); 

    static void Main(string[] args) 
    { 
     Start(); 
    } 

    public static void Start() 
    { 
     var producerWorker = Task.Factory.StartNew(() => RunProducer()); 
     var consumerWorker = Task.Factory.StartNew(() => RunConsumer()); 

     Task.WaitAll(producerWorker, consumerWorker); 
    } 

    private static void RunProducer() 
    { 
     int itemsCount = 100; 

     while (itemsCount-- > 0) 
     { 
      queue.Add(itemsCount + " - " + Guid.NewGuid().ToString()); 
      Thread.Sleep(250); 
     } 
    } 

    private static void RunConsumer() 
    { 
     foreach (var item in queue.GetConsumingEnumerable()) 
     { 
      Console.WriteLine(DateTime.Now.ToString("HH:mm:ss.ffff") + " | " + item); 
     } 
    } 
} 

IProducerConsumerCollection:

definiuje metody manipulowania zbiory bezpieczne wątek przeznaczone do producenta/użytku konsumenckiego. Ten interfejs zapewnia ujednoliconą reprezentację producenta/konsumenta, tak że wyższe poziomy abstrakcji, takie jak System.Collections.Concurrent.BlockingCollection (Of T), mogą wykorzystywać kolekcję jako podstawowy mechanizm przechowywania.

+0

Mam nadzieję, że domyślna implementacja" BlockingCollection "FIFO nie zostanie zmieniona w niektórych następnych wydaniach , ale to jest kolejne pytanie ... – javapowered

+0

Uważam, że powinieneś być w porządku, o ile odwołujesz się do interfejsu 'IP produkcerConsumerCollection', a implementacja klas BCL może: t zmienić w taki główny sposób, jak zamówienie FIFO na coś innego – sll

+2

Widzę, że metoda Start() czeka na dwa zadania do wykonania. Oczywiście zadanie "producerWorker" zakończy się, ale 'consumerWorker' nigdy się nie skończy. Czy należy to wziąć pod uwagę? –

0

Ponieważ jest to kolejka, której potrzebujesz, dlaczego nie trzymać się kolejki? Możesz użyć numeru Syncrhonized Queue.

+0

Nie blokuje. –

+0

Martin prawy, [MSDN] (http://msdn.microsoft.com/en-us/library/system.collections.queue.synchronized.aspx): "Wyliczanie za pomocą kolekcji nie jest samoistnie procedurą bezpieczną dla wątków. Nawet jeśli kolekcja jest zsynchronizowana, inne wątki nadal mogą modyfikować kolekcję, co powoduje, że moduł wyliczający generuje wyjątek. Aby zagwarantować bezpieczeństwo wątków podczas wyliczania, możesz albo zablokować kolekcję podczas całego wyliczania, albo przechwycić wyjątki wynikające ze zmian wprowadzonych przez inny wątek ' – sll