2014-07-27 27 views
7

Czytałem w niektórych źródeł, że komenda druku nie jest bezpieczny wątku, a obejście jest użycie sys.stdout.write polecenie zamiast, ale nadal nie działa dla mnie i piśmie do STDOUT nie jest atomowy.Jak mogę zrobić zapis atomowy na stdout w pythonie?

Oto krótki przykład (nazywany ten plik parallelExperiment.py):

import os 
    import sys 
    from multiprocessing import Pool 

    def output(msg): 
    msg = '%s%s' % (msg, os.linesep) 
    sys.stdout.write(msg) 

    def func(input): 
    output(u'pid:%d got input \"%s\"' % (os.getpid(), str(input))) 

    def executeFunctionInParallel(funcName, inputsList, maxParallelism): 
     output(u'Executing function %s on input of size %d with maximum parallelism of %d' % (
      funcName.__name__, len(inputsList), maxParallelism)) 
     parallelismPool = Pool(processes=maxParallelism) 
     executeBooleanResultsList = parallelismPool.map(funcName, inputsList) 
     parallelismPool.close() 
     output(u'Function %s executed on input of size %d with maximum parallelism of %d' % (
      funcName.__name__, len(inputsList), maxParallelism)) 
     # if all parallel executions executed well - the boolean results list should all be True 
     return all(executeBooleanResultsList) 

    if __name__ == "__main__": 
    inputsList=[str(i) for i in range(20)] 
    executeFunctionInParallel(func, inputsList, 4) 

Spójrz na wyjściu:

ja. Wyjście nazywając Pythona parallelExperiment.py (zwróć uwagę, że słowo "PID" jest zawiedli w niektórych liniach):

Executing function func on input of size 20 with maximum parallelism of 4 
ppid:2240 got input "0" 
id:4960 got input "2" 
pid:4716 got input "4" 
pid:4324 got input "6" 
ppid:2240 got input "1" 
id:4960 got input "3" 
pid:4716 got input "5" 
pid:4324 got input "7" 
ppid:4960 got input "8" 
id:2240 got input "10" 
pid:4716 got input "12" 
pid:4324 got input "14" 
ppid:4960 got input "9" 
id:2240 got input "11" 
pid:4716 got input "13" 
pid:4324 got input "15" 
ppid:4960 got input "16" 
id:2240 got input "18" 
ppid:2240 got input "19" 
id:4960 got input "17" 
Function func executed on input of size 20 with maximum parallelism of 4 

ii. Wyjście nazywając pyton parallelExperiment.py> parallelExperiment.log, czyli przekierowanie stdout do parallelExperiment.log pliku (należy zwrócić uwagę, że kolejność wierszy nie jest dobre, bo przed i po wywołaniu do executeFunctionInParallel który wywołuje func równolegle, wiadomość powinna zostać wydrukowane):

pid:3244 got input "4" 
pid:3244 got input "5" 
pid:3244 got input "12" 
pid:3244 got input "13" 
pid:240 got input "0" 
pid:240 got input "1" 
pid:240 got input "8" 
pid:240 got input "9" 
pid:240 got input "16" 
pid:240 got input "17" 
pid:1268 got input "2" 
pid:1268 got input "3" 
pid:1268 got input "10" 
pid:1268 got input "11" 
pid:1268 got input "18" 
pid:1268 got input "19" 
pid:3332 got input "6" 
pid:3332 got input "7" 
pid:3332 got input "14" 
pid:3332 got input "15" 
Executing function func on input of size 20 with maximum parallelism of 4 
Function func executed on input of size 20 with maximum parallelism of 4 
+0

Ten niedopasowany wynik jest tym, czego można się spodziewać po wielowątkowości. Tworzysz wątki, które mają własną oś czasu. Fakt, że bash morduje ciąg * pid *, nie jest tak naprawdę rzadki. W większości przypadków, tak jak powiedziałeś, nie akceptuje on wielu receptorów. Po prostu drukuje to, co robi, a czasem przejmuje nakładające się wywołania zwrotne. –

Odpowiedz

7

to się stało z powodu multiprocessing.Pool faktycznie korzysta z podprocesów zamiast wątków. Musisz użyć jawnego synchronization między procesami. Zwróć uwagę na przykładowy link, który rozwiązuje twój problem.

import os 
import sys 
from multiprocessing import Pool, Lock 

lock = Lock() 

def output(msg): 
    msg = '%s%s' % (msg, os.linesep) 
    with lock: 
     sys.stdout.write(msg) 

def func(input): 
    output(u'pid:%d got input \"%s\"' % (os.getpid(), str(input))) 

def executeFunctionInParallel(funcName, inputsList, maxParallelism): 
    output(u'Executing function %s on input of size %d with maximum parallelism of %d' % (
     funcName.__name__, len(inputsList), maxParallelism)) 
    parallelismPool = Pool(processes=maxParallelism) 
    executeBooleanResultsList = parallelismPool.map(funcName, inputsList) 
    parallelismPool.close() 
    output(u'Function %s executed on input of size %d with maximum parallelism of %d' % (
     funcName.__name__, len(inputsList), maxParallelism)) 
    # if all parallel executions executed well - the boolean results list should all be True 
    return all(executeBooleanResultsList) 

if __name__ == "__main__": 
    inputsList=[str(i) for i in range(20)] 
    executeFunctionInParallel(func, inputsList, 4) 
+0

i czy mogę użyć tej metody Pool.map? przykładem jest użycie obiektu "proces" –

+0

Oczywiście, że możesz. Pula sama korzysta z obiektów procesu wewnętrznie. –

+0

Przykro mi, ale nie podążam za ... co powinienem zmienić w ** metodach executeFunctionInParallel ** i ** func **? –

1

Jeśli chcesz uniknąć blokowania i chętnie idzie do interfejsu niższego poziomu, można uzyskać POSIX O_APPEND zachowanie z os.open, os.write (jeśli system je obsługuje); i zobacz Is file append atomic in UNIX?.