EDIT: Okazuje się, że bardzo się myliłem. TransformBlock
czy zwraca elementy w takiej samej kolejności, w jakiej zostały wprowadzone, nawet jeśli jest skonfigurowany do równoległości. Z tego powodu kod w mojej oryginalnej odpowiedzi jest zupełnie bezużyteczny i zamiast niego można użyć normalnego TransformBlock
.
Oryginalny odpowiedź:
O ile wiem, tylko jeden konstrukt równoległość w NET obsługuje zawracania przetworzonych elementów w kolejności weszli: PLINQ z AsOrdered()
. Wydaje mi się jednak, że PLINQ nie pasuje do tego, co chcesz.
Z drugiej strony, TPL Dataflow dobrze pasuje, ale nie ma bloku, który wspierałby równoległość i zwrot przedmiotów w tym samym czasie (TransformBlock
obsługuje oba, ale nie ma w tym samym czasie). Na szczęście bloki Dataflow zostały zaprojektowane z myślą o kompozycyjności, więc możemy zbudować własny blok, który to robi.
Najpierw jednak musimy wymyślić, jak zamówić wyniki. Używanie współbieżnego słownika, takiego jak sugerowałeś, wraz z pewnym mechanizmem synchronizacji, z pewnością zadziała. Ale myślę, że istnieje prostsze rozwiązanie: użyj kolejki o numerach Task
. W zadaniu wyjściowym usuwamy kolejkę Task
, czekając na zakończenie (asynchronicznie), a kiedy to nastąpi, wysyłamy jej wynik.Nadal potrzebujemy jakiejś synchronizacji dla przypadku, gdy kolejka jest pusta, ale możemy ją pobrać za darmo, jeśli wybierzemy kolejkę do sprytnego użycia.
Ogólny pomysł jest taki: to, co piszemy, to IPropagatorBlock
, z pewnymi danymi wejściowymi i niektórymi danymi wyjściowymi. Najprostszym sposobem utworzenia niestandardowego IPropagatorBlock
jest utworzenie jednego bloku, który przetwarza dane wejściowe, innego bloku, który generuje wyniki i traktuje je jako jeden przy użyciu DataflowBlock.Encapsulate()
.
Blok wejściowy będzie musiał przetwarzać przychodzące elementy we właściwej kolejności, więc nie będzie tam równoległości. Stworzy nowy Task
(w rzeczywistości, , abyśmy mogli później ustawić wynik Task
), dodać go do kolejki, a następnie wysłać element do przetworzenia, wraz z pewnym sposobem, aby ustawić wynik prawidłowego Task
. Ponieważ nie musimy łączyć tego bloku z niczym, możemy użyć numeru ActionBlock
.
Blok wyjściowy będzie musiał pobrać Task
s z kolejki, asynchronicznie czekać na nie, a następnie wysłać je razem. Ale ponieważ wszystkie bloki mają osadzoną w nich kolejkę, a bloki, w których delegaci mają wbudowane wbudowane czekanie asynchroniczne, będzie to bardzo proste: new TransformBlock<Task<TOutput>, TOutput>(t => t)
. Ten blok będzie działać zarówno jako kolejka, jak i jako blok wyjściowy. Z tego powodu nie musimy zajmować się żadną synchronizacją.
Ostatni element układanki przetwarza elementy równolegle. W tym celu możemy użyć innego ActionBlock
, tym razem z zestawem MaxDegreeOfParallelism
. Przejmie dane wejściowe, przetworzy je i ustawi prawidłową wartość Task
w kolejce.
ułożyła, może to wyglądać tak:
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
Po tak dużo mówi, że to dość mała ilość kodu, myślę.
Wygląda na to, że bardzo zależy Ci na wydajności, więc możesz potrzebować precyzyjniej dostroić ten kod. Na przykład może być sens ustawienie MaxDegreeOfParallelism
bloku processor
na coś podobnego do Environment.ProcessorCount
, aby uniknąć nadmiernej liczby subskrypcji. Ponadto, jeśli opóźnienie jest ważniejsze niż przepustowość, może być sens, aby ustawić MaxMessagesPerTask
tego samego bloku na 1 (lub inną małą liczbę), aby po zakończeniu przetwarzania elementu natychmiast wysłać go do wyjścia.
Ponadto, jeśli chcesz dławić przychodzące przedmioty, możesz ustawić BoundedCapacity
z enqueuer
.
Wow całkiem sporo gadżetów, które najpierw chciałbym przetrawić i wypróbować. Wielkie dzięki za to, przynajmniej zasługiwał na upominek ;-) Pozwól mi zagrać z tymi pomysłami i wrócę. Kolejkowanie zadań ma sens i zastanawiam się, dlaczego wcześniej tego nie zrobiłem. –
OK Spędzam trochę czasu przeglądając twój post i czytając o TPL Dataflow, tutaj kilka pytań, aby w pełni zrozumieć twoje proponowane rozwiązanie: (1) dlaczego sugerujesz niestandardowy IPropagatorBlock i IDataflowBlock.Encapsulate(), czy Transformblock już istnieje? (2) Nie widzę, jak planujesz połączyć bloki. Najpierw rozmawiasz w ActionBlocks o TransformBlocks. Z tego co przeczytałem, czy ActionBlock nie będzie "punktem końcowym" całej architektury? –
1. Zostało to wyjaśnione w drugim akapicie: 'TransformBlock' nie może przetwarzać elementów równolegle i zwracać je w kolejności w tym samym czasie. Może wykonać jedną z nich, ale nie obie. – svick