2011-10-23 19 views
23

Zastanawiam się, czy istnieje implementacja/opakowanie dla ConcurrentQueue, podobne do BlockingCollection, gdzie pobieranie z kolekcji nie blokuje, ale jest zamiast tego asynchroniczne i spowoduje asynchronizację, aż element zostanie umieszczony w kolejka.godna uwagi Kolejka na podstawie zadania

Wymyśliłem własną implementację, ale wygląda na to, że nie działa zgodnie z oczekiwaniami. Zastanawiam się, czy wymyślam coś, co już istnieje.

Oto moja realizacja:

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 

    object queueSyncLock = new object(); 

    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> Dequeue() 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs=null; 
     T firstItem=default(T); 
     while (true) 
     { 
      bool ok; 
      lock (queueSyncLock) 
      { 
       ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); 
       if (ok) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        queue.TryDequeue(out firstItem); 
       } 
      } 
      if (!ok) break; 
      tcs.SetResult(firstItem); 
     } 
    } 
} 
+0

oh fuj .... ...... –

+21

@AdamSack: rzeczywiście, ale twój komentarz mi nie pomoże. – spender

Odpowiedz

36

nie wiem Rozwiązanie bez blokady, ale możesz rzucić okiem na nową Dataflow library, część Async CTP. Prosty BufferBlock<T> powinno wystarczyć, np .:

BufferBlock<int> buffer = new BufferBlock<int>(); 

Produkcja i konsumpcja są najłatwiej zrobić za pomocą metod rozszerzenie o typach przepływu danych pakietowych.

Produkcja jest tak proste, jak:

buffer.Post(13); 

i konsumpcja jest asynchroniczny-ready:

int item = await buffer.ReceiveAsync(); 

ja polecam użyć przepływ danych, jeśli to możliwe; sprawienie, by taki bufor, zarówno sprawny, jak i poprawny, był trudniejszy, niż się wydaje.

+0

Wygląda to bardzo obiecująco ... sprawdzi to jutro. Dzięki. Wygląda bardzo podobnie do portu CCR. – spender

+2

Zerknąłem przed snem! Wygląda na to, że Dataflow bardzo ładnie pasuje do moich potrzeb. Wydaje się, że wypełnia lukę między tym, co oferuje TPL, a tym, co oferuje CCR (z którego korzystałem do wielkiego sukcesu). Daje mi to poczucie, że znakomita praca w CCR nie została zmarnowana. To jest właściwa odpowiedź (i coś błyszczącego i nowego, by zatopić moje zęby!) Dzięki @StephenCleary. – spender

1

To może być przesadą dla przypadku użycia (podane krzywa uczenia się), ale Reactive Extentions zapewnia cały klej można kiedykolwiek chcesz do składu asynchronicznym.

Zasadniczo subskrybujesz zmiany i są one przekazywane do Ciebie, gdy tylko będą dostępne, a Ty możesz spowodować, że system wprowadzi zmiany w osobnym wątku.

+0

Jestem co najmniej częściowo zorientowany w Reactive, ale jest trochę ezoteryczny, aby użyć go w produkcji, ponieważ inni mogą mieć do utrzymania kodu.Naprawdę przekopuję prostotę, którą asynchronizacja/oczekiwanie przynosi do wcześniej bardzo skomplikowanego produktu serwerowego, i staram się utrzymywać całą technologię asynchroniczną w jednej technologii. – spender

-1

mógłby po prostu użyć BlockingCollection (domyślny ConcurrentQueue) i zawinąć wywołanie Take w Task więc można go await:

var bc = new BlockingCollection<T>(); 

T element = await Task.Run(() => bc.Take()); 
+4

Niezły pomysł, ale nie jestem zadowolony z blokowania. Zamierzam mieć kilka tysięcy klientów, każdy z własną kolejką wiadomości. Każde zablokowanie zatopi statek, ponieważ będzie wiązać nici nie robiąc nic. Powodem, dla którego chcę niezapewniającego zadania, jest zachowanie wszystkich operacji w wątku bez powodowania głodu wątków. – spender

0

Oto realizacja obecnie używam.

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 
    object queueSyncLock = new object(); 
    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> DequeueAsync(CancellationToken ct) 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     ct.Register(() => 
     { 
      lock (queueSyncLock) 
      { 
       tcs.TrySetCanceled(); 
      } 
     }); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs = null; 
     T firstItem = default(T); 
     lock (queueSyncLock) 
     { 
      while (true) 
      { 
       if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem)) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        if (tcs.Task.IsCanceled) 
        { 
         continue; 
        } 
        queue.TryDequeue(out firstItem); 
       } 
       else 
       { 
        break; 
       } 
       tcs.SetResult(firstItem); 
      } 
     } 
    } 
} 

To działa dość dobrze, ale nie ma dość dużo niezgody na queueSyncLock, jak robię sporo użytkowania CancellationToken anulować niektóre z zadań oczekujących. Oczywiście, prowadzi to do znacznie mniej blokując chciałbym zobaczyć z BlockingCollection ale ...

Zastanawiam się, czy jest gładsza, zablokować wolne środki osiągnięcia tego samego celu

2

Moja atempt (to miało zdarzenie podniesione kiedy „obietnica” jest tworzony i może być stosowany przez producenta zewnętrznego wiedzieć, kiedy należy produkować więcej elementów):

public class AsyncQueue<T> 
{ 
    private ConcurrentQueue<T> _bufferQueue; 
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue; 
    private object _syncRoot = new object(); 

    public AsyncQueue() 
    { 
     _bufferQueue = new ConcurrentQueue<T>(); 
     _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); 
    } 

    /// <summary> 
    /// Enqueues the specified item. 
    /// </summary> 
    /// <param name="item">The item.</param> 
    public void Enqueue(T item) 
    { 
     TaskCompletionSource<T> promise; 
     do 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return;          
      } 
     } 
     while (promise != null); 

     lock (_syncRoot) 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return; 
      } 

      _bufferQueue.Enqueue(item); 
     }    
    } 

    /// <summary> 
    /// Dequeues the asynchronous. 
    /// </summary> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    /// <returns></returns> 
    public Task<T> DequeueAsync(CancellationToken cancellationToken) 
    { 
     T item; 

     if (!_bufferQueue.TryDequeue(out item)) 
     { 
      lock (_syncRoot) 
      { 
       if (!_bufferQueue.TryDequeue(out item)) 
       { 
        var promise = new TaskCompletionSource<T>(); 
        cancellationToken.Register(() => promise.TrySetCanceled()); 

        _promisesQueue.Enqueue(promise); 
        this.PromiseAdded.RaiseEvent(this, EventArgs.Empty); 

        return promise.Task; 
       } 
      } 
     } 

     return Task.FromResult(item); 
    } 

    /// <summary> 
    /// Gets a value indicating whether this instance has promises. 
    /// </summary> 
    /// <value> 
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>. 
    /// </value> 
    public bool HasPromises 
    { 
     get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; } 
    } 

    /// <summary> 
    /// Occurs when a new promise 
    /// is generated by the queue 
    /// </summary> 
    public event EventHandler PromiseAdded; 
} 
+0

Myślę, że to najlepsze rozwiązanie. Wdrożyłem to i przetestowałem go szeroko. Kilka uwag: wezwanie do! Promise.Task.IsCanceled jest niepotrzebne. Dodałem instrukcję ManualResetEventSlim, aby śledzić, kiedy właściwość bufferQueue jest pusta, aby wywołujący mógł zablokować oczekiwanie na opróżnienie kolejki. –

+0

Ty [powinieneś się pozbyć] (http://stackoverflow.com/a/21653382/298609) 'CancellationTokenRegistration', które otrzymałeś z wywołania' cancellationToken.Register'. – Paya