2015-03-25 7 views
5

Mam klasę, która będzie odpowiedzialna za generowanie zdarzeń w częstych, ale nieregularnych odstępach czasu, które inne klasy muszą spożywać i obsługiwać. Chcę użyć Reactive Extensions do wykonania tego zadania.Jak wypchnąć obiekt na Rx Obserwowalny?

Strona konsumenta tego jest bardzo prosta; Mam swoją klasę konsumencką implementującą IObserver<Payload> i wszystko wydaje się dobrze. Problem pojawia się w klasie producenta.

Wdrażanie IObservable<Payload> bezpośrednio (czyli oddanie własną implementację dla IDisposable Subscribe(IObserver<Payload>) jest, zgodnie z dokumentacją, nie jest zalecane. To sugeruje zamiast komponować z Observable.Create() zestaw funkcji. Ponieważ moja klasa będzie działać przez długi czas, I” Wcześniej próbował tworzenia obserwowanej z var myObservable = Observable.Never(), a potem, kiedy mam nowe Bloki danych dostępne, nazywając myObservable.Publish(payloadData). kiedy to zrobić, choć ja nie wydają się trafić realizację OnNext w moim konsumenta.

myślę, jak obejście, mogę utworzyć wydarzenie w mojej klasie, a następnie utworzyć obserwowalne za pomocą funkcji FromEvent, ale wydaje się, że jest to zbyt mplicated podejście (tj. wydaje się dziwne, że nowa gorliwość Observables "wymaga" zdarzeń do działania). Czy istnieje tu proste podejście, które tu widzę? Jaki jest "standardowy" sposób tworzenia własnych obserwowalnych źródeł?

+1

Zasadniczo, jeśli implementujesz 'IObservable ' lub 'IObserver ', prawdopodobnie robisz coś źle. – Enigmativity

Odpowiedz

6

Tworzenie new Subject<Payload> i nazywają to OnNext metoda wysłać zdarzenie. Możesz Subscribe na temat ze swoim obserwatorem.

Stosowanie tematów jest często dyskutowane. Aby uzyskać dokładną dyskusję na temat używania obiektów (które łączą się z podkładem), zobacz here - ale w skrócie ten przypadek użycia brzmi poprawnie (to znaczy prawdopodobnie spełnia lokalne + gorące kryteria).

Na marginesie, przeciążenia subskrybenta przyjmującego delegatów mogą spowodować, że użytkownik nie będzie musiał wykonać implementacji IObserver<T>.

+0

Koncepcja tematu i podanych linków naprawdę pomogły mi zrozumieć to bardziej. – GWLlosa

1

Observable.Never() nie „wyślij” informację, należy użyć Observable.Return(yourValue)

Jeśli potrzebujesz przewodnika z konkretnymi przykładami Polecam czytanie Intro to Rx

+0

Mogę być bardzo zdezorientowany tutaj ... Jeśli użyję Observable.Return, czy to nie będzie miało jedynej wartości? Potrzebuję takiego, który pozostanie przy życiu przez jakiś czas, dzięki czemu będę mógł okresowo dodawać do niego więcej wartości, gdy je obliczę. – GWLlosa

+0

Powinieneś naprawdę przeczytać "Intro to Rx", IObservable to sekwencja zdarzeń, kiedy Obserwator dostaje OnNext (wartość T) otrzymujesz najnowszą wartość do pracy. – kondas

+1

Zgodnie z dokumentami, "jedna ostateczna podstawowa metoda lub prymitywny konstruktor nazywa się Return, a jego rolą jest reprezentowanie pojedynczej sekwencji , podobnie jak jednokomórkowa matryca znajdowałaby się w świecie sekwencji IEnumerable. zasubskrybowani obserwatorzy to dwa komunikaty: OnNext z wartością i OnCompleted sygnalizujący koniec: otrzymano tej sekwencji: IObservable source = Observable.Return (42); Uruchomienie tego kodu daje następujące dane wyjściowe: OnNext: 42 OnCompleted "Nie chcę ukończonego sygnału, chcę, aby sekwencja pozostała otwarta/żywa – GWLlosa

0

Chyba że natknąć się lepszy sposób robić to, co ja "Na razie ustaliliśmy, że używamy BlockingCollection.

var _itemsToSend = new BlockingCollection<Payload>(); 
IObservable<MessageQueuePayload> _deliverer = 
_itemsToSend.GetConsumingEnumerable().ToObservable(Scheduler.Default);