Jest to dość klasyczny przypadek użycia Rx do odpytywania innego systemu. Większość ludzi użyje Observable.Interval
jako swojego operatora, a dla większości będzie dobrze.
Masz jednak określone wymagania dotyczące limitów czasu i ponów próbę. W tym przypadku myślę, że jesteś lepiej wyłączyć za pomocą kombinacji operatorów:
Observable.Timer
, co pozwala na wykonanie zapytanie w określonym czasie
Timeout
do zidentyfikowania i zapytań do bazy danych, które przekroczyły
ToObservable()
do zamień swoje wyniki na Task
na obserwowalną sekwencję.
Retry
pozwala odzyskać po upływie limitów czasu
Repeat
, aby umożliwić kontynuację po pomyślnych zapytaniach do bazy danych. Spowoduje to również zachowanie początkowego okresu/luki między zakończeniem poprzedniego zapytania do bazy danych a rozpoczęciem następnego.
Ta praca LINQPad fragment powinien pokazać kwerenda działa prawidłowo:
void Main()
{
var pollingPeriod = TimeSpan.FromSeconds(5);
var dbQueryTimeout = TimeSpan.FromSeconds(10);
//You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;
var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });
var query = Observable.Timer(pollingPeriod, scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
query.StartWith("Seed")
.TimeInterval(scheduler) //Just to debug, print the timing gaps.
.Dump();
}
// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
//Oscillate the delay between 3 and 12 seconds
delay += delayModifier;
var timespan = TimeSpan.FromSeconds(delay);
if (delay < 4 || delay > 11)
delayModifier *= -1;
timespan.Dump("delay");
await Task.Delay(timespan);
return "Value";
}
Wyniki wyglądać następująco:
Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606
Kluczową część próbki jest ....
var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
EDYCJA: Oto dalsze wyjaśnienie, w jaki sposób dotrzeć do tego rozwiązania. https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md
Czy możesz dodać więcej szczegółów? Jakie dane odzyskuje zapytanie? czy zapytanie zwraca pojedynczy obiekt? w przypadku przekroczenia limitu czasu mówisz, że chcesz rozpocząć nowe zapytanie, co to za zapytanie? –