2010-11-11 11 views
7

Próba opanowania sposobu myślenia w kategoriach aktorów zamiast wątków. Jestem nieco zaskoczony następującym przypadkiem użycia:Przejście z modelu wątku na aktorów

Rozważmy system, który ma proces producenta, który tworzy pracę (np. Czytając dane z pliku) i szereg procesów roboczych, które pochłaniają pracę (np. parsując dane i zapisując je w bazie danych). Stawki, przy których praca jest wytwarzana i zużywana, mogą się różnić, a system powinien być odporny na to. Na przykład, jeśli robotnicy nie mogą nadążyć, producent powinien to wykryć i ostatecznie zwolnić lub poczekać.

Jest to dość łatwe do wdrożenia z wątków:

val producer:Iterator[Work] = createProducer() 
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE) 
val workers = (0 until NUM_WORKERS) map { i => 
    new Thread() { 
    override def run() = { 
     while (true) { 
     try { 
      // take next unit of work, waiting if necessary 
      val work = queue.take() 
      process(work) 
     } 
     catch { 
      case e:InterruptedException => return 
     } 
     } 
    } 
    } 
} 

// start the workers 
workers.foreach(_.start()) 

while (producer.hasNext) { 
    val work = producer.next() 
    // add new unit of work, waiting if necessary 
    queue.put(work) 
} 

while (!queue.isEmpty) { 
    // wait until queue is drained 
    queue.wait() 
} 

// stop the workers 
workers.foreach(_.interrupt()) 

Nie ma nic naprawdę złego w tym modelu, a ja z powodzeniem stosować go wcześniej. Ten przykład jest prawdopodobnie zbyt obszerny, ponieważ użycie Executora lub usługi CompletionService dobrze by pasowało do tego zadania. Ale podoba mi się abstrakcja aktora i myślę, że w wielu przypadkach łatwiej jest o nim myśleć. Czy istnieje sposób na przepisanie tego przykładu za pomocą aktorów, szczególnie upewniając się, że nie ma przepełnienia bufora (np. Pełne skrzynki pocztowe, upuszczone wiadomości itp.)?

Odpowiedz

3

Ponieważ aktorzy przetwarzają wiadomości "offline" (tj. Konsumpcja wiadomości nie jest powiązana z ich otrzymywaniem), trudno jest zobaczyć, w jaki sposób można uzyskać dokładny analog "producenta czeka, aż konsumenci nadrobią zaległości".

Jedyne co mogę myśleć o to, że konsumenci zwracają się o pracę z aktorem producenta (który używa reply):

case object MoreWorkPlease 
class Consumer(prod : Producer) extends Actor { 
    def act = { 
    prod ! MoreWorkPlease 
    loop { 
     react { 
     case Work(payload) => doStuff(payload); reply(MoreWorkPlease) 
     } 
    } 
    } 
} 

class Producer extends Actor { 
    def act = loop { 
    react { 
     case MoreWorkPlease => reply(Work(getNextItem)) 
    } 
    } 
} 

To nie jest idealny, oczywiście, ponieważ producent nie „czytać naprzód "i działa tylko wtedy, gdy konsument jest na to gotowy. Użycie byłoby takie, jak:

val prod = new Producer 
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start()) 
prod.start() 
+0

Hm, to jedno rozwiązanie, o którym myślałem. Prawdopodobnie wystarcza, ale martwię się tym, że jeśli robotnicy przewyższają producenta, brak bufora roboczego powoduje pogorszenie wydajności. – toluju

+0

@toluju - Zacznij od tego, aby każdy konsument prosił o pracę i niech producent nie reaguje na te wiadomości, ale je odbiera i umieszcza je w kolejce, jeśli nie ma jeszcze więcej pracy do wykonania. (Następnie, gdy jest praca, może ją podzielić na elementy w kolejce.) –