2016-07-07 48 views
7

Próbuję zaimplementować stronę odczytu w mojej architekturze ES-CQRS. Powiedzmy mam uporczywe aktora tak:Strumień zdarzeń Akka Persistence Query i CQRS

object UserWrite { 

    sealed trait UserEvent 
    sealed trait State 
    case object Uninitialized extends State 
    case class User(username: String, password: String) extends State 
    case class AddUser(user: User) 
    case class UserAdded(user: User) extends UserEvent 
    case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed]) 
    case class UsersStream(fromSeqNo: Long) 
    case object GetCurrentUser 

    def props = Props(new UserWrite) 
} 

class UserWrite extends PersistentActor { 

    import UserWrite._ 

    private var currentUser: State = Uninitialized 

    override def persistenceId: String = "user-write" 

    override def receiveRecover: Receive = { 
    case UserAdded(user) => currentUser = user 
    } 

    override def receiveCommand: Receive = { 
    case AddUser(user: User) => persist(UserAdded(user)) { 
     case UserAdded(`user`) => currentUser = user 
    } 
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo) 
    case GetCurrentUser => sender() ! currentUser 
    } 

    def publishUserEvents(fromSeqNo: Long) = { 
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
    val userEvents = readJournal 
     .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue) 
     .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event } 
    sender() ! UserEvents(userEvents) 
    } 
} 

O ile rozumiem, za każdym razem, gdy zdarzenie pobiera utrzymywały, możemy opublikować go poprzez Akka Persistence Query. Teraz nie jestem pewien, jaki byłby właściwy sposób subskrybowania tych wydarzeń, więc mogę utrzymać go w mojej bazie danych po przeczytaniu? Jednym z pomysłów jest wysłanie najpierw wiadomości o stanie UsersStream od aktora czytającego do UserWrite aktora i zdarzeń "sink" w tym aktorze czytającym.

EDIT

Po sugestii @cmbaxter, I wdrożone czytać drużynie w ten sposób:

object UserRead { 

    case object GetUsers 
    case class GetUserByUsername(username: String) 
    case class LastProcessedEventOffset(seqNo: Long) 
    case object StreamCompleted 

    def props = Props(new UserRead) 
} 

class UserRead extends PersistentActor { 
    import UserRead._ 

    var inMemoryUsers = Set.empty[User] 
    var offset  = 0L 

    override val persistenceId: String = "user-read" 

    override def receiveRecover: Receive = { 
    // Recovery from snapshot will always give us last sequence number 
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo 
    case RecoveryCompleted         => recoveryCompleted() 
    } 

    // After recovery is being completed, events will be projected to UserRead actor 
    def recoveryCompleted(): Unit = { 
    implicit val materializer = ActorMaterializer() 
    PersistenceQuery(context.system) 
     .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
     .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue) 
     .map { 
     case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event 
     } 
     .runWith(Sink.actorRef(self, StreamCompleted)) 
    } 

    override def receiveCommand: Receive = { 
    case GetUsers     => sender() ! inMemoryUsers 
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username) 
    // Match projected event and update offset 
    case (seqNo: Long, UserAdded(user)) => 
     saveSnapshot(LastProcessedEventOffset(seqNo)) 
     inMemoryUsers += user 
    } 
} 

Istnieją pewne kwestie jak: strumień zdarzeń wydaje się być powolne. To znaczy. UserRead Aktor może odpowiedzieć zestawem użytkowników, zanim zostanie zapisany nowo dodany użytkownik.

EDIT 2

zwiększyłem częstotliwość odświeżania Cassandra czasopiśmie zapytań, które mniej więcej rozwiązany problem z powolnym strumieniem zdarzeń. Wygląda na to, że dziennik zdarzeń Cassandra jest domyślnie odpytywany co 3 sekundy. W moim application.conf I dodaje:

cassandra-query-journal { 
    refresh-interval = 20ms 
} 

EDIT 3

Właściwie nie zmniejszać częstotliwość odświeżania. To zwiększy użycie pamięci, ale to nie jest niebezpieczne, ani punkt. W ogóle pojęcie CQRS jest takie, że strona zapisu i odczytu są asynchroniczne. Dlatego po zapisaniu danych nigdy nie będą dostępne od razu do odczytu. Radzenie sobie z UI? Po prostu otwieram strumień i przesyłam dane przez zdarzenia wysłane przez serwer po potwierdzeniu ich przez stronę czytającą.

+2

Chciałbym po prostu przenieść kod w oparciu o odczyt czasopism na twojej stronie odczytu projekcji aktor zamiast wysyłać mu wiadomości z 'Source' na nim. Następnie przetwórz ten strumień w tym czytniku rzutowania z boku i przesyłaj te informacje do Elasticsearch. – cmbaxter

+0

@cmbaxter Zrobiłem to. Wydaje się to bardzo dobrym pomysłem. Zaktualizowałem moje pytanie i nadal akceptuję sugestie, ponieważ wciąż mam pewne wątpliwości. –

Odpowiedz

4

Istnieje kilka sposobów, aby to zrobić. Na przykład w mojej aplikacji mam aktora po mojej stronie kwerendy, który ma PersistenceQuery, który stale szuka zmian, ale możesz mieć wątek z tym samym zapytaniem też. Chodzi o to, aby utrzymać strumień otwarty, aby móc jak najszybciej przeczytać utrzymywały zdarzenie jak to się dzieje

val readJournal = 
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
    CassandraReadJournal.Identifier) 

// issue query to journal 
val source: Source[EventEnvelope, NotUsed] = 
    readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue) 

// materialize stream, consuming events 
implicit val mat = ActorMaterializer() 
source.map(_.event).runForeach{ 
    case userEvent: UserEvent => { 
    doSomething(userEvent) 
    } 
} 

Zamiast tego można mieć czasomierz, który podnosi się PersistenceQuery i zapisuje nowe zdarzenia, ale myślę, że mając strumień otwarty jest najlepszym sposobem

2

Chociaż rozwiązanie z PersistenceQuery tylko został zatwierdzony, zawiera ona następujące problemy:

  1. jest częściowe, to jedyna metoda, aby przeczytać EventEnvelopes prezentowane.
  2. Nie może pracować z migawkami stanu iw rezultacie część CQRS Reader powinna mieć ponad wszystkie utrwalone zdarzenia nigdy nie zostały utracone.

Pierwsze rozwiązanie jest lepsze, ale ma następujące zagadnienia:

  1. To jest zbyt skomplikowane. Powoduje to niepotrzebne zajmowanie się numerami sekwencji przez użytkownika.
  2. Kod dotyczy stanu (zapytanie/aktualizacja) zbyt powiązanego z implementacją aktorów.

Jest istnieje prostsze:

import akka.NotUsed 
import akka.actor.{Actor, ActorLogging} 
import akka.persistence.query.{EventEnvelope, PersistenceQuery} 
import akka.persistence.query.javadsl.{EventsByPersistenceIdQuery, ReadJournal} 
import akka.persistence._ 
import akka.stream.ActorMaterializer 
import akka.stream.javadsl.Source 

/** 
    * Created by alexv on 4/26/2017. 
    */ 
class CQRSTest { 

    // User Command, will be transformed to User Event 
    sealed trait UserCommand 
    // User Event 
    // let's assume some conversion from Command to event here 
    case class PersistedEvent(command: UserCommand) extends Serializable 
    // User State, for simplicity assumed that all State will be snapshotted 
    sealed trait State extends Serializable{ 
    def clear(): Unit 
    def updateState(event: PersistedEvent): Unit 
    def validateCommand(command:UserCommand): Boolean 
    def applyShapshot(newState: State): Unit 
    def getShapshot() : State 
    } 
    case class SaveSnapshot() 

    /** 
    * Common code for Both reader and writer 
    * @param state - State 
    */ 
    abstract class CQRSCore(state: State) extends PersistentActor with ActorLogging { 
    override def persistenceId: String = "CQRSPersistenceId" 

    override def preStart(): Unit = { 
     // Since the state is external and not depends to Actor's failure or restarts it should be cleared. 
     state.clear() 
    } 

    override def receiveRecover: Receive = { 
     case event : PersistedEvent => state.updateState(event) 
     case SnapshotOffer(_, snapshot: State) => state.applyShapshot(snapshot) 
     case RecoveryCompleted => onRecoveryCompleted(super.lastSequenceNr) 
    } 

    abstract def onRecoveryCompleted(lastSequenceNr:Long) 
    } 

    class CQRSWriter(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSWriter Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed") 
    } 

    override def receiveCommand: Receive = { 
     case command: UserCommand => 
     if(state.validateCommand(command)) { 
      // Persist events and call state.updateState with each persisted event 
      persistAll(List(PersistedEvent(command)))(state.updateState) 
     } 
     else { 
      log.error("Validation Failed for Command: {}", command) 
     } 
     case SaveSnapshot => saveSnapshot(state.getShapshot()) 
     case SaveSnapshotSuccess(metadata) => log.debug("Saved snapshot successfully: {}", metadata) 
     case SaveSnapshotFailure(metadata, reason) => log.error("Failed to Save snapshot: {} . Reason: {}", metadata, reason) 
    } 
    } 

    class CQRSReader(state: State) extends CQRSCore(state){ 
    override def preStart(): Unit = { 
     super.preStart() 
     log.info("CQRSReader Started") 
    } 

    override def onRecoveryCompleted(lastSequenceNr: Long): Unit = { 
     log.info("Recovery completed, Starting QueryStream") 

     // ReadJournal type not specified here, so may be used with Cassandra or In-memory Journal (for Tests) 
     val readJournal = PersistenceQuery(context.system).readJournalFor(
     context.system.settings.config.getString("akka.persistence.query.my-read-journal")) 
     .asInstanceOf[ReadJournal 
     with EventsByPersistenceIdQuery] 
     val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(
     OrgPersistentActor.orgPersistenceId, lastSequenceNr + 1, Long.MaxValue) 
     source.runForeach({ envelope => state.updateState(envelope.event.asInstanceOf[PersistedEvent]) },ActorMaterializer()) 

    } 

    // Nothing received since it is Reader only 
    override def receiveCommand: Receive = Actor.emptyBehavior 
    } 
} 
+0

Założona część CQRSRead powinna być bezpośrednio zapytana przez jej stan. CQRSReader zapewnia, że ​​stan jest podobny do stanu CQRSWriter. Nie zaimplementowałem tutaj stanu Concrete, ale może to być wszystko, od prostego Hash Map do In-memory Graph DB –