2009-05-01 14 views
75

Mam klasy, która pobiera obiekty z BlockingQueue i przetwarza je, wywołując take() w pętli ciągłej. W pewnym momencie wiem, że do kolejki nie zostaną dodane żadne obiekty. Jak przerwać metodę take(), aby przestała blokować?Jak przerwać działanie BlockingQueue, które blokuje się przy pobieraniu()?

Tutaj jest klasa, która przetwarza obiekty:

public class MyObjHandler implements Runnable { 

    private final BlockingQueue<MyObj> queue; 

    public class MyObjHandler(BlockingQueue queue) { 
    this.queue = queue; 
    } 

    public void run() { 
    try { 
     while (true) { 
     MyObj obj = queue.take(); 
     // process obj here 
     // ... 
     } 
    } catch (InterruptedException e) { 
     Thread.currentThread().interrupt(); 
    } 
    } 
} 

i tu jest metoda, która używa tej klasy do przetwarzania obiektów:

public void testHandler() { 

    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjectHandler handler = new MyObjectHandler(queue); 
    new Thread(handler).start(); 

    // get objects for handler to process 
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext();) { 
    queue.put(i.next()); 
    } 

    // what code should go here to tell the handler 
    // to stop waiting for more objects? 
} 

Odpowiedz

61

przypadku przerwania nici nie jest to możliwe, drugi jest umieszczenie „marker” lub „polecenia” obiekt w kolejce, które są rozpoznawane jako takie przez MyObjHandler i przerwy z pętli.

+34

Jest to tak zwane podejście "Poison Pill Shutdown" i jest szeroko omawiane w "Java Concurrency in Practice", w szczególności na stronach 155-156. –

12
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 
MyObjectHandler handler = new MyObjectHandler(queue); 
Thread thread = new Thread(handler); 
thread.start(); 
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext();) { 
    queue.put(i.next()); 
} 
thread.interrupt(); 

Jeśli jednak zrobić tego wątku może zostać przerwane, gdy w kolejce znajdują się jeszcze elementy oczekujące na przetworzenie. Możesz rozważyć użycie poll zamiast take, co pozwoli wątkowi przetwarzania na przekroczenie limitu czasu i zakończenie, gdy odczekał chwilę bez nowych danych wejściowych.

+0

Tak, jest to problem, jeśli wątek został przerwany, gdy w kolejce nadal znajdują się elementy. Aby obejść ten problem, dodałem kod, aby upewnić się, że kolejka jest pusta przed przerwaniem wątku: while (queue.size()>0) Thread.currentThread().sleep(5000); MCS

+2

@MCS - uwaga dla przyszłych użytkowników, że twoje podejście tutaj jest hackerem i nie powinno być powielane w kodzie produkcyjnym z trzech powodów. Zawsze preferuje się znalezienie sposobu na zamknięcie zamknięcia. Nigdy nie można używać 'Thread.sleep()' jako alternatywy dla właściwego haka. W innych implementacjach inne wątki mogą umieszczać rzeczy w kolejce, a pętla while nigdy się nie kończy. –

+0

Na szczęście nie ma potrzeby polegać na takich hakach, ponieważ równie łatwo można je przetworzyć * po * przerwaniu, jeśli to konieczne. Na przykład "wyczerpująca" implementacja 'take()' może wyglądać następująco: 'try {return take(); } catch (InterruptedException e) {E o = poll(); if (o == null) throw e; Thread.currentThread(). Interrupt(); return o; } 'Jednak nie ma powodu, dla którego musi być zaimplementowana na tej warstwie, a jej implementacja nieco wyżej doprowadzi do bardziej wydajnego kodu (np. Poprzez uniknięcie elementu" InterruptedException "na poziomie elementu i/lub użycie' BlockingQueue.DrainTo () '). – antak

0

Lub nie przerywaj, to jest paskudne.

public class MyQueue<T> extends ArrayBlockingQueue<T> { 

     private static final long serialVersionUID = 1L; 
     private boolean done = false; 

     public ParserQueue(int capacity) { super(capacity); } 

     public void done() { done = true; } 

     public boolean isDone() { return done; } 

     /** 
     * May return null if producer ends the production after consumer 
     * has entered the element-await state. 
     */ 
     public T take() throws InterruptedException { 
      T el; 
      while ((el = super.poll()) == null && !done) { 
       synchronized (this) { 
        wait(); 
       } 
      } 

      return el; 
     } 
    } 
  1. gdy producent umieszcza obiekt do kolejki, połączenia queue.notify(), jeśli kończy się, połączenia queue.done()
  2. pętli while (! Queue.isDone() ||! Queue.isEmpty())
  3. Test odbioru() wartość zwracana za zerową
+0

Powiedziałbym, że poprzednie rozwiązanie było czystsze i prostsze niż ta – sakthisundar

+0

Gotowe flagi są takie same jak trucizna pigułka, jej po prostu inaczej podawano :) –

+0

Czystsze? Wątpię. Nie wiesz, kiedy dokładnie przerwano wątek. Albo pozwól mi zapytać, co z tym jest czystsze? To mniej kodu, to fakt. – tomasb

12

bardzo późno, ale mam nadzieję, że to pomaga też inny jak i w obliczu podobnego problemu i używane poll podejście sugerowane przez erickson above z niewielkimi zmianami,

class MyObjHandler implements Runnable 
{ 
    private final BlockingQueue<MyObj> queue; 
    public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL 
    public MyObjHandler(BlockingQueue queue) 
    { 
     this.queue = queue; 
     Finished = false; 
    } 
    @Override 
    public void run() 
    {   
     while (true) 
     { 
      try 
      { 
       MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS); 
       if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return 
       { 
        // process obj here 
        // ... 
       } 
       if(Finished && queue.isEmpty()) 
        return; 

      } 
      catch (InterruptedException e) 
      {     
       return; 
      } 
     } 
    } 
} 

public void testHandler() 
{ 
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler handler = new MyObjHandler(queue); 
    new Thread(handler).start(); 

    // get objects for handler to process 
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext();) 
    { 
     queue.put(i.next()); 
    } 

    // what code should go here to tell the handler to stop waiting for more objects? 
    handler.Finished = true; //THIS TELLS HIM 
    //If you need you can wait for the termination otherwise remove join 
    myThread.join(); 
} 

to rozwiązać oba problemy

  1. zasygnalizowała BlockingQueue tak, że wie, że nie musi czekać więcej elementów
  2. Nie przerwane pomiędzy, tak że bloki przetwarzania kończą się tylko wtedy, gdy wszystkie elementy w kolejce są przetwarzane i nie ma żadnych elementów, które pozostały do ​​dodania
+2

Utwórz 'Finished' zmienną' volatile', aby zagwarantować widoczność między wątkami. Zobacz http://stackoverflow.com/a/106787 – lukk

+0

@lukk dzięki za odpowiedź, zredagowałem odpowiedź, aby uwzględnić Twój punkt ..... – dbw

+2

Jeśli się nie mylę, ostatni element w kolejce nie będzie obrobiony. Kiedy weźmiesz ostatni element z kolejki, skończone ma wartość true, a kolejka jest pusta, więc powróci przed obsługą tego ostatniego elementu. Dodaj trzeci warunek jeśli (Finished && queue.isEmpty() && obj == null) –