2017-08-30 99 views
16

Oto mój główny program faktoryzacji, dodałem funkcję zwrotną w pool.apply_async(findK, args=(N,begin,end)), komunikat informujący o wyjściu prime factorization is over po zakończeniu faktoryzacji, działa dobrze.Jak przekazać wystąpienie multiprocessing.Pool do funkcji zwrotnej apply_async?

import math 
import multiprocessing 

def findK(N,begin,end): 
    for k in range(begin,end): 
     if N% k == 0: 
      print(N,"=" ,k ,"*", N/k) 
      return True 
    return False 


def prompt(result): 
    if result: 
     print("prime factorization is over") 


def mainFun(N,process_num): 
    pool = multiprocessing.Pool(process_num) 
    for i in range(process_num): 
     if i ==0 : 
      begin =2 
     else: 
      begin = int(math.sqrt(N)/process_num*i)+1 
     end = int(math.sqrt(N)/process_num*(i+1)) 
     pool.apply_async(findK, args=(N,begin,end) , callback = prompt)  
    pool.close() 
    pool.join()  

if __name__ == "__main__": 
    N = 684568031001583853 
    process_num = 16 
    mainFun(N,process_num) 

Teraz chcę zmienić funkcję zwrotną w apply_async, aby zmienić wiersz do funkcji zamykania systemu, aby zabić wszystkich innych procesów.

def prompt(result): 
    if result: 
     pool.terminate() 

Instancja puli nie jest zdefiniowana w obszarze zasięgu lub przekazana do monitu.
pool.terminate() nie może działać w trybie szybkiego podglądu.
Jak przekazać instancję wieloprocesorową do funkcji apply_async'callback?
(zrobiłem to zrobić w formacie klasy, po prostu dodać metodę klasy i wywołać self.pool.terminate może zabić wszystkie inne procesy, jak wykonać zadanie w formacie funkcji?)

jeśli nie ustawiono basen jako zmienną globalną, pula może zostać przekazana do funkcji zwrotnej?

Odpowiedz

7

Passing dodatkowe argumenty funkcji zwrotnej nie są obsługiwane. Ale masz mnóstwo eleganckich sposobów na obejście tego.

można upakować logiki basen w obiekcie:

class Executor: 
    def __init__(self, process_num): 
     self.pool = multiprocessing.Pool(process_num) 

    def prompt(self, result): 
     if result: 
      print("prime factorization is over") 
      self.pool.terminate() 

    def schedule(self, function, args): 
     self.pool.apply_async(function, args=args, callback=self.prompt) 

    def wait(self): 
     self.pool.close() 
     self.pool.join() 


def main(N,process_num): 
    executor = Executor(process_num) 
    for i in range(process_num): 
     ... 
     executor.schedule(findK, (N,begin,end)) 
    executor.wait() 

Albo można użyć realizację concurrent.futures.Executor która zwraca Future obiekt. Przed ustawieniem wywołania zwrotnego wystarczy dołączyć pulę do obiektu Future.

def prompt(future): 
    if future.result(): 
     print("prime factorization is over") 
     future.pool_executor.shutdown(wait=False) 

def main(N,process_num): 
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=process_num) 
    for i in range(process_num): 
     ... 
     future = executor.submit(findK, N,begin,end) 
     future.pool_executor = executor 
     future.add_done_callback(prompt) 
4

Musisz mieć pool koniec w środowisku prompt. Jedną z możliwości jest przeniesienie pool do zasięgu globalnego (choć nie jest to najlepsza praktyka). To wydaje się działać:

import math 
import multiprocessing 

pool = None 

def findK(N,begin,end): 
    for k in range(begin,end): 
     if N% k == 0: 
      print(N,"=" ,k ,"*", N/k) 
      return True 
    return False 


def prompt(result): 
    if result: 
     print("prime factorization is over") 
     pool.terminate() 


def mainFun(N,process_num): 
    global pool 
    pool = multiprocessing.Pool(process_num) 
    for i in range(process_num): 
     if i ==0 : 
      begin =2 
     else: 
      begin = int(math.sqrt(N)/process_num*i)+1 
     end = int(math.sqrt(N)/process_num*(i+1)) 
     pool.apply_async(findK, args=(N,begin,end) , callback = prompt)  
    pool.close() 
    pool.join()  

if __name__ == "__main__": 
    N = 684568031001583853 
    process_num = 16 
    mainFun(N,process_num) 
+0

jeśli nie ustawiono puli jako zmiennej globalnej, czy pula może zostać przekazana do funkcji zwrotnej? –

4

można po prostu zdefiniować lokalny close funkcji jako callback:

import math 
import multiprocessing 


def findK(N, begin, end): 
    for k in range(begin, end): 
     if N % k == 0: 
      print(N, "=", k, "*", N/k) 
      return True 
    return False 


def mainFun(N, process_num): 
    pool = multiprocessing.Pool(process_num) 

    def close(result): 
     if result: 
      print("prime factorization is over") 
      pool.terminate() 
    for i in range(process_num): 
     if i == 0: 
      begin = 2 
     else: 
      begin = int(math.sqrt(N)/process_num * i) + 1 
     end = int(math.sqrt(N)/process_num * (i + 1)) 
     pool.apply_async(findK, args=(N, begin, end), callback=close) 
    pool.close() 
    pool.join() 


if __name__ == "__main__": 
    N = 684568031001583853 
    process_num = 16 
    mainFun(N, process_num) 

Można również użyć partial funkcji z functool z

import functools 

def close_pool(pool, results): 
    if result: 
     pool.terminate() 

def mainFun(N, process_num): 
    pool = multiprocessing.Pool(process_num) 

    close = funtools.partial(close_pool, pool) 
....