2011-07-06 6 views
5

Mam wielu producentów i jednego konsumenta. Jeśli jednak w kolejce jest coś, co nie zostało jeszcze zużyte, producent nie powinien kolejkować go ponownie. (Unikalny ma duplikatów kolekcję, która korzysta z domyślnej kolejki jednoczesnego blokowania)sposób uzyskiwania dostępu do podstawowej domyślnej kolejki współbieżnej kolekcji blokującej

if (!myBlockingColl.Contains(item)) 
    myBlockingColl.Add(item) 

Jednak coll blokowanie nie posiada metody zawiera ani nie stanowi żadnego rodzaju trypeek() jak metody. Jak uzyskać dostęp do podstawowej równoległej kolejki, więc mogę zrobić coś podobnego do tego, co mogę zrobić, aby zrobić coś w rodzaju obrócenia końca. Proszę pomóż. dzięki

+2

Gdyby istniała taka metoda, musiałaby to być pojedyncza metoda atomowa. Nigdy nie można go zaimplementować, ponieważ twoje dwa fragmenty są spowodowane tym, że między TryPeek i Add może dodawać się wątek. Myślę, że nie masz szczęścia. Dlaczego i tak potrzebujesz takiej możliwości? –

+0

w moim przypadku jest ok, jeśli inny wątek dodaje między trypeek i dodać. W większości robię to jako kontrolę bezpieczeństwa, w której program planujący uruchamia generowanie raportów na wielu wątkach (stąd wielu producentów). Jeśli program planujący działa nieprawidłowo z dowolnego powodu i uruchamia ten sam wyzwalacz wiele razy w krótkim przedziale, po prostu chciałbym go obsłużyć. Rozumiem, że mogę obejść i poradzić sobie z tym w jakiś sposób. Wygląda na to, że nie ma eleganckiego sposobu radzenia sobie z tym. dziękuję – Gullu

Odpowiedz

1

Proponuję wykonanie operacji z blokadą, abyś nie czytał i nie zapisywał przedmiotu w sposób, który go psuje, czyniąc go atomowym. Na przykład z dowolną liczbą:

object bcLocker = new object(); 

// ... 

lock (bcLocker) 
{ 
    bool foundTheItem = false; 
    foreach (someClass nextItem in myBlockingColl) 
    { 
     if (nextItem.Equals(item)) 
     { 
      foundTheItem = true; 
      break; 
     } 
    } 
    if (foundTheItem == false) 
    { 
     // Add here 
    } 
} 
+1

zrozumieć, że można to zrobić, blokując staroświecki sposób. Ponieważ używam nowych klas sieci 4, które w większości działają dobrze bez wyraźnego blokowania, liczyłem na coś fajniejszego. dzięki – Gullu

+0

Nie ma "bardziej cool" sposób - nic nie zadziała bez blokowania. Nawet dwie wersje mogą się zderzać. –

+0

każda operacja w kolekcji, która mogłaby go zmienić wymaga ochrony za pomocą tego blokowania, co wymaga operacji na kolekcji, która mogłaby go zmienić wymaga ochrony za pomocą tej blokady. –

7

To interesujące pytanie. Po raz pierwszy widziałem, jak ktoś prosi o kolejkę blokującą, która ignoruje duplikaty. Co dziwne, nie mogłem znaleźć niczego, co chciałbyś, aby już istniało w BCL. Twierdzę, że jest to dziwne, ponieważ BlockingCollection może zaakceptować IProducerConsumerCollection jako kolekcję leżącą u podstaw, która ma metodę TryAdd, która jest reklamowana jako niezdolna do pracy po wykryciu duplikatów. Problem polega na tym, że nie widzę konkretnej implementacji IProducerConsumerCollection, która zapobiega duplikatom. Przynajmniej możemy napisać własną.

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T> 
{ 
    // TODO: You will need to fully implement IProducerConsumerCollection. 

    private Queue<T> queue = new Queue<T>(); 

    public bool TryAdd(T item) 
    { 
    lock (queue) 
    { 
     if (!queue.Contains(item)) 
     { 
     queue.Enqueue(item); 
     return true; 
     } 
     return false; 
    } 
    } 

    public bool TryTake(out T item) 
    { 
    lock (queue) 
    { 
     item = null; 
     if (queue.Count > 0) 
     { 
     item = queue.Dequeue(); 
     } 
     return item != null; 
    } 
    } 
} 

Teraz mamy IProducerConsumerCollection że nie akceptuje duplikatów możemy użyć go tak:

public class Example 
{ 
    private BlockingCollection<object> queue = new BlockingCollection<object>(new NoDuplicatesConcurrentQueue<object>()); 

    public Example() 
    { 
    new Thread(Consume).Start(); 
    } 

    public void Produce(object item) 
    { 
    bool unique = queue.TryAdd(item); 
    } 

    private void Consume() 
    { 
    while (true) 
    { 
     object item = queue.Take(); 
    } 
    } 
} 

nie lubić moje wykonanie NoDuplicatesConcurrentQueue. Z pewnością możesz zaimplementować własne przy użyciu ConcurrentQueue lub cokolwiek, jeśli uważasz, że potrzebujesz wydajności o niskim poziomie blokady, którą zapewniają kolekcje TPL.

Aktualizacja:

udało mi się przetestować kod rano. Jest dobra wiadomość i złe wieści. Dobra wiadomość jest taka, że ​​to technicznie zadziała. Zła wiadomość jest taka, że ​​prawdopodobnie nie będziesz chciał tego robić, ponieważ BlockingCollection.TryAdd przechwytuje wartość zwracaną z podstawowej metody IProducerConsumerCollection.TryAdd i zgłasza wyjątek po wykryciu false. Tak, to prawda. Nie powraca on tak, jak można się spodziewać, a zamiast tego generuje wyjątek. Muszę być szczery, to jest zarówno zaskakujące, jak i śmieszne. Cały punkt metod TryXXX polega na tym, że nie powinni oni rzucać wyjątkami. Jestem głęboko rozczarowany.

+0

Brian, pomyśl o tym człowieku. Pozwól mi to przetrawić, a jutro zaktualizuję, jeśli znajdę odpowiedź. dzięki – Gullu

+0

Fakt, że cały pakiet IPcomponent ConsumerCollection musi zostać wdrożony, sprawia, że ​​ponownie odkrywam uczucie koła. Musi być lepszy sposób. Przegłosowałem twoje rozwiązanie, ponieważ mogę skończyć robiąc coś podobnego. dziękuję – Gullu

+0

@Gullu: Właśnie zaktualizowałem swoją odpowiedź. Nie sądzę, że i tak będziesz chciał skorzystać z tej strategii. Przeczytaj moją aktualizację. –

4

Oprócz zastrzeżeniem Brian Gedeon wspomnianego po Aktualizacja roztwór cierpi z tych problemów z wydajnością:

  • O (n) operacje w kolejce (queue.Contains(item)) mają poważny wpływ na osiągi jak kolejka rośnie
  • zamki ograniczyć współbieżności (której nie wspomina)

Poniższy kod poprawia Bri An roztworu przez

  • użyciu zestawu mieszania zrobić O (1) wyszukiwań
  • łączące 2 struktury danych nazw System.Collections.Concurrent

N.B. Ponieważ nie ma ConcurrentHashSet, używam ConcurrentDictionary, ignorując wartości.

W tym rzadkim przypadku można na szczęście skomponować bardziej złożoną, współbieżną strukturę danych z wielu prostszych, bez dodawania blokad. Kolejność operacji na dwóch współbieżnych strukturach danych jest tutaj ważna.

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T> 
{ 
    private readonly ConcurrentDictionary<T, bool> existingElements = new ConcurrentDictionary<T, bool>(); 
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 

    public bool TryAdd(T item) 
    { 
     if (existingElements.TryAdd(item, false)) 
     { 
      queue.Enqueue(item); 
      return true; 
     } 
     return false; 
    } 

    public bool TryTake(out T item) 
    { 
     if (queue.TryDequeue(out item)) 
     { 
      bool _; 
      existingElements.TryRemove(item, out _); 
      return true; 
     } 
     return false; 
    } 
    ... 
} 

N.B. Inny sposób patrzenia na ten problem: chcesz, aby był zbiorem zachowującym zamówienie reklamowe.