2010-02-25 12 views
46

Chciałbym stworzyć pewnego rodzaju aplikacji wątku Producer/Consumer. Ale nie jestem pewien, jaki jest najlepszy sposób wdrożenia kolejki między tymi dwoma.Produktu/wątki konsumenckie za pomocą kolejki

Więc mam parę pomysłów (oba mogą być całkowicie błędne). Chciałbym wiedzieć, który byłby lepszy i jeśli oboje będą ssać, to jaki byłby najlepszy sposób wdrożenia kolejki. Chodzi mi głównie o moją implementację kolejki w tych przykładach. Rozszerzam klasę kolejki, która jest klasą wewnętrzną i jest bezpieczna dla wątków. Poniżej znajdują się dwa przykłady z 4 klasami każdy.

główna wykładowa

public class SomeApp 
{ 
    private Consumer consumer; 
    private Producer producer; 

    public static void main (String args[]) 
    { 
     consumer = new Consumer(); 
     producer = new Producer(); 
    } 
} 

Consumer wykładowa

public class Consumer implements Runnable 
{ 
    public Consumer() 
    { 
     Thread consumer = new Thread(this); 
     consumer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //get an object off the queue 
      Object object = QueueHandler.dequeue(); 
      //do some stuff with the object 
     } 
    } 
} 

Producent wykładowa

public class Producer implements Runnable 
{ 
    public Producer() 
    { 
     Thread producer = new Thread(this); 
     producer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //add to the queue some sort of unique object 
      QueueHandler.enqueue(new Object()); 
     } 
    } 
} 

Kolejka wykładowa

public class QueueHandler 
{ 
    //This Queue class is a thread safe (written in house) class 
    public static Queue<Object> readQ = new Queue<Object>(100); 

    public static void enqueue(Object object) 
    { 
     //do some stuff 
     readQ.add(object); 
    } 

    public static Object dequeue() 
    { 
     //do some stuff 
     return readQ.get(); 
    } 
} 

LUB

główna wykładowa

public class SomeApp 
{ 
    Queue<Object> readQ; 
    private Consumer consumer; 
    private Producer producer; 

    public static void main (String args[]) 
    { 
     readQ = new Queue<Object>(100); 
     consumer = new Consumer(readQ); 
     producer = new Producer(readQ); 
    } 
} 

Consumer wykładowa

public class Consumer implements Runnable 
{ 
    Queue<Object> queue; 

    public Consumer(Queue<Object> readQ) 
    { 
     queue = readQ; 
     Thread consumer = new Thread(this); 
     consumer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //get an object off the queue 
      Object object = queue.dequeue(); 
      //do some stuff with the object 
     } 
    } 
} 

Producent wykładowa

public class Producer implements Runnable 
{ 
    Queue<Object> queue; 

    public Producer(Queue<Object> readQ) 
    { 
     queue = readQ; 
     Thread producer = new Thread(this); 
     producer.start(); 
    } 

    public void run() 
    { 

     while(true) 
     { 
      //add to the queue some sort of unique object 
      queue.enqueue(new Object()); 
     } 
    } 
} 

Kolejka wykładowa

//the extended Queue class is a thread safe (written in house) class 
public class QueueHandler extends Queue<Object> 
{  
    public QueueHandler(int size) 
    { 
     super(size); //All I'm thinking about now is McDonalds. 
    } 

    public void enqueue(Object object) 
    { 
     //do some stuff 
     readQ.add(); 
    } 

    public Object dequeue() 
    { 
     //do some stuff 
     return readQ.get(); 
    } 
} 

Idź!

+0

Producenci skolączkowani i konsumenci piszą, przy okazji. Nie vice versa .. –

+1

Och i nie zaczynaj wątków od konstruktora !! Wątek ten mógł obserwować obiekt w niespójnym stanie. Aby uzyskać szczegółowe informacje, zapoznaj się z częścią "Współbieżność Java w praktyce". –

+0

Dzięki Zwei, ta kolejka sprawiła, że ​​byłem nieostry. zaczynając wątek od rzeczy konstruktora powinien raczej uruchomić metodę intiliazation i rozpocząć go tam, czy powinien on uruchomić go z głównej klasy metody? – Gareth

Odpowiedz

69

Java 5+ ma wszystkie narzędzia potrzebne do tego rodzaju rzeczy. Będziesz chciał:

  1. Umieść wszystkich swoich producentów w jednym ExecutorService;
  2. Umieść wszystkich swoich Konsumentów w innym ExecutorService;
  3. W razie potrzeby należy porozumieć się między nimi za pomocą BlockingQueue.

Mówię "w razie potrzeby" dla (3), ponieważ z mojego doświadczenia wynika, że ​​jest to niepotrzebny krok. Wszystko, co robisz, to przesyłanie nowych zadań do usługi wykonawców konsumenckich.Więc:

final ExecutorService producers = Executors.newFixedThreadPool(100); 
final ExecutorService consumers = Executors.newFixedThreadPool(100); 
while (/* has more work */) { 
    producers.submit(...); 
} 
producers.shutdown(); 
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
consumers.shutdown(); 
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 

Więc producers przesłać bezpośrednio do consumers.

+2

Cletus ma prawo do pieniędzy, aby uzyskać więcej informacji, aby wyjaśnić, "od czego zacząć" http://java.sun.com/docs/books/tutorial/essential/concurenrency/ – edwardsmatt

+0

"Tak więc producenci składają bezpośrednio do konsumentów" - Czy można bezpiecznie wywoływać funkcję consumer.submit (...) równolegle, czy też powinienem się zsynchronizować? – Gevorg

+0

Jeśli udostępniasz BlockingQueue, możesz użyć jednego executora dla producentów i konsumentów? – devo

9

Wymyślasz koło.

Jeśli potrzebujesz trwałości i innych funkcji przedsiębiorstwa, użyj JMS (sugerowałbym ActiveMq).

Jeśli potrzebujesz szybkich kolejek w pamięci, użyj jednej z implikacji języka Java Queue.

Jeśli potrzebujesz obsługi java 1.4 lub wcześniejszej wersji, skorzystaj z doskonałego pakietu Doug Lea: concurrent.

+4

Nadal możesz zostać poproszony o wdrożenie Producer Consumer podczas rozmowy kwalifikacyjnej :) –

+0

Używam narzędzia java.util.concurrent przydatnego, ale trudno mi to nazwać "doskonałym", podczas gdy wciąż zmusza mnie do zdania dwóch parametry tylko w celu określenia limitu czasu. Czy zabiłby Douga, by stworzyć klasę o nazwie Duration? – Trejkaz

17

OK, jak zauważają inni, najlepiej jest użyć pakietu java.util.concurrent. Gorąco polecam "Współbieżność Java w praktyce". To świetna książka, która obejmuje prawie wszystko, co musisz wiedzieć.

Co do konkretnego wdrożenia, jak zauważyłem w komentarzach, nie zaczynaj wątków od konstruktorów - może to być niebezpieczne.

Pozostawiając to na boku, drugie wdrożenie wydaje się lepsze. Nie chcesz umieszczać kolejek w statycznych polach. Prawdopodobnie po prostu tracisz elastyczność za nic.

Jeśli chcesz kontynuować własną implementację (w celu nauki, chyba? Powinieneś skonstruować obiekt (możesz utworzyć instancję obiektu Thread), a następnie wywołać start(), aby rozpocząć wątek.

Edytuj: ExecutorService mają własną kolejkę, więc może to być mylące. Oto coś, od czego zaczniesz.

public class Main { 
    public static void main(String[] args) { 
     //The numbers are just silly tune parameters. Refer to the API. 
     //The important thing is, we are passing a bounded queue. 
     ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100)); 

     //No need to bound the queue for this executor. 
     //Use utility method instead of the complicated Constructor. 
     ExecutorService producer = Executors.newSingleThreadExecutor(); 

     Runnable produce = new Produce(consumer); 
     producer.submit(produce); 
    } 
} 

class Produce implements Runnable { 
    private final ExecutorService consumer; 

    public Produce(ExecutorService consumer) { 
     this.consumer = consumer; 
    } 

    @Override 
    public void run() { 
     Pancake cake = Pan.cook(); 
     Runnable consume = new Consume(cake); 
     consumer.submit(consume); 
    } 
} 

class Consume implements Runnable { 
    private final Pancake cake; 

    public Consume(Pancake cake){ 
     this.cake = cake; 
    } 

    @Override 
    public void run() { 
     cake.eat(); 
    } 
} 

Dalsze EDIT: Dla producenta, zamiast while(true), można zrobić coś takiego:

@Override 
public void run(){ 
    while(!Thread.currentThread().isInterrupted()){ 
     //do stuff 
    } 
} 

ten sposób można zamknięcie wykonawcę wywołując .shutdownNow(). Jeśli użyjesz while(true), to się nie wyłączy.

Należy również pamiętać, że Producer jest nadal podatny na RuntimeExceptions (czyli jeden RuntimeException zatrzyma przetwarzanie)

+0

Więc powinienem dodać metodę start() do konsumenta i producenta? Mówisz, że powinienem umieścić coś takiego w mojej głównej metodzie? konsument = nowy klient(); consumer.start (readQ); lub to? konsument = nowy konsument (readQ); consumer.start(); – Gareth

+1

Zwykle robisz nowy Comsumer (readQ); consumer.start() ;. W twoim przypadku wskazane jest zadeklarowanie prywatnego finału kolejki, a jeśli to zrobisz, musisz ustawić kolejkę w konstruktorze. Jeśli jest to kod produkcyjny, zdecydowanie radzę ci pójść z odpowiedzią płodu. Jeśli absolutnie potrzebujesz użyć własnej kolejki, powinieneś zamiast tego używać surowego wątku ExecutorService executor = Executors.newSingleThreadExecutor(). To między innymi ochroni Cię przed RuntimeException, który zatrzymuje system. –

+0

Dzięki. bardzo pomocne. Poszedłem z BlockingQueue jak cletus zasugerowany przez wewnętrzną kolejkę. Nadal staram się opanować klasę ExecutorService, ale kiedy to zrobię, zdecydowanie z niej skorzystam. Dzięki za pomoc. – Gareth

1
  1. kod Java „BlockingQueue”, który został zsynchronizowany put i dostać metody.
  2. Kod Java "Producent", wątek producenta do produkcji danych.
  3. Kod Java "Konsument", wątek konsumencki konsumujący wyprodukowane dane.
  4. Kod Java "ProducerConsumer_Main", główna funkcja uruchamiania wątku producenta i konsumenta.

BlockingQueue.java

public class BlockingQueue 
{ 
    int item; 
    boolean available = false; 

    public synchronized void put(int value) 
    { 
     while (available == true) 
     { 
      try 
      { 
       wait(); 
      } catch (InterruptedException e) { 
      } 
     } 

     item = value; 
     available = true; 
     notifyAll(); 
    } 

    public synchronized int get() 
    { 
     while(available == false) 
     { 
      try 
      { 
       wait(); 
      } 
      catch(InterruptedException e){ 
      } 
     } 

     available = false; 
     notifyAll(); 
     return item; 
    } 
} 

Consumer.java

package com.sukanya.producer_Consumer; 

public class Consumer extends Thread 
{ 
    blockingQueue queue; 
    private int number; 
    Consumer(BlockingQueue queue,int number) 
    { 
     this.queue = queue; 
     this.number = number; 
    } 

    public void run() 
    { 
     int value = 0; 

     for (int i = 0; i < 10; i++) 
     { 
      value = queue.get(); 
      System.out.println("Consumer #" + this.number+ " got: " + value); 
     } 
    } 
} 

ProducerConsumer_Main.java

package com.sukanya.producer_Consumer; 

public class ProducerConsumer_Main 
{ 
    public static void main(String args[]) 
    { 
     BlockingQueue queue = new BlockingQueue(); 
     Producer producer1 = new Producer(queue,1); 
     Consumer consumer1 = new Consumer(queue,1); 
     producer1.start(); 
     consumer1.start(); 
    } 
} 
+3

Zrzuty kodów bez wyjaśnienia rzadko są pomocne. – Chris

6

mam rozszerzony cletus zaproponował odpowiedź na przykładowy kod roboczy.

  1. Jeden ExecutorService (pes) akceptuje zadania Producer.
  2. Jeden ExecutorService (ces) akceptuje zadania Consumer.
  3. Zarówno Producer i Consumer akcji BlockingQueue.
  4. Wiele zadań Producer generuje różne liczby.
  5. Każdy z Consumer zadań może zużywać numer generowany przez Producer

Kod:

import java.util.concurrent.*; 

public class ProducerConsumerWithES { 
    public static void main(String args[]){ 
     BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>(); 

     ExecutorService pes = Executors.newFixedThreadPool(2); 
     ExecutorService ces = Executors.newFixedThreadPool(2); 

     pes.submit(new Producer(sharedQueue,1)); 
     pes.submit(new Producer(sharedQueue,2)); 
     ces.submit(new Consumer(sharedQueue,1)); 
     ces.submit(new Consumer(sharedQueue,2)); 
     // shutdown should happen somewhere along with awaitTermination 
     /* https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */ 
     pes.shutdown(); 
     ces.shutdown(); 
    } 
} 
class Producer implements Runnable { 
    private final BlockingQueue<Integer> sharedQueue; 
    private int threadNo; 
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) { 
     this.threadNo = threadNo; 
     this.sharedQueue = sharedQueue; 
    } 
    @Override 
    public void run() { 
     for(int i=1; i<= 5; i++){ 
      try { 
       int number = i+(10*threadNo); 
       System.out.println("Produced:" + number + ":by thread:"+ threadNo); 
       sharedQueue.put(number); 
      } catch (Exception err) { 
       err.printStackTrace(); 
      } 
     } 
    } 
} 

class Consumer implements Runnable{ 
    private final BlockingQueue<Integer> sharedQueue; 
    private int threadNo; 
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) { 
     this.sharedQueue = sharedQueue; 
     this.threadNo = threadNo; 
    } 
    @Override 
    public void run() { 
     while(true){ 
      try { 
       int num = sharedQueue.take(); 
       System.out.println("Consumed: "+ num + ":by thread:"+threadNo); 
      } catch (Exception err) { 
       err.printStackTrace(); 
      } 
     } 
    } 
} 

wyjściowa:

Produced:11:by thread:1 
Produced:21:by thread:2 
Produced:22:by thread:2 
Consumed: 11:by thread:1 
Produced:12:by thread:1 
Consumed: 22:by thread:1 
Consumed: 21:by thread:2 
Produced:23:by thread:2 
Consumed: 12:by thread:1 
Produced:13:by thread:1 
Consumed: 23:by thread:2 
Produced:24:by thread:2 
Consumed: 13:by thread:1 
Produced:14:by thread:1 
Consumed: 24:by thread:2 
Produced:25:by thread:2 
Consumed: 14:by thread:1 
Produced:15:by thread:1 
Consumed: 25:by thread:2 
Consumed: 15:by thread:1 

Uwaga. Jeśli nie potrzebujesz wielu producentów i konsumentów, zachowaj jednego producenta i konsumenta. Dodałem wielu Producentów i Konsumentów, aby zaprezentować możliwości BlockingQueue wśród wielu Producentów i Konsumentów.

+0

To nie zajmuje się wyścigiem, gdy jest wielu producentów i konsumentów. Każda widzi pojemność 0 i próbuje dodać. Z pojedynczym producentem i pojedynczym konsumentem nie ma potrzeby synchronizowania w BlockingQueue, jeśli jest więcej niż jedna wymagana synchronizacja. – Cleonjoys

+0

Możesz zrobić jedną rzecz, skomentować konsumentów, a następnie ustawić Fixed size dla BlockingQueue, zobaczysz siebie. Próbowałem twój kod z nowym LinkedBlockingQueue (2); wyprowadzane były jak poniżej: Produkcja: 11: do nici: 1 wymienić: 21: do nici: 2 wymienić: 22: do nici: 2 wymienić: 12: przez nici: 1 Jak można bardziej wartości zostały wstawione, gdy ustawiona pojemność kolejki wynosiła 2 – Cleonjoys

+0

Taka jest natura BlockingQueue. Jeśli pojemność nie będzie dostępna, zostanie zablokowana. Używam nieograniczonej kolejki blokującej, a powyższy przypadek się nie pojawia. Nawet jeśli powstaje w wyniku ograniczonego BlockingQueue, to sposób w jaki Java go zaimplementowała. Sprawdź https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html#put-E-. Fragment kodu w moim poście nie ma żadnych problemów. –

1

To jest bardzo prosty kod.

import java.util.*; 

// @author : rootTraveller, June 2017 

class ProducerConsumer { 
    public static void main(String[] args) throws Exception { 
     Queue<Integer> queue = new LinkedList<>(); 
     Integer buffer = new Integer(10); //Important buffer or queue size, change as per need. 

     Producer producerThread = new Producer(queue, buffer, "PRODUCER"); 
     Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER"); 

     producerThread.start(); 
     consumerThread.start(); 
    } 
} 

class Producer extends Thread { 
    private Queue<Integer> queue; 
    private int queueSize ; 

    public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){ 
     super(ThreadName); 
     this.queue = queueIn; 
     this.queueSize = queueSizeIn; 
    } 

    public void run() { 
     while(true){ 
      synchronized (queue) { 
       while(queue.size() == queueSize){ 
        System.out.println(Thread.currentThread().getName() + " FULL   : waiting...\n"); 
        try{ 
         queue.wait(); //Important 
        } catch (Exception ex) { 
         ex.printStackTrace(); 
        } 
       } 

       //queue empty then produce one, add and notify 
       int randomInt = new Random().nextInt(); 
       System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
       queue.add(randomInt); 
       queue.notifyAll(); //Important 
      } //synchronized ends here : NOTE 
     } 
    } 
} 

class Consumer extends Thread { 
    private Queue<Integer> queue; 
    private int queueSize; 

    public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){ 
     super (ThreadName); 
     this.queue = queueIn; 
     this.queueSize = queueSizeIn; 
    } 

    public void run() { 
     while(true){ 
      synchronized (queue) { 
       while(queue.isEmpty()){ 
        System.out.println(Thread.currentThread().getName() + " Empty  : waiting...\n"); 
        try { 
         queue.wait(); //Important 
        } catch (Exception ex) { 
         ex.printStackTrace(); 
        } 
       } 

       //queue empty then consume one and notify 
       System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove()); 
       queue.notifyAll(); 
      } //synchronized ends here : NOTE 
     } 
    } 
}