2009-07-15 7 views
34

Mam program wielowątkowy, w którym utworzę funkcję generatora, a następnie przekazuję ją do nowych wątków. Chcę, aby był on podzielony/globalny, aby każdy wątek mógł uzyskać następną wartość z generatora.Czy generatory są bezpieczne dla wątków?

Czy można bezpiecznie używać takiego generatora, czy też napotkam problemy/warunki uzyskujące dostęp do współużytkowanego generatora z wielu wątków?

Jeśli nie, czy jest lepszy sposób podejścia do problemu? Potrzebuję czegoś, co przejdzie przez listę i wytworzy następną wartość dla dowolnego wątku, który ją wywoła.

Odpowiedz

49

To nie jest wątek bezpieczne; jednoczesne wywołania mogą przeplatać się i powodować bałagan ze zmiennymi lokalnymi.

Powszechnie stosuje się wzorzec master-slave (obecnie nazywany wzorem rolnika-pracownika na PC). Zrób trzeci wątek, który generuje dane, i dodaj kolejkę między urządzeniem master i slave, gdzie slave odczyta z kolejki, a master zapisze do niej. Standardowy moduł kolejki zapewnia wymagane bezpieczeństwo wątków i organizuje blokowanie wzorca, dopóki urządzenia podrzędne nie będą gotowe do odczytu większej ilości danych.

+7

Zdecydowanie +1 dla Queue.Queue, świetny sposób na uporządkowanie systemu nawlekania, jeśli ma to zastosowanie (co jest w większości przypadków i zdecydowanie dla tego zadania). –

-7

To zależy od python realizacja używasz. W CPython, GIL sprawia, że ​​wszystkie operacje na obiektach Pythona są bezpieczne dla wątków, ponieważ tylko jeden wątek może wykonywać kod w danym momencie.

http://en.wikipedia.org/wiki/Global_Interpreter_Lock

+1

"GIL powoduje, że wszystkie operacje na obiektach Pythona są niebezpieczne" - huh? wszystkie operacje nie są atomowe –

+6

Jest to niebezpiecznie mylące. GIL oznacza jedynie, że kod Pythona nie uszkodzi stanu Pythona w środowisku wielowątkowym: nie można zmienić wątków w środku bajtowego op. (Na przykład możesz zmodyfikować udostępniony dict, nie uszkadzając go.) Nadal możesz zmieniać wątki między dowolnymi dwoma opcjami bajtowymi. –

40

Edited by dodać odniesienia poniżej.

Można owinąć generator blokadą. Na przykład,

import threading 
class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

gen = [x*2 for x in [1,2,3,4]] 
g2 = LockedIterator(gen) 
print list(g2) 

Blokowanie odbywa 50ms w moim systemie, Kolejka trwa 350ms. Kolejka jest przydatna, gdy naprawdę masz kolejkę; na przykład, jeśli masz przychodzące żądania HTTP i chcesz je umieścić w kolejce do przetworzenia przez wątki robocze. (To nie pasuje do modelu iteratora Pythona - gdy skończy się iterator elementów, to jest zrobione.) Jeśli naprawdę masz iterator, to LockedIterator jest szybszym i prostszym sposobem na zabezpieczenie wątku.

from datetime import datetime 
import threading 
num_worker_threads = 4 

class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

def test_locked(it): 
    it = LockedIterator(it) 
    def worker(): 
     try: 
      for i in it: 
       pass 
     except Exception, e: 
      print e 
      raise 

    threads = [] 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     threads.append(t) 
     t.start() 

    for t in threads: 
     t.join() 

def test_queue(it): 
    from Queue import Queue 
    def worker(): 
     try: 
      while True: 
       item = q.get() 
       q.task_done() 
     except Exception, e: 
      print e 
      raise 

    q = Queue() 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     t.setDaemon(True) 
     t.start() 

    t1 = datetime.now() 

    for item in it: 
     q.put(item) 

    q.join() 

start_time = datetime.now() 
it = [x*2 for x in range(1,10000)] 

test_locked(it) 
#test_queue(it) 
end_time = datetime.now() 
took = end_time-start_time 
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000) 
+1

Mniej sprawny niż przy użyciu Queue.Queue, ale pięknie wykonany. – gooli