2013-06-21 26 views
12

Mam ten problem w PythonieWypełnienie kolejkę i zarządzania wieloprocesorowe w Pythonie

  • Mam kolejkę adresów URL, które muszę sprawdzić od czasu do czasu
  • jeśli kolejka jest wypełniona muszę przetworzyć każdy element w kolejce
  • Każda pozycja w kolejce muszą być przetwarzane przez jeden proces (wieloprocesorowej)

do tej pory udało mi się to osiągnąć „ręcznie” tak:

while 1: 
     self.updateQueue() 

     while not self.mainUrlQueue.empty(): 
      domain = self.mainUrlQueue.get() 

      # if we didn't launched any process yet, we need to do so 
      if len(self.jobs) < maxprocess: 
       self.startJob(domain) 
       #time.sleep(1) 
      else: 
       # If we already have process started we need to clear the old process in our pool and start new ones 
       jobdone = 0 

       # We circle through each of the process, until we find one free ; only then leave the loop 
       while jobdone == 0: 
        for p in self.jobs : 
         #print "entering loop" 
         # if the process finished 
         if not p.is_alive() and jobdone == 0: 
          #print str(p.pid) + " job dead, starting new one" 
          self.jobs.remove(p) 
          self.startJob(domain) 
          jobdone = 1 

Jednak prowadzi to do mnóstwa problemów i błędów. Zastanawiałem się, czy nie lepiej mi pasuje, używając puli procesów. Jaki byłby właściwy sposób na zrobienie tego?

Jednak wiele razy moja kolejka jest pusta i może zostać wypełniona 300 punktami na sekundę, więc nie jestem zbyt pewny, jak to zrobić.

Odpowiedz

20

Można użyć funkcji blokowania queue w celu odrodzenia wielu procesów podczas uruchamiania (przy użyciu multiprocessing.Pool) i pozostawienia ich w stanie uśpienia do czasu, gdy niektóre dane będą dostępne w kolejce do przetworzenia. Jeśli nie jesteś zaznajomiony z tym, można spróbować „play” z tego prostego programu:

import multiprocessing 
import os 
import time 

the_queue = multiprocessing.Queue() 


def worker_main(queue): 
    print os.getpid(),"working" 
    while True: 
     item = queue.get(True) 
     print os.getpid(), "got", item 
     time.sleep(1) # simulate a "long" operation 

the_pool = multiprocessing.Pool(3, worker_main,(the_queue,)) 
#       don't forget the coma here^

for i in range(5): 
    the_queue.put("hello") 
    the_queue.put("world") 


time.sleep(10) 

Testowane z Pythona 2.7.3 na Linux

To będzie tarło 3 procesy (w dodatku z proces macierzysty). Każde dziecko wykonuje funkcję worker_main. Jest to prosta pętla pobierająca nowy element z kolejki w każdej iteracji. Pracownicy będą blokować, jeśli nic nie będzie gotowe do przetworzenia.

Podczas uruchamiania wszystkie 3 procesy będą uśpione, dopóki kolejka nie zostanie zasilona niektórymi danymi. Kiedy dane są dostępne, jeden z oczekujących pracowników otrzymuje ten przedmiot i zaczyna go przetwarzać. Następnie próbuje uzyskać inny element z kolejki, czekając ponownie, jeśli nic nie jest dostępne ...

+0

to nie działa na windowsach w python 2.7.4, musisz mieć __name__ = '__main__' część i powinieneś przekazać the_queue jako trzeci parametr do funkcji multiprocessing.Pool, w przeciwnym razie worker_main nie otrzyma danych – jhexp

+0

Jestem także zainteresowany tym, jak sprawić, by ten fragment kodu działał. Kiedy uruchomię go tak, jak jest, to działa, ale nic nie wypisuje, prawdopodobnie dlatego, że worker_main nie otrzymuje danych. Ale kiedy przekazuję the_queue jako trzeci parametr, otrzymuję argument TypeError: worker_main() po * musi być sekwencją, a nie Queue – ziky90

+0

@ ziky90 Prawdopodobnie zapomniałeś o śpiączce w '(queue,)'. Zmodyfikowałem kod, aby dodać komentarz wskazujący na możliwe źródło błędu. –