2011-03-10 18 views
5

Przesyłam kilka przyszłych zadań przy użyciu usługi CompletionService owiniętej wokół 2 wątku FixedThreadPool ExecutorService, ustawiam następnie ustawić pętlę równą liczbie zadań, które zostały przesłane i używam usługi completement. take() czeka na wszystkich, aby zakończyć lub zawieść. Kłopoty bardzo rzadko się kończą (ale nie wiem dlaczego), więc zmieniłem metodę take() na sondę (300, Timeout.SECONDS), przy założeniu, że wykonanie jednego zadania zajmuje więcej niż 5 minut sondowanie nie powiedzie się, a następnie w końcu wyjdzie z pętli i mogę przejść przez wszystkie futures i zadzwonić do future.cancel (true), aby wymusić anulowanie zadania przestępczego.Jak anulować zadania, które trwają zbyt długo za pomocą usługi CompletionService

Po uruchomieniu kodu i jego wisi, widzę, że ankieta nie działa nieprzerwanie co 5 minut i nie ma więcej zadań, więc zakładam, że dwóch pracowników jest w jakiś sposób zakleszczonych i nigdy nie kończy, i nigdy nie pozwala na dodatkowe zadania zacząć. Ponieważ limit czasu to 5 minut, a było jeszcze 1000 zadań do wykonania, czas potrzebny do przerwania pętli był zbyt długi, dlatego anulowano zadanie.

Więc to, co chcę zrobić, to przerwać/wymuszenie anulowania bieżącego zadania, jeśli nie zostało zakończone w ciągu 5 minut, ale nie widzę w żaden sposób to zrobić.

Ten przykładowy kod przedstawia uproszczoną wersję tego, co Im mówić o

import com.jthink.jaikoz.exception.JaikozException; 
import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.concurrent.*; 

public class CompletionServiceTest 
{ 
    public static void main(final String[] args) 
    { 
     CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2)); 
     Collection<Worker> tasks = new ArrayList<Worker>(10); 
     tasks.add(new Worker(1)); 
     tasks.add(new Worker(2)); 
     tasks.add(new Worker(3)); 
     tasks.add(new Worker(4)); 
     tasks.add(new Worker(5)); 
     tasks.add(new Worker(6)); 

     List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size()); 
     try 
     { 
      for (Callable task : tasks) 
      { 
       futures.add(cs.submit(task)); 
      } 
      for (int t = 0; t < futures.size(); t++) 
      { 
       Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS); 
       if(result==null) 
       { 
        System.out.println("Worker TimedOut:"); 
        continue; 
       } 
       else 
       { 
        try 
        { 
         if(result.isDone() && result.get()) 
         { 
          System.out.println("Worker Completed:"); 
         } 
         else 
         { 
          System.out.println("Worker Failed"); 
         } 
        } 
        catch (ExecutionException ee) 
        { 
         ee.printStackTrace(); 
        } 
       } 
      } 
     } 
     catch (InterruptedException ie) 
     { 
     } 
     finally 
     { 
      //Cancel by interrupting any existing tasks currently running in Executor Service 
      for (Future<Boolean> f : futures) 
      { 
       f.cancel(true); 
      } 
     } 
     System.out.println("Done"); 
    } 
} 

class Worker implements Callable<Boolean> 
{ 
    private int number; 
    public Worker(int number) 
    { 
     this.number=number; 
    } 

    public Boolean call() 
    { 
     if(number==3) 
     { 
      try 
      { 
       Thread.sleep(50000); 
      } 
      catch(InterruptedException tie) 
      { 

      } 
     } 
     return true; 
    } 
} 

wyjście

Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker TimedOut: 
Done 
+0

@ user294896 - czy możesz podać przykładowy kod w małym, samodzielnym przykładzie? – justkt

+0

@justkt Mogę spróbować, może trochę potrwać. –

+0

Dlaczego samo zadanie nie może zdać sobie sprawy, że trwa ono zbyt długo i zostało przerwane? To znacznie uprościłoby sprawy. – trojanfoe

Odpowiedz

4

Myślę, że go rozwiązałem, w zasadzie, jeśli wystąpi przekroczenie limitu czasu, następnie przejdę przez moją listę przyszłych obiektów i znajdę pierwszą, która się nie zakończyła, i wymuszam anulowanie. Nie wydaje się taki elegancki, ale wydaje się działać.

Zmieniłem rozmiar puli tylko po to, aby pokazać wyjście, które lepiej demonstruje rozwiązanie, ale działa również z dwiema pulami wątków.

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.Date; 
import java.util.List; 
import java.util.concurrent.*; 

public class CompletionServiceTest 
{ 
    public static void main(final String[] args) 
    { 
     CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1)); 
     Collection<Worker> tasks = new ArrayList<Worker>(10); 
     tasks.add(new Worker(1)); 
     tasks.add(new Worker(2)); 
     tasks.add(new Worker(3)); 
     tasks.add(new Worker(4)); 
     tasks.add(new Worker(5)); 
     tasks.add(new Worker(6)); 

     List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size()); 
     try 
     { 
      for (Callable task : tasks) 
      { 
       futures.add(cs.submit(task)); 
      } 
      for (int t = 0; t < futures.size(); t++) 
      { 
       System.out.println("Invocation:"+t); 
       Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS); 
       if(result==null) 
       { 
        System.out.println(new Date()+":Worker Timedout:"); 
        //So lets cancel the first futures we find that havent completed 
        for(Future future:futures) 
        { 
         System.out.println("Checking future"); 
         if(future.isDone()) 
         { 
          continue; 
         } 
         else 
         { 
          future.cancel(true); 
          System.out.println("Cancelled"); 
          break; 
         } 
        } 
        continue; 
       } 
       else 
       { 
        try 
        { 
         if(result.isDone() && !result.isCancelled() && result.get()) 
         { 
          System.out.println(new Date()+":Worker Completed:"); 
         } 
         else if(result.isDone() && !result.isCancelled() && !result.get()) 
         { 
          System.out.println(new Date()+":Worker Failed"); 
         } 
        } 
        catch (ExecutionException ee) 
        { 
         ee.printStackTrace(System.out); 
        } 
       } 
      } 
     } 
     catch (InterruptedException ie) 
     { 
     } 
     finally 
     { 
      //Cancel by interrupting any existing tasks currently running in Executor Service 
      for (Future<Boolean> f : futures) 
      { 
       f.cancel(true); 
      } 
     } 
     System.out.println(new Date()+":Done"); 
    } 
} 

class Worker implements Callable<Boolean> 
{ 
    private int number; 
    public Worker(int number) 
    { 
     this.number=number; 
    } 

    public Boolean call() 
     throws InterruptedException 
    { 
     try 
     { 
      if(number==3) 
      { 
       Thread.sleep(50000); 
      } 
     } 
     catch(InterruptedException ie) 
     { 
      System.out.println("Worker Interuppted"); 
      throw ie; 
     } 
     return true; 
    } 
} 

Wyjście jest

Invocation:0 
Thu Mar 10 20:51:39 GMT 2011:Worker Completed: 
Invocation:1 
Thu Mar 10 20:51:39 GMT 2011:Worker Completed: 
Invocation:2 
Thu Mar 10 20:51:49 GMT 2011:Worker Timedout: 
Checking future 
Checking future 
Checking future 
Cancelled 
Invocation:3 
Worker Interuppted 
Invocation:4 
Thu Mar 10 20:51:49 GMT 2011:Worker Completed: 
Invocation:5 
Thu Mar 10 20:51:49 GMT 2011:Worker Completed: 
Thu Mar 10 20:51:49 GMT 2011:Done 
2

W przykładzie pracownika, Twój Callable blokuje na rozmowy, które obsługuje przerwanie. Jeśli twój prawdziwy kod blokuje blokadę wewnętrzną (blok synchronized), nie będzie można jej anulować poprzez przerwanie. Zamiast tego można użyć jawnej blokady (java.util.concurrent.Lock), która pozwala określić, jak długo ma czekać na pobranie blokady. Jeśli wątek przekroczy limit czasu oczekiwania na blokadę, prawdopodobnie z powodu wystąpienia zakleszczenia, może przerwać z komunikatem o błędzie.

Nawiasem mówiąc, w twoim przykładzie Twój Callable nie powinien połknąć InterruptedException. Powinieneś albo przekazać go (ponownie rzucić, albo dodać InterruptedException do linii rzutów deklaracji metody) lub w bloku catch, zresetować przerwany stan wątku (z Thread.currentThread().interrupt()).

+0

Kod roboczy nie jest naprawdę reprezentatywny dla prawdziwego pracownika, do którego po prostu włożyłem coś, aby kod działał. Pytanie dotyczy usługi CompletionService/ExecutorService, jak mogę anulować zadanie z usługi, która wydaje się mieć problem, kiedy nie wiem, która przyszłość działa/ma problem. –

+0

ps: powinieneś także sprawdzać Thread.currentThread(). IsInterrupted() w strategicznych punktach twojego kodu. InterruptedException zazwyczaj nie jest generowany, chyba że w operacji blokowania, która obsługuje go. Do ciebie należy sprawdzenie, czy wystąpiło przerwanie przy użyciu metody isInterrupted(), i zatrzymanie zadania po nim. – Matt

1

Zawsze można zadzwonić future.get(timeout...)
To zwróci wyjątek limit czasu, jeśli jeszcze nie skończyć ... to można nazwać future.cancel().