2012-12-03 10 views
5

Mam aktora Akka odpowiedzialnego za obsługę połączeń HTTP. Używam scala wysyłkę wysłać wiele żądań HTTP przez API:Jak radzić sobie z wieloma obietnicami w aktorze (akka)?

urls.foreach { u 
    val service = url(u) 
    val promise = Http(service OK as.String).either 
    for(p <- promise) 
    { 
    p match 
    { 
     case Left(error) => 
     faultHandler(error) 
     case Right(result) => 
     resultHandler(result) 
    } 
    } 

W funkcji resultHandler, że przyrost instancję zmiennej nbOfResults i porównać z liczbą połączeń I zrobili.

def resultHandler(result:String) 
{ 
    this.nbOfResults++ 
    ... 
    if(nbOfResults == nbOfCalls) 
    // Do something 
} 

Czy to bezpieczne? Czy dostęp do nbOfResults jest możliwy w tym samym czasie, jeśli dwa połączenia zwracają jednocześnie wyniki?

Na razie wierzyłem, że aktor jest mniej więcej równoważny wątkowi i dlatego funkcje zwrotne nie są wykonywane równocześnie. Czy to jest poprawne ?

+0

Odpowiedzi zawierają sugestie, jak to zrobić, ja po prostu lubię wyraźnie stan na piśmie, że tak, muszą być ostrożni z asynchronicznymi wywołań zwrotnych, ONI ZOSTANĄ PRZYKŁADOWO UZYSKANE. Krótko mówiąc, twoja obsługa nbOfResults w powyższym kodzie jest nieprawidłowa. –

Odpowiedz

3

Oto wariant odpowiedzi Alexey Romanov używając tylko wysyłkowy: poniżej

//Promises will be of type Array[Promise[Either[Throwable, String]]] 
val promises = urls.map { u => 
    val service = url(u) 

    Http(service OK as.String).either 
} 

//Http.promise.all transform an Iterable[Promise[A]] into Promise[Iterable[A]] 
//So listPromise is now of type Promise[Array[Either[Throwable, String]]] 
val listPromise = Http.promise.all(promises) 

for (results <- listPromise) { 
    //Here results is of type Array[Either[Throwable, String]] 

    results foreach { result => 
     result match { 
      Left(error) => //Handle error 
      Right(response) => //Handle response 
     } 
    } 
} 
2

Jest o wiele lepszy sposób:

val promises = urls.map {u => 
    val service = url(u) 
    val promise = Http(service OK as.String).either 
} 

val listPromise = Future.sequence(promises) 

listPromise.onComplete { whatever } 
2

zgadzam się z Alexey Romanov na jego odpowiedź. Niezależnie od wybranej opcji synchronizowania żądań http, uważaj na sposób, w jaki przetwarzasz wypełnienie obietnic. Twoja intuicja jest poprawna, ponieważ równoczesny dostęp może pojawić się na stanie aktora. Im lepszy sposób obsłużyć to byłoby zrobić coś takiego:

def resultHandler(result: String) { 
    //on completion we are sending the result to the actor who triggered the call 
    //as a message 
    self ! HttpComplete(result) 
} 

i otrzymać funkcję aktora:

def receive = { 
    //PROCESS OTHER MESSAGES HERE 
    case HttpComplete(result) => //do something with the result 
} 

W ten sposób można się upewnić, że przetwarzania wyników HTTP nie naruszy stan aktora z zewnątrz, ale od otrzymania pętli aktora, który jest właściwy sposób to zrobić

1
val nbOfResults = new java.util.concurrent.atomic.AtomicInteger(nbOfCalls) 

// After particular call was ended  
if (nbOfResults.decrementAndGet <= 0) { 
    // Do something 
} 

[EDIT] Usunięto starą odpowiedź z AtomicReference CAS - while (true), compareAndSet Itp

+0

Co jest nie tak z incrementAndGet? –

+0

Dodano wariant odpowiedzi, biorąc pod uwagę sugestię – idonnie