Jeśli naprawdę trzeba czekać na przyszłość, aby zakończyć przed przetworzeniem następną wiadomość, możesz spróbować czegoś takiego:
object SimpleMessageHandler{
case class SimpleMessage()
case class FinishSimpleMessage(i:Int)
}
class SimpleMessageHandler extends Actor with Stash{
import SimpleMessageHandler._
import context._
import akka.pattern.pipe
def receive = waitingForMessage
def waitingForMessage: Receive = {
case SimpleMessage() =>
val futData:Future[Int] = ...
futData.map(FinishSimpleMessage(_)) pipeTo self
context.become(waitingToFinish(sender))
}
def waitingToFinish(originalSender:ActorRef):Receive = {
case SimpleMessage() => stash()
case FinishSimpleMessage(i) =>
//Do whatever you need to do to finish here
...
unstashAll()
context.become(waitingForMessage)
case Status.Failure(ex) =>
//log error here
unstashAll()
context.become(waitingForMessage)
}
}
W tym podejściu, możemy przetworzyć SimpleMessage
a następnie włącz obsługę logiki ukryta wszystkie kolejne SimpleMessage
s otrzymane, dopóki nie otrzymamy wyniku z przyszłości. Kiedy otrzymamy wynik, niepowodzenie lub nie, odetniemy wszystkie pozostałe, które otrzymaliśmy, czekając na przyszłość i udając się na naszą wesołą drogę.
Ten aktor po prostu przełącza się między dwoma stanami, co pozwala tylko w pełni przetworzyć jeden plik SimpleMessage
na raz, bez konieczności blokowania przyszłości.
Czego dokładnie szukasz z danymi pochodzącymi z przyszłości? – cmbaxter
To są dane z mojego db (mongo) i chcę je filtrować, a tylko część z nich zapisywać do innej kolekcji. Ale w zasadzie są to dane db, nie mogę tego uruchomić w tle, i muszę poczekać, aby zakończyć tę akcję przed następnym * SimpleMessage *. –
Czy wysyła wiadomość do "nadawcy", czy nie? –