Czy jest możliwe, aby Actor
czekać X ilość sekund, aby odebrać wiadomość, a jeśli wiadomość jest odbierana, przetwarzać jak zwykle, w przeciwnym razie wyślij wiadomość do innego Actor
(wstępnie określonego w konstruktorze)?Aktor Akka - poczekaj chwilę, aby spodziewać się wiadomości, w przeciwnym razie wyślij wiadomość
Odpowiedz
Tak, jeśli chcesz czekać na żadnego wiadomość, wystarczy ustawić ReceiveTimeout: http://doc.akka.io/docs/akka/current/scala/actors.html#receive-timeout
(Docs jest tutaj nieco mylące, możesz ustawić receiveTimeout również po każdej wiadomości)
Jest to możliwe, spójrz na Akka Actor "ask" and "Await" with TimeoutException. Należy jednak pamiętać, że blokowanie wewnątrz aktora to bardzo zły pomysł, ponieważ w tym czasie aktor nie może obsłużyć żadnych innych wiadomości. Ponadto blokuje jeden wątek przetwarzania Akka.
Lepszym rozwiązaniem jest wysłanie wiadomości (pożar i zapomnienie) i zaplanowanie jakiegoś zdarzenia timeout przy użyciu Akka scheduler. Kiedy nadejdzie odpowiedź, anuluj to wydarzenie lub ustaw flagę, aby się nie uruchamiała, jeśli odpowiedź rzeczywiście nadejdzie na czas.
Może to być przesada, ale możesz sprawdzić cechę Finite State Machine (FSM).
import akka._
import actor._
import util._
import duration._
import Impatient._
object Impatient {
sealed trait State
case object WaitingForMessage extends State
case object MessageReceived extends State
case object TimeoutExpired extends State
sealed trait Data
case object Unitialized extends Data
// In
case object Message
}
class Impatient(receiver: ActorRef) extends Actor with FSM[State, Data] {
startWith(WaitingForMessage, Unitialized)
when(WaitingForMessage, stateTimeout = 3 seconds) {
case Event(StateTimeout, data) => goto(TimeoutExpired) using data // data is usually modified here
case Event(Message, data) => goto(MessageReceived) using data // data is usually modified here
}
onTransition {
case WaitingForMessage -> MessageReceived => stateData match {
case data => log.info("Received message: " + data)
}
case WaitingForMessage -> TimeoutExpired => receiver ! TimeoutExpired
}
when(MessageReceived) {
case _ => stay
}
when(TimeoutExpired) {
case _ => stay
}
initialize
}
Oto ona w akcji:
object Main extends App {
import akka._
import actor._
import Impatient._
val system = ActorSystem("System")
val receiver = system.actorOf(Props(new Actor with ActorLogging {
def receive = {
case TimeoutExpired => log.warning("Timeout expired")
}
}))
val impatient = system.actorOf(Props(new Impatient(receiver)), name = "Impatient")
impatient ! Message
val impatient2 = system.actorOf(Props(new Impatient(receiver)), name = "Impatient2")
Thread.sleep(4000)
impatient2 ! Message
system.shutdown()
}
+1 dla harmonogramu r rozwiązanie. – paradigmatic