Ciężkie zastrzeżenie Nigdy wcześniej nie korzystałem z tej biblioteki, a moja niska wiedza na temat niektórych koncepcji jest trochę ... brakująca. Przeważnie czytam przez the tutorial. Jestem prawie pewien, że każdy, kto wykonał pracę asynchroniczną, przeczyta to i będzie się śmiał, ale może być przydatnym punktem wyjścia dla innych ludzi. Zastanówcie się nad emptorem!
Zacznijmy od czegoś nieco prostszy, demonstrując jak Stream
prace. Możemy konwertować iterator o Result
s do strumienia:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String,()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
To pokazuje nam jeden sposób, aby konsumować strumienia. Używamy and_then
, aby zrobić coś dla każdego ładunku (tutaj tylko go wydrukujemy), a następnie for_each
, aby przekonwertować Stream
z powrotem na Future
. Możemy wtedy uruchomić przyszłość, nazywając dziwnie nazwaną forget
method.
Następnie należy powiązać bibliotekę Redis z miksem, obsługując tylko jedną wiadomość. Ponieważ metoda get_message()
blokuje się, musimy wprowadzić kilka wątków do miksu. Nie jest dobrym pomysłem wykonywanie dużej ilości pracy w tego typu systemie asynchronicznym, ponieważ wszystko inne będzie blokowane. For example:
O ile nie jest to inaczej ustawione jest tak, że należy zapewnić, że implementacje tej funkcji wykończenia bardzo szybko.
W idealnym świecie skrzynka redis zostanie zbudowana na takiej bibliotece jak futures i wystawi to wszystko natywnie.
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Moje zrozumienie staje się bardziej nieostre. W oddzielnym wątku blokujemy wiadomość i przenosimy ją do kanału, gdy ją otrzymamy. Czego nie rozumiem, to dlaczego musimy trzymać rączkę nici. Spodziewam się, że foo.forget
będzie się blokował, czekając, aż strumień będzie pusty.
W związku telnet do serwera Redis, wyślij to:
publish rust awesome
I widać to działa. Dodanie instrukcji print pokazuje, że instrukcja (dla mnie) foo.forget
jest uruchamiana przed utworzeniem wątku.
Wiele wiadomości jest trudniejsze. Urządzenie Sender
zużywa się, aby zapobiec zbyt daleko od strony generującej przed stroną zużywającą. Osiąga się to poprzez zwrócenie kolejnej przyszłości z send
! Musimy promu z powrotem stamtąd do ponownego użycia go do następnej iteracji pętli:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Jestem pewien, że nie będzie więcej ekosystem dla tego rodzaju współdziałania w miarę upływu czasu. Na przykład skrzynia futures-cpupool może być rozszerzona o , prawdopodobnie o, aby obsługiwać podobny przypadek.
Korzystanie z biblioteki w dniu, w którym miała ona swoje wielkie ogłoszenie; jak bardzo ambitny!^_^ – Shepmaster