2016-06-14 25 views
6

Chcę uruchomić funkcję równolegle i czekać, aż wszystkie węzły równoległe zostaną wykonane, używając joblib. Podobnie jak w przykładzie:W jaki sposób możemy użyć tqdm w równoległym wykonaniu z joblib?

from math import sqrt 
from joblib import Parallel, delayed 
Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10)) 

Ale chcę, że wykonanie będzie widoczne w jednym ProgressBar jak z tqdm, pokazując, jak wiele pracy zostało zakończone.

Jak byś to zrobił?

Odpowiedz

4

Jeśli problem składa się z wielu części, można podzielić na części k podgrup, uruchom Każda podgrupa równolegle i zaktualizować progressbar pomiędzy, powodując k aktualizacjach toku.

Zostało to przedstawione w poniższym przykładzie z dokumentacji.

>>> with Parallel(n_jobs=2) as parallel: 
... accumulator = 0. 
... n_iter = 0 
... while accumulator < 1000: 
...  results = parallel(delayed(sqrt)(accumulator + i ** 2) 
...       for i in range(5)) 
...  accumulator += sum(results) # synchronization barrier 
...  n_iter += 1 

https://pythonhosted.org/joblib/parallel.html#reusing-a-pool-of-workers

4

Tutaj możliwe Obejście

def func(x): 
    time.sleep(random.randint(1, 10)) 
    return x 

def text_progessbar(seq, total=None): 
    step = 1 
    tick = time.time() 
    while True: 
     time_diff = time.time()-tick 
     avg_speed = time_diff/step 
     total_str = 'of %n' % total if total else '' 
     print('step', step, '%.2f' % time_diff, 
       'avg: %.2f iter/sec' % avg_speed, total_str) 
     step += 1 
     yield next(seq) 

all_bar_funcs = { 
    'tqdm': lambda args: lambda x: tqdm(x, **args), 
    'txt': lambda args: lambda x: text_progessbar(x, **args), 
    'False': lambda args: iter, 
    'None': lambda args: iter, 
} 

def ParallelExecutor(use_bar='tqdm', **joblib_args): 
    def aprun(bar=use_bar, **tq_args): 
     def tmp(op_iter): 
      if str(bar) in all_bar_funcs.keys(): 
       bar_func = all_bar_funcs[str(bar)](tq_args) 
      else: 
       raise ValueError("Value %s not supported as bar type"%bar) 
      return Parallel(**joblib_args)(bar_func(op_iter)) 
     return tmp 
    return aprun 

aprun = ParallelExecutor(n_jobs=5) 

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5)) 
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4)) 
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4)) 
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))