2016-08-11 40 views
7

Próbuję stworzyć system, za pomocą którego moja aplikacja może odbierać dane strumieniowe z kanału Redis PubSub i przetwarzać je. Redis driver że używam, wraz ze wszystkimi innymi kierowcami Redis rdzy, które widziałem, użyj operacji blokowania, aby uzyskać dane z kanału, który zwraca tylko wartość po otrzymaniu danych:Jak wdrożyć strumień kontraktów terminowych na połączenie blokujące za pomocą futures.rs i Redis PubSub?

let msg = match pubsub.get_message() { 
     Ok(m) => m, 
     Err(_) => panic!("Could not get message from pubsub!") 
}; 
let payload: String = match msg.get_payload() { 
    Ok(s) => s, 
    Err(_) => panic!("Could not convert redis message to string!") 
}; 

I chciałem użyć biblioteki futures-rs, aby zawrzeć to wywołanie funkcji blokującej w przyszłości, aby móc wykonywać inne zadania w mojej aplikacji podczas oczekiwania na dane wejściowe.

Przeczytałem tutorial dla kontraktów terminowych i próbowałem stworzyć Stream, który sygnalizowałby, kiedy dane są odbierane przez PubSub, ale nie mogę wymyślić, jak to zrobić.

Jak mogę utworzyć funkcje schedule i poll dla funkcji blokowania pubsub.get_message()?

+5

Korzystanie z biblioteki w dniu, w którym miała ona swoje wielkie ogłoszenie; jak bardzo ambitny!^_^ – Shepmaster

Odpowiedz

10

Ciężkie zastrzeżenie Nigdy wcześniej nie korzystałem z tej biblioteki, a moja niska wiedza na temat niektórych koncepcji jest trochę ... brakująca. Przeważnie czytam przez the tutorial. Jestem prawie pewien, że każdy, kto wykonał pracę asynchroniczną, przeczyta to i będzie się śmiał, ale może być przydatnym punktem wyjścia dla innych ludzi. Zastanówcie się nad emptorem!


Zacznijmy od czegoś nieco prostszy, demonstrując jak Stream prace. Możemy konwertować iterator o Result s do strumienia:

extern crate futures; 

use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let payloads: Vec<Result<String,()>> = vec![Ok("a".into()), Ok("b".into())]; 
    let payloads = stream::iter(payloads.into_iter()); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
} 

To pokazuje nam jeden sposób, aby konsumować strumienia. Używamy and_then, aby zrobić coś dla każdego ładunku (tutaj tylko go wydrukujemy), a następnie for_each, aby przekonwertować Stream z powrotem na Future. Możemy wtedy uruchomić przyszłość, nazywając dziwnie nazwaną forget method.


Następnie należy powiązać bibliotekę Redis z miksem, obsługując tylko jedną wiadomość. Ponieważ metoda get_message() blokuje się, musimy wprowadzić kilka wątków do miksu. Nie jest dobrym pomysłem wykonywanie dużej ilości pracy w tego typu systemie asynchronicznym, ponieważ wszystko inne będzie blokowane. For example:

O ile nie jest to inaczej ustawione jest tak, że należy zapewnić, że implementacje tej funkcji wykończenia bardzo szybko.

W idealnym świecie skrzynka redis zostanie zbudowana na takiej bibliotece jak futures i wystawi to wszystko natywnie.

extern crate redis; 
extern crate futures; 

use std::thread; 
use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); 

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); 
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); 

    let (tx, payloads) = stream::channel(); 

    let redis_thread = thread::spawn(move || { 
     let msg = pubsub.get_message().expect("Unable to get message"); 
     let payload: Result<String, _> = msg.get_payload(); 
     tx.send(payload).forget(); 
    }); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
    redis_thread.join().expect("unable to join to thread"); 
} 

Moje zrozumienie staje się bardziej nieostre. W oddzielnym wątku blokujemy wiadomość i przenosimy ją do kanału, gdy ją otrzymamy. Czego nie rozumiem, to dlaczego musimy trzymać rączkę nici. Spodziewam się, że foo.forget będzie się blokował, czekając, aż strumień będzie pusty.

W związku telnet do serwera Redis, wyślij to:

publish rust awesome 

I widać to działa. Dodanie instrukcji print pokazuje, że instrukcja (dla mnie) foo.forget jest uruchamiana przed utworzeniem wątku.


Wiele wiadomości jest trudniejsze. Urządzenie Sender zużywa się, aby zapobiec zbyt daleko od strony generującej przed stroną zużywającą. Osiąga się to poprzez zwrócenie kolejnej przyszłości z send! Musimy promu z powrotem stamtąd do ponownego użycia go do następnej iteracji pętli:

extern crate redis; 
extern crate futures; 

use std::thread; 
use std::sync::mpsc; 

use futures::Future; 
use futures::stream::{self, Stream}; 

fn main() { 
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis"); 

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle"); 
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel"); 

    let (tx, payloads) = stream::channel(); 

    let redis_thread = thread::spawn(move || { 
     let mut tx = tx; 

     while let Ok(msg) = pubsub.get_message() { 
      let payload: Result<String, _> = msg.get_payload(); 

      let (next_tx_tx, next_tx_rx) = mpsc::channel(); 

      tx.send(payload).and_then(move |new_tx| { 
       next_tx_tx.send(new_tx).expect("Unable to send successor channel tx"); 
       futures::finished(()) 
      }).forget(); 

      tx = next_tx_rx.recv().expect("Unable to receive successor channel tx"); 
     } 
    }); 

    let foo = payloads 
     .and_then(|payload| futures::finished(println!("{}", payload))) 
     .for_each(|_| Ok(())); 

    foo.forget(); 
    redis_thread.join().expect("unable to join to thread"); 
} 

Jestem pewien, że nie będzie więcej ekosystem dla tego rodzaju współdziałania w miarę upływu czasu. Na przykład skrzynia futures-cpupool może być rozszerzona o , prawdopodobnie o, aby obsługiwać podobny przypadek.

+0

Dzięki za niesamowitą odpowiedź! Tylko jedno pytanie: czy nie przyłączając się do 'redis_thread' nie neguje całego wysiłku, aby proces czytania wyników nie był blokowany? Być może jest coś, czego nie rozumiem. – Ameo

+1

"Spodziewałbym się, że foo.forget będzie blokował się, czekając, aż strumień będzie pusty" W rzeczywistości futures nie są zobowiązane do zapewnienia metody "blokuj do czasu". 'forget()', jeśli chodzi o jego opis, jest potrzebne, aby zapobiec automatycznemu anulowaniu, gdy przyszłość zostanie opuszczona, ale nie jest związane z oczekiwaniem. W Scali na przykład nie ma na to metody "Future", ale zamiast tego istnieje osobna para metod "Await.ready' /' Await.result', które czekają, aż przyszłość będzie gotowa w ciągu pewnego czasu. –

+1

O ile rozumiem, w przyszłości-rs można zaimplementować coś podobnego za pomocą ['Future :: select'] (http://alexcrichton.com/futures-rs/futures/trait.Future.html#method .select), gdzie druga przyszłość kończy się po określonym czasie oczekiwania. –