2012-07-04 14 views
7

Potrzebuję odrodzenia N wątków konsumenckich, które przetwarzają ten sam InputStream jednocześnie, np. - przekształć go w jakiś sposób, obliczyć sumę kontrolną lub podpis cyfrowy itp. Konsumenci nie zależą od siebie nawzajem i wszystkich z nich korzystają z bibliotek innych firm, które akceptują InputStream jako źródło danych.Równoczesne przetwarzanie pojedynczego strumienia InputStream z niezależnymi klientami

Więc co mogę zrobić, to - tworzyć pewną realizację InputStream, który

  • Przeczytaj fragment danych z „macierzystego” strumień
  • odblokowywanie konsumentów
  • czekać aż do każdego konsumenta przeczytać cały fragment
  • odczytywania następnego klocek

będąc szuka prosty, może rosnąć różne problemy, takie jak livelo ck, gdy pewna klientka umiera, zaimplementuj wszystkie metody InputStream, steruj widelcem/złączem samych konsumentów za pomocą zapór/zatrzasków itp.

Jeden kumpel powiedział mi, że to pół godziny na wdrożenie, to sprawiło, że mój wieczór.

Wolałbym użyć czegoś wystarczająco dojrzałego (google nie przyniosło rezultatów, mój google-fu nie jest wystarczająco dobry?) Lub nie zawracaj sobie głowy i skopiuj cały "źródłowy" strumień do pliku tymczasowego i używać go jako źródła danych. To drugie rozwiązanie wydaje się bardziej niezawodne, ale może skończyć się tworzeniem plików gigabajtowych (na przykład podczas przetwarzania strumieniowego audio).

+0

Czy możesz zapisać dane do pliku i odradzić N FileInputStreams? –

+0

@ JonLin Tak jak powiedział pod koniec pytania, może. –

Odpowiedz

3

Sposób, w jaki to widzę, wymaga przynajmniej pewnego rodzaju buforowania, aby różni konsumenci mogli poruszać się w strumieniu w różnym tempie, bez wszystkiego, co jest stale hamowane przez obecnie najwolniejszego konsumenta. Zasadniczo zapewnia to najgorszą wydajność i bardzo małą korzyść z współbieżności.

Można, na przykład, oznaczyć każdy fragment konsumentami, którzy go używali do tej pory, a następnie usunąć te, które są całkowicie zużyte. Może to być osiągnięte przez każdego konsumenta, który posiada odniesienie do każdego kawałka, którego jeszcze nie użył, co pozwoliłoby GC automatycznie zająć się używanymi porcjami. Producent może przechowywać listę WeakReference s do porcji, więc ma uchwyt na liczbę kawałków, które mają być używane i opiera się na tym ograniczeniu.

Zastanawiam się również nad posiadaniem osobnej instancji InputStream na wątek, która wewnętrznie komunikuje się z producentem InputStream. W ten sposób masz proste rozwiązanie dla twojego ryzyka związanego z blokadą na żywo: try ... finally { is.close(); } - umierający konsument zamyka własny strumień wejściowy. Jest to przekazywane producentowi.

Mam kilka pomysłów na używanie numeru ArrayBlockingQueue na klienta. Pojawiłyby się trudności w zapewnieniu, że wszyscy konsumenci są prawidłowo karmieni, bez konieczności blokowania lub zapracowania producenta.

+0

Nie powiedziałbym, że jest to bardzo mała korzyść - mając 5 konsumentów pracujących na 1 sekundę i jednego konsumenta pracującego przez 2 sekundy, wywołanie współbieżne da 2 sekundy, podczas gdy sekwencja da 7 sekund. Czy może czegoś tutaj brakuje? Po oznakowaniu porcji i buforów uderzę w pamięć, której chciałbym uniknąć. – jdevelop

+0

Tak, to, co mówisz, jest nieuniknione. Jeśli jednak masz średnio zrównoważonych konsumentów, ale ich skuteczność jest bardzo różna, stracisz możliwość zbieżności, jeśli zawsze będziesz czekał na każdego konsumenta, który obecnie pozostaje w tyle. Pomoże w tym buforowanie. A jeśli wprowadzisz równoważenie priorytetów wątków, możesz faktycznie osiągnąć taką sytuację. –

0

Czy rozważałeś użycie strumieni rur? Twój producent może mieć jeden lub więcej PipedOuputStream, w którym rzuca wszystko, co czyta z pliku. Po drugiej stronie potoków masz różne wątki konsumenckie czytające na odpowiadającym PipedInputstream (który jest InputStream, który możesz udostępnić swoim bibliotekom).

Twój wątek producenta może zdecydować, które dane rur powinny zostać przesłane, dzięki temu dane będą przetwarzane dla danego odczytywanego wątku konsumenta po drugiej stronie rury.

Jeśli chcesz odzyskać dane z wątków konsumenckich, możesz utworzyć kolejną rurę w przeciwnym kierunku, aby odesłać dane do Ciebie.

+1

"PipedOutputStream" zablokuje producenta, gdy tylko klient zostanie w tyle, z głodując wszystkich innych konsumentów. –

0

Można wypróbować niektóre implementacje Java Messaging Service (JMS), takie jak Apache ActiveMQ.

W twoim przypadku musisz utworzyć tzw. Temat (patrz Topics vs. Queues). Temat jest tworzony przez producenta i jest publikowany dla N konsumentów, które mogą być uruchomione jednocześnie, a każdy konsument otrzymuje dokładnie te same dane.

Ponieważ chcesz używać InputStream s, istnieje rozdział o tym, jak send messages are streams.

Przypuszczam, że zazwyczaj producenci i konsumenci byliby oddzielnymi procesami, prawdopodobnie działającymi na różnych komputerach w sieci. Myślę, że możesz go skonfigurować tak, aby działał całkowicie w pojedynczej maszynie JVM. Zależy to od wdrożenia JMS. Są one również dość znane: HornetQ by JBoss, RabbitMQ oraz cała masa innych.