10

Prowadzę dość typowy model producenta/konsumenta przy różnych zadaniach.Czy to jest zadanie dla TPL Dataflow?

Zadanie 1: Odczytuje partie bajtu [] z plików binarnych i uruchamia nowe zadanie dla każdej kolekcji tablic bajtowych. (operacja jest grupowana dla celów zarządzania pamięcią).

Zadanie 2-n: To są zadania robocze i każdy działa na zbiorze przekazywanych (z Zadania 1) macierzy bajtowych i dekantruje tablice bajtów, sortuje je według określonych kryteriów, a następnie przechowuje kolekcję wynikowych obiektów (każda tablica bajtów przekształca się w taki obiekt) w Słowniku Współbieżnym.

Zadanie (n + 1) Wybrałem słownik współbieżny, ponieważ zadaniem tego zadania jest scalenie kolekcji przechowywanych w słownika równoczesnym w tej samej kolejności, w jakiej zostały utworzone z Zadania 1. Osiągam to, przekazując identyfikator collectionID (jest on typu int i inkrementowany dla każdej nowej kolekcji w ramach Task1) do końca z Task1 do tego zadania. Zadanie to w zasadzie sprawdza, czy następny oczekiwany identyfikator kolekcji jest już zapisany w słowniku współbieżnym, a jeśli tak, usuwa go, dodaje do Ostatecznej kolejki i sprawdza kolejną kolekcję w słowniku współbieżnym.

Teraz, z tego, co przeczytałem i filmów, które oglądałem, wydaje mi się, że TPL Dataflow może być idealnym kandydatem do takiego modelu producenta/konsumenta. Po prostu nie jestem w stanie opracować projektu, a tym samym zacząć, ponieważ nigdy nie pracowałem z TPL Dataflow. Pod względem przepustowości i czasu oczekiwania biblioteka jest nawet do tego zadania? Obecnie przetwarzam 2,5 miliona tablic bajtowych, a więc obiekty na sekundę w wynikowych kolekcjach. Czy TPL Dataflow może pomóc w uproszczeniu? Szczególnie interesuje mnie odpowiedź na następujące pytanie: Czy TPL Dataflow może zachować kolejność pobierania partii z Zadania 1, gdy odradza zadania pracownicze i ponownie je łączy, gdy zadania wykonawcze wykonają swoją pracę? Czy to optymalizuje? Po wyprofilowaniu całej struktury uważam, że jest sporo czasu zmarnowanego ze względu na wirowanie i zbyt wiele współbieżnych kolekcji.

Wszelkie pomysły, myśli?

Odpowiedz

12

EDIT: Okazuje się, że bardzo się myliłem. TransformBlockczy zwraca elementy w takiej samej kolejności, w jakiej zostały wprowadzone, nawet jeśli jest skonfigurowany do równoległości. Z tego powodu kod w mojej oryginalnej odpowiedzi jest zupełnie bezużyteczny i zamiast niego można użyć normalnego TransformBlock.


Oryginalny odpowiedź:

O ile wiem, tylko jeden konstrukt równoległość w NET obsługuje zawracania przetworzonych elementów w kolejności weszli: PLINQ z AsOrdered(). Wydaje mi się jednak, że PLINQ nie pasuje do tego, co chcesz.

Z drugiej strony, TPL Dataflow dobrze pasuje, ale nie ma bloku, który wspierałby równoległość i zwrot przedmiotów w tym samym czasie (TransformBlock obsługuje oba, ale nie ma w tym samym czasie). Na szczęście bloki Dataflow zostały zaprojektowane z myślą o kompozycyjności, więc możemy zbudować własny blok, który to robi.

Najpierw jednak musimy wymyślić, jak zamówić wyniki. Używanie współbieżnego słownika, takiego jak sugerowałeś, wraz z pewnym mechanizmem synchronizacji, z pewnością zadziała. Ale myślę, że istnieje prostsze rozwiązanie: użyj kolejki o numerach Task. W zadaniu wyjściowym usuwamy kolejkę Task, czekając na zakończenie (asynchronicznie), a kiedy to nastąpi, wysyłamy jej wynik.Nadal potrzebujemy jakiejś synchronizacji dla przypadku, gdy kolejka jest pusta, ale możemy ją pobrać za darmo, jeśli wybierzemy kolejkę do sprytnego użycia.

Ogólny pomysł jest taki: to, co piszemy, to IPropagatorBlock, z pewnymi danymi wejściowymi i niektórymi danymi wyjściowymi. Najprostszym sposobem utworzenia niestandardowego IPropagatorBlock jest utworzenie jednego bloku, który przetwarza dane wejściowe, innego bloku, który generuje wyniki i traktuje je jako jeden przy użyciu DataflowBlock.Encapsulate().

Blok wejściowy będzie musiał przetwarzać przychodzące elementy we właściwej kolejności, więc nie będzie tam równoległości. Stworzy nowy Task (w rzeczywistości, , abyśmy mogli później ustawić wynik Task), dodać go do kolejki, a następnie wysłać element do przetworzenia, wraz z pewnym sposobem, aby ustawić wynik prawidłowego Task . Ponieważ nie musimy łączyć tego bloku z niczym, możemy użyć numeru ActionBlock.

Blok wyjściowy będzie musiał pobrać Task s z kolejki, asynchronicznie czekać na nie, a następnie wysłać je razem. Ale ponieważ wszystkie bloki mają osadzoną w nich kolejkę, a bloki, w których delegaci mają wbudowane wbudowane czekanie asynchroniczne, będzie to bardzo proste: new TransformBlock<Task<TOutput>, TOutput>(t => t). Ten blok będzie działać zarówno jako kolejka, jak i jako blok wyjściowy. Z tego powodu nie musimy zajmować się żadną synchronizacją.

Ostatni element układanki przetwarza elementy równolegle. W tym celu możemy użyć innego ActionBlock, tym razem z zestawem MaxDegreeOfParallelism. Przejmie dane wejściowe, przetworzy je i ustawi prawidłową wartość Task w kolejce.

ułożyła, może to wyglądać tak:

public static IPropagatorBlock<TInput, TOutput> 
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform) 
{ 
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t); 

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
     tuple => tuple.Item2(transform(tuple.Item1)), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
     }); 

    var enqueuer = new ActionBlock<TInput>(
     async item => 
     { 
      var tcs = new TaskCompletionSource<TOutput>(); 
      await processor.SendAsync(
       new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult)); 
      await queue.SendAsync(tcs.Task); 
     }); 

    enqueuer.Completion.ContinueWith(
     _ => 
     { 
      queue.Complete(); 
      processor.Complete(); 
     }); 

    return DataflowBlock.Encapsulate(enqueuer, queue); 
} 

Po tak dużo mówi, że to dość mała ilość kodu, myślę.

Wygląda na to, że bardzo zależy Ci na wydajności, więc możesz potrzebować precyzyjniej dostroić ten kod. Na przykład może być sens ustawienie MaxDegreeOfParallelism bloku processor na coś podobnego do Environment.ProcessorCount, aby uniknąć nadmiernej liczby subskrypcji. Ponadto, jeśli opóźnienie jest ważniejsze niż przepustowość, może być sens, aby ustawić MaxMessagesPerTask tego samego bloku na 1 (lub inną małą liczbę), aby po zakończeniu przetwarzania elementu natychmiast wysłać go do wyjścia.

Ponadto, jeśli chcesz dławić przychodzące przedmioty, możesz ustawić BoundedCapacity z enqueuer.

+0

Wow całkiem sporo gadżetów, które najpierw chciałbym przetrawić i wypróbować. Wielkie dzięki za to, przynajmniej zasługiwał na upominek ;-) Pozwól mi zagrać z tymi pomysłami i wrócę. Kolejkowanie zadań ma sens i zastanawiam się, dlaczego wcześniej tego nie zrobiłem. –

+0

OK Spędzam trochę czasu przeglądając twój post i czytając o TPL Dataflow, tutaj kilka pytań, aby w pełni zrozumieć twoje proponowane rozwiązanie: (1) dlaczego sugerujesz niestandardowy IPropagatorBlock i IDataflowBlock.Encapsulate(), czy Transformblock już istnieje? (2) Nie widzę, jak planujesz połączyć bloki. Najpierw rozmawiasz w ActionBlocks o TransformBlocks. Z tego co przeczytałem, czy ActionBlock nie będzie "punktem końcowym" całej architektury? –

+1

1. Zostało to wyjaśnione w drugim akapicie: 'TransformBlock' nie może przetwarzać elementów równolegle i zwracać je w kolejności w tym samym czasie. Może wykonać jedną z nich, ale nie obie. – svick