Mam problemy z rekursywnym łańcuchem obserwowalnych obiektów.RxJS: Rekurencyjna lista obserwowalnych i pojedynczy obserwator
Pracuję z RxJS, który jest obecnie w wersji 1.0.10621, i zawiera najbardziej podstawową funkcjonalność Rx, w połączeniu z Rx dla jQuery.
Pozwolę sobie przedstawić przykładowy scenariusz mojego problemu: Przepytam Twitter search API (odpowiedź JSON) dla tweetów/aktualizacji zawierających określone słowo kluczowe. Odpowiedź zawiera również "refresh_url", którego należy użyć do wygenerowania żądania kontynuacji. Odpowiedź na to kolejne żądanie ponownie będzie zawierała nowe refresh_url, itp.
Rx.jQuery pozwala mi uczynić API wyszukiwania Twittera możliwym do zaobserwowania zdarzeniem, które produkuje jeden onNext, a następnie kończy. Próbowałem do tej pory, aby program obsługi onNext zapamiętał parametr refresh_url i użył go w module obsługi onCompleted, aby wygenerować zarówno nowego obserwowalnego, jak i odpowiedniego obserwatora dla następnego żądania. W ten sposób jedna para obserwowalnych + obserwatorów podąża za drugą w nieskończoność.
Problem z tym podejściem jest:
Następca obserwowalne/obserwator już żył, kiedy ich poprzednicy nie zostały jeszcze wyrzucać.
Muszę wykonać wiele nieprzyjemnych zapisów księgowych, aby zachować ważne odniesienie do obecnego żywego obserwatora, którego faktycznie mogą być dwa. (Jeden w OnCompleted, a drugi gdzie indziej w cyklu życia) Odniesienie to jest oczywiście potrzebne do rezygnacji z subskrypcji/usunięcia obserwatora. Alternatywą dla księgowości byłoby wprowadzenie efektu ubocznego za pomocą "nadal działającego?" - boolowskiego, tak jak zrobiłem to w moim przykładzie.
Przykładowy kod:
running = true;
twitterUrl = "http://search.twitter.com/search.json";
twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
twitterMaxId = 0; //actually twitter ignores its since_id parameter
newTweetObserver = function() {
return Rx.Observer.create(
function (tweet) {
if (tweet.id > twitterMaxId) {
twitterMaxId = tweet.id;
displayTweet(tweet);
}
}
);
}
createTwitterObserver = function() {
twitterObserver = Rx.Observer.create(
function (response) {
if (response.textStatus == "success") {
var data = response.data;
if (data.error == undefined) {
twitterQuery = data.refresh_url;
var tweetObservable;
tweetObservable = Rx.Observable.fromArray(data.results.reverse());
tweetObservable.subscribe(newTweetObserver());
}
}
},
function(error) { alert(error); },
function() {
//create and listen to new observer that includes a delay
if (running) {
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
twitterObservable.subscribe(createTwitterObserver());
}
}
);
return twitterObserver;
}
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
twitterObservable.subscribe(createTwitterObserver());
Nie daj się nabrać na podwójną warstwą obserwable/obserwatorów z wnioskami do tweetów. Mój przykład dotyczy głównie pierwszej warstwy: żądania danych z Twittera. Jeśli w rozwiązaniu tego problemu druga warstwa (przekształcając odpowiedzi na tweety) może stać się jednym z pierwszych, byłoby fantastycznie; Ale myślę, że to zupełnie inna sprawa. Na razie.
Erik Meijer wskazał mi operator Expand (patrz przykład poniżej) i zasugerował Join patterns jako alternatywę.
var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
, i => (i == 10 ? Observable.Empty<int>() // terminate
: new[]{i+1}.ToObservable() // recurse
)
);
ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
Powinno to być możliwe do skopiowania do LINQPad. Zakłada pojedyncze obserwacje i produkuje jednego końcowego obserwatora.
Moje pytanie brzmi: w jaki sposób mogę zrobić najładniejszą sztuczkę w RxJS?
EDYTOWANIE:
Operator rozszerzenia może zostać prawdopodobnie zaimplementowany, jak pokazano w this thread. Ale trzeba by było generators (a ja mam tylko JS < 1.6).
Niestety RxJS 2.0.20304-beta nie implementuje metody Extend.
I doszli do wniosku, że rozwiązaniem tego problemu jest rzeczywiście [Expand operator] (http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/2746e373- bf43-4381-834c-8cc182704ae9), który nie został jeszcze zaimplementowany w RxJS do wersji 2.0.20304-beta. – derabbink
expand jest dokładnie tym, czego potrzebujesz. Rozumiem, że ten post jest stary, ale możliwe jest po prostu przeszczepienie operatorów, których potrzebujesz, od przyszłej wersji Rx do własnej wersji Rx. Może wymagać pewnych manipulacji, ale podstawa kodu nie zmieniła się znacząco od wersji sprzed wersji v1. –
Ponadto, poprawianie Rx (jeśli to możliwe) jest dobrą decyzją. Istnieje wiele poprawek i usprawnień w najnowszych wersjach. –