2017-03-03 19 views
5

Próbuję utworzyć operator Rx, który wydaje się całkiem przydatny, ale nieoczekiwanie nie znalazłem żadnych pytań na temat Stackoverflow, które pasują dokładnie. Chciałbym stworzyć odmianę na Throttle, która natychmiast przepuszcza wartości, jeśli był okres bezczynności. Mój wyimaginowany przypadek użycia jest mniej więcej taki:Niestandardowy operator Rx do dławienia tylko wtedy, gdy pojawiła się ostatnia wartość

Mam menu rozwijane, które uruchamia żądanie sieciowe po zmianie wartości. Jeśli użytkownik przytrzyma klawisz strzałki i szybko przejdzie przez wartości, nie chcę uruchamiać żądania dla każdej wartości. Ale jeśli dławię strumień, to użytkownik musi odczekać czas trwania przepustnicy za każdym razem, gdy tylko wybierają wartość z rozwijanego menu w normalny sposób.

więc podczas normalnego Throttle wygląda następująco: Normal Throttle():

Chcę utworzyć ThrottleSubsequent że wyglądać tak: ThrottleSubsequent():

Zauważ, że marmury 1, 2 i 6 są przepuszczane przez bezzwłocznie ponieważ każdy z nich przechodzi okres bezczynności.

Moja próba ta wygląda następującą:

public static IObservable<TSource> ThrottleSubsequent<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler) 
{ 
    // Create a timer that resets with each new source value 
    var cooldownTimer = source 
     .Select(x => Observable.Interval(dueTime, scheduler)) // Each source value becomes a new timer 
     .Switch(); // Switch to the most recent timer 

    var cooldownWindow = source.Window(() => cooldownTimer); 

    // Pass along the first value of each cooldown window immediately 
    var firstAfterCooldown = cooldownWindow.SelectMany(o => o.Take(1)); 

    // Throttle the rest of the values 
    var throttledRest = cooldownWindow 
     .SelectMany(o => o.Skip(1)) 
     .Throttle(dueTime, scheduler); 

    return Observable.Merge(firstAfterCooldown, throttledRest); 
} 

Ten wydaje do pracy, ale mam to czas trudny rozumowania na ten temat, a mam wrażenie, istnieją pewne przypadki brzegowe tutaj gdzie rzeczy mogą być pomieszane z duplikatami wartości lub czymś. Chciałbym uzyskać informację zwrotną od bardziej doświadczonych Rx-erów, czy ten kod jest poprawny, i/lub czy jest bardziej idiomatyczny sposób na zrobienie tego.

Odpowiedz

2

Cóż, oto zestaw testów (używając Nuget Microsoft.Reactive.Testing):

var ts = new TestScheduler(); 
var source = ts.CreateHotObservable<char>(
    new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')), 
    new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')), 
    new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')), 
    new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('D')), 
    new Recorded<Notification<char>>(550.MsTicks(), Notification.CreateOnNext('E')), 
    new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('F')), 
    new Recorded<Notification<char>>(760.MsTicks(), Notification.CreateOnNext('G')) 
); 

var target = source.ThrottleSubsequent(TimeSpan.FromMilliseconds(150), ts); 
var expectedResults = ts.CreateHotObservable<char>(
    new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')), 
    new Recorded<Notification<char>>(450.MsTicks(), Notification.CreateOnNext('B')), 
    new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')), 
    new Recorded<Notification<char>>(910.MsTicks(), Notification.CreateOnNext('G')) 
); 

var observer = ts.CreateObserver<char>(); 
target.Subscribe(observer); 
ts.Start(); 

ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages); 

i korzystania

public static class TestingHelpers 
{ 
    public static long MsTicks(this int i) 
    { 
     return TimeSpan.FromMilliseconds(i).Ticks; 
    } 
} 

Wydaje się przejść. Jeśli chciał, żeby go zmniejszyć, można przekształcić go w ten sposób:

public static IObservable<TSource> ThrottleSubsequent2<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler) 
{ 
    return source.Publish(_source => _source 
     .Window(() => _source 
      .Select(x => Observable.Interval(dueTime, scheduler)) 
      .Switch() 
     )) 
     .Publish(cooldownWindow => 
      Observable.Merge(
       cooldownWindow 
        .SelectMany(o => o.Take(1)), 
       cooldownWindow 
        .SelectMany(o => o.Skip(1)) 
        .Throttle(dueTime, scheduler) 
      ) 
     ); 
} 

EDIT:

Publish siły dzielące od abonamentu. Jeśli masz złe (lub drogie) źródło obserwowalne z efektami ubocznymi subskrypcji, Publish zapewnia, że ​​zasubskrybujesz tylko raz. Oto przykład, w którym Publish pomaga:

void Main() 
{ 
    var source = UglyRange(10); 
    var target = source 
     .SelectMany(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10 * i))) 
     .ThrottleSubsequent2(TimeSpan.FromMilliseconds(70), Scheduler.Default) //Works with ThrottleSubsequent2, fails with ThrottleSubsequent 
     .Subscribe(i => Console.WriteLine(i)); 
} 
static int counter = 0; 
public IObservable<int> UglyRange(int limit) 
{ 
    var uglySource = Observable.Create<int>(o => 
    { 
     if (counter++ == 0) 
     { 
      Console.WriteLine("Ugly observable should only be created once."); 
      Enumerable.Range(1, limit).ToList().ForEach(i => o.OnNext(i)); 
     } 
     else 
     { 
      Console.WriteLine($"Ugly observable should only be created once. This is the {counter}th time created."); 
      o.OnError(new Exception($"observable invoked {counter} times.")); 
     } 
     return Disposable.Empty; 
    }); 
    return uglySource; 
} 
+0

Dzięki za skierowanie mnie do ramy testowej, która wygląda całkiem przydatna. W ThrottleSubsequent2, jakie są wywołania funkcji Publish()? Czy twoja funkcja działa inaczej niż moja w jakiś materialny sposób? – jjoelson

+0

Dodano przykład z efektami ubocznymi subskrypcji, aby pokazać korzyści z publikowania. – Shlomo

+0

Ma sens. Czy można powiedzieć, że powinieneś zawsze używać czegoś takiego jak Publish(), gdy nie wiesz, czy twoja transformacja będzie używana z gorącymi lub zimnymi obserwacjami? – jjoelson