2016-06-09 23 views
5

Chcę dodać after(d: FiniteDuration)(callback: => Unit) util do Scala Future s, który pozwoliłby mi na to:Jak dodać czasowego obserwatora do Scala Future?

val f = Future(someTask) 

f.after(30.seconds) { 
    println("f has not completed in 30 seconds!") 
} 

f.after(60.seconds) { 
    println("f has not completed in 60 seconds!") 
} 

Jak mogę to zrobić?

Odpowiedz

0

Jednym ze sposobów jest użycie Future.firstCompletedOf (zobacz blogpost):

val timeoutFuture = Future { Thread.sleep(500); throw new TimeoutException } 

val f = Future.firstCompletedOf(List(f, timeoutFuture)) 
f.map { case e: TimeoutException => println("f has not completed in 0.5 seconds!") } 

gdzie TimeoutException jest jakiś wyjątek lub typ.

+0

Ale 'firstCompletedOf' nie anulować drugi przyszłość, jeśli pierwszy powróci. Więc jeśli większość moich przyszłości trwa kilka milisekund, ale chcę dodać instrukcję debugowania po 30 sekundach, będę tworzył wiele Thread.sleep (30000), które nie zostaną anulowane, prawda? – pathikrit

+0

@pathikrit tak, ale wynik zostanie odrzucony. Jeśli jest to nieblokująca przyszłość (np. 'Val timeoutFuture = akka.pattern.after (500.milliseconds, using = system.scheduler) {...}' z posta na blogu, to nie sądzę, że to nie problem (nie blokuje wątku). –

0

Użyj import akka.pattern.after. Jeśli chcesz go zaimplementować bez akka, tutaj jest source code. Innym (java) przykładem jest TimeoutFuture w com.google.common.util.concurrent.

1

Zwykle używam nici basen wykonawcę i obiecuje:

import scala.concurrent.duration._ 
import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor} 
import scala.concurrent.{Future, Promise} 

val f: Future[Int] = ??? 

val executor = new ScheduledThreadPoolExecutor(2, Executors.defaultThreadFactory(), AbortPolicy) 

def withDelay[T](operation: ⇒ T)(by: FiniteDuration): Future[T] = { 
    val promise = Promise[T]() 
    executor.schedule(new Runnable { 
    override def run() = { 
     promise.complete(Try(operation)) 
    } 
    }, by.length, by.unit) 
    promise.future 
} 

Future.firstCompletedOf(Seq(f, withDelay(println("still going"))(30 seconds))) 
Future.firstCompletedOf(Seq(f, withDelay(println("still still going"))(60 seconds))) 
0

coś takiego, być może:

object PimpMyFuture { 
    implicit class PimpedFuture[T](val f: Future[T]) extends AnyVal { 
     def after(delay: FiniteDuration)(callback: => Unit): Future[T] = { 
      Future { 
      blocking { Await.ready(f, delay) } 
      } recover { case _: TimeoutException => callback } 
      f 
     } 
    } 
    } 

    import PimpMyFuture._ 
    Future { Thread.sleep(10000); println ("Done") } 
    .after(5.seconds) { println("Still going") } 

Ta implementacja jest prosta, ale to w zasadzie podwaja liczbę wątków trzeba - każda aktywna przyszłość skutecznie zajmuje dwa wątki - co jest nieco marnotrawstwem. Alternatywnie możesz użyć zaplanowanych zadań, aby twoje oczekiwania nie były blokowane. Nie wiem, od „standardowego” scheduler w Scala (lib każdy ma własne), ale dla prostego zadania jak to można użyć Java TimerTask bezpośrednio:

object PimpMyFutureNonBlocking {  
val timer = new java.util.Timer 

implicit class PimpedFuture[T](val f: Future[T]) extends AnyVal { 
    def after(delay: FiniteDuration)(callback: => Unit): Future[T] = { 
     val task = new java.util.TimerTask { 
      def run() { if(!f.isCompleted) callback } 
     } 
     timer.schedule(task, delay.toMillis) 
     f.onComplete { _ => task.cancel } 
     f 
    } 
    } 
}