2015-11-14 9 views
8

Muszę wysłać zapytanie do bazy danych w odpowiednim czasie, aby poznać stan istniejącego systemu. Myślałem o opakowaniu kwerendy wokół Observable, ale nie wiem prawidłowy sposób to zrobić.Pobieranie bazy danych z rozszerzeniami reaktywnymi

Zasadniczo będzie to to samo zapytanie co 5 sekund. Ale obawiam się, że będę musiał stawić czoła tym problemom:

  • Co zrobić, jeśli wykonanie zapytania zajmuje 10 sekund? Nie chcę, aby wykonywał nowe zapytanie, jeśli poprzednie jest nadal przetwarzane.
  • Powinien również nastąpić przekroczenie limitu czasu. Jeśli aktualne zapytanie nie zostanie wykonane po, na przykład, 20 sekundach, komunikat informacyjny powinien być zarejestrowany jako i należy wysłać nową próbę (to samo zapytanie).

Dodatkowe szczegóły:

  • zapytanie jest tylko SELECT które zwraca zestaw danych z listą kodów stanu (pracę, zarzucić).
  • Sekwencja Observable zawsze pobiera najnowsze dane otrzymane z zapytania, podobnie jak metoda rozszerzenia Switch.
  • Chciałbym zawinąć kwerendę bazy danych (długa operacja) do zadania, ale nie jestem pewien, czy jest to najlepsza opcja.

Jestem prawie pewien, że zapytanie powinno zostać wykonane w innym wątku, ale nie mam pojęcia, jak powinno wyglądać obserwowalne, kiedykolwiek czytając Introduction to Rx by Lee Campbell.

+0

Czy możesz dodać więcej szczegółów? Jakie dane odzyskuje zapytanie? czy zapytanie zwraca pojedynczy obiekt? w przypadku przekroczenia limitu czasu mówisz, że chcesz rozpocząć nowe zapytanie, co to za zapytanie? –

Odpowiedz

14

Jest to dość klasyczny przypadek użycia Rx do odpytywania innego systemu. Większość ludzi użyje Observable.Interval jako swojego operatora, a dla większości będzie dobrze.

Masz jednak określone wymagania dotyczące limitów czasu i ponów próbę. W tym przypadku myślę, że jesteś lepiej wyłączyć za pomocą kombinacji operatorów:

  • Observable.Timer, co pozwala na wykonanie zapytanie w określonym czasie
  • Timeout do zidentyfikowania i zapytań do bazy danych, które przekroczyły
  • ToObservable() do zamień swoje wyniki na Task na obserwowalną sekwencję.
  • Retry pozwala odzyskać po upływie limitów czasu
  • Repeat, aby umożliwić kontynuację po pomyślnych zapytaniach do bazy danych. Spowoduje to również zachowanie początkowego okresu/luki między zakończeniem poprzedniego zapytania do bazy danych a rozpoczęciem następnego.

Ta praca LINQPad fragment powinien pokazać kwerenda działa prawidłowo:

void Main() 
{ 
    var pollingPeriod = TimeSpan.FromSeconds(5); 
    var dbQueryTimeout = TimeSpan.FromSeconds(10); 

    //You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence. 
    var rxQueryTimeOut = pollingPeriod + dbQueryTimeout; 

    var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" }); 

    var query = Observable.Timer(pollingPeriod, scheduler) 
        .SelectMany(_ => DatabaseQuery().ToObservable()) 
        .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler) 
        .Retry() //Loop on errors 
        .Repeat(); //Loop on success 

    query.StartWith("Seed") 
     .TimeInterval(scheduler) //Just to debug, print the timing gaps. 
     .Dump(); 
} 

// Define other methods and classes here 
private static int delay = 9; 
private static int delayModifier = 1; 
public async Task<string> DatabaseQuery() 
{ 
    //Oscillate the delay between 3 and 12 seconds 
    delay += delayModifier; 
    var timespan = TimeSpan.FromSeconds(delay); 
    if (delay < 4 || delay > 11) 
     delayModifier *= -1; 
    timespan.Dump("delay"); 
    await Task.Delay(timespan); 
    return "Value"; 
} 

Wyniki wyglądać następująco:

Seed 00:00:00.0125407 
Timeout 00:00:15.0166379 
Timeout 00:00:15.0124480 
Timeout 00:00:15.0004520 
Timeout 00:00:15.0013296 
Timeout 00:00:15.0140864 
Value 00:00:14.0251731 
Value 00:00:13.0231958 
Value 00:00:12.0162236 
Value 00:00:11.0138606 

Kluczową część próbki jest ....

var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler) 
       .SelectMany(_ => DatabaseQuery().ToObservable()) 
       .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler) 
       .Retry() //Loop on errors 
       .Repeat(); //Loop on success 

EDYCJA: Oto dalsze wyjaśnienie, w jaki sposób dotrzeć do tego rozwiązania. https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md

+0

Czy głównym celem korzystania z EventLoopScheduler tutaj, aby upewnić się, że kwerenda działa na tym samym wątku? Czy jest to najlepszy sposób, aby przejść do odpytywania innego systemu za pomocą RX? – jumpercake

+0

Prawidłowo, to jest intencja tutaj. W tym przypadku sugerowałbym to, aby nie konkurować z pulą zadań/wątków. Jak nazwa wątku w tym przykładzie, większość produktów logowania również to ujawni. Jednak użycie EventLoopScheduler tutaj nie jest obowiązkowe, Rx będzie utrzymywać serializację pracy. –

+0

Zaktualizowano, aby dołączyć link do przykładowego kodu z działaniami, aby dojść do wdrożenia –

1

myślę, że to, co należy zrobić:

var query = 
    from n in Observable.Interval(TimeSpan.FromSeconds(5.0)) 
    from ds in Observable.Amb(
     Observable.Start(() => /* Your DataSet query */), 
     Observable 
      .Timer(TimeSpan.FromSeconds(10.0)) 
      .Select(_ => new DataSet("TimeOut"))) 
    select ds; 

To wyzwala nowe zapytanie z przerwą między egzekucje 5 sekund. To nie 5 sekund od ostatniego uruchomienia, minęło 5 sekund od ostatniego zakończenia.

Następnie wypróbować zapytanie, ale .Amb to z zegarem, który zwraca specjalną DataSet po 10 sekundach. Jeśli zapytanie zakończy się przed upływem 10 sekund, wygrywa, ale w przeciwnym razie zwracane jest specjalne DataSet. Operator .Amb jest w zasadzie operatorem "wyścigowym" - pierwszy obserwowalny w celu uzyskania wartości wygrywa.

+0

Wow, połączenie obserwowalnych jest imponujące! Masz na myśli to, że obie obserwowalne będą ścigać się, a Amb dostanie pierwszy. To, co mnie traci, to 2 zagnieżdżone z klas. Czy ta część jest tą, która dopuszcza część, w której mówisz: "To wyzwala nowe zapytanie z przerwą między wykonaniami 5 sekund. Nie minęło 5 sekund od ostatniego uruchomienia, to 5 sekund od ostatniego zakończenia."? – SuperJMN

+1

@SuperJMN - Dzięki. 'Observable.Interval (TimeSpan.FromSeconds (5.0))' strzela tylko po 5 sekundach po tym, jak wszyscy subskrybenci zakończyli swoją pracę. Tak więc jeśli druga część zapytania działa, interwał nie zostanie wywołany do 5 sekund po zakończeniu. – Enigmativity

+0

Używam 'UseMany', aby zużywać swój" Observable.Interval ", który jest niczym, blokując zużycie, więc będzie zaznaczał się co 5 sekund, tj. Nie raz, gdy zapytanie DB zakończy się. –