2016-11-16 37 views
5

Wykonuję optymalizację funkcji za pomocą algorytmu ewolucyjnego (CMAES). Aby uruchomić go szybciej, korzystam z modułu do przetwarzania wieloprocesowego. Funkcja potrzebna do optymalizacji pobiera duże macierze jako dane wejściowe (input_A_Opt, and input_B_Opt) w poniższym kodzie.Przezwyciężanie ograniczeń pamięci podczas korzystania z przetwarzania wieloprocesowego

Ma kilka GB rozmiaru. Gdy uruchomię tę funkcję bez obsługi wieloprocesowej, działa dobrze. Kiedy używam przetwarzania wieloprocesowego, wydaje się, że występuje problem z pamięcią. Jeśli uruchomić go z małych nakładów to działa dobrze, ale gdy uruchamiam z pełnego wejścia, pojawia się następujący błąd:

File "<ipython-input-2-bdbae5b82d3c>", line 1, in <module> 
opt.myFuncOptimization() 

File "/home/joe/Desktop/optimization_folder/Python/Optimization.py", line 45, in myFuncOptimization 
**f_values = pool.map_async(partial_function_to_optmize, solutions).get()** 
File "/usr/lib/python3.5/multiprocessing/pool.py", line 608, in get 
raise self._value 
    File "/usr/lib/python3.5/multiprocessing/pool.py", line 385, in _handle_tasks 
put(task) 
File "/usr/lib/python3.5/multiprocessing/connection.py", line 206, in send 
self._send_bytes(ForkingPickler.dumps(obj)) 

File "/usr/lib/python3.5/multiprocessing/connection.py", line 393, in _send_bytes 
header = struct.pack("!i", n) 

error: 'i' format requires -2147483648 <= number <= 2147483647 

A oto uproszczona wersja kodu (ponownie, jeśli uruchomię go z wejście 10 razy mniejsze, wszystko działa bez zarzutu):

import numpy as np 
import cma 
import multiprocessing as mp 
import functools 
import myFuncs 
import hdf5storage 



def myFuncOptimization(): 

    temp = hdf5storage.loadmat('/home/joe/Desktop/optimization_folder/matlab_workspace_for_optimization')  

    input_A_Opt = temp["input_A"] 
    input_B_Opt = temp["input_B"] 

    del temp 

    numCores = 20 

    # Inputs 
    #________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________________ 
    P0 = np.array([   4.66666667, 2.5, 2.66666667, 4.16666667, 0.96969697,  1.95959596,  0.44088176,  0.04040404,  6.05210421,  0.58585859,  0.46464646,   8.75751503,   0.16161616,    1.24248497,   1.61616162,     1.56312625,   5.85858586,     0.01400841, 1.0,   2.4137931,  0.38076152, 2.5, 1.99679872  ]) 
    LBOpt = np.array([   0.0,  0.0, 0.0,  0.0,  0.0,   0.0,   0.0,   0.0,   0.0,   0.0,   0.0,    0.0,    0.0,     0.0,    0.0,      0.0,    0.0,      0.0,  0.0,   0.0,   0.0,  0.0, 0.0,   ]) 
    UBOpt = np.array([   10.0,  10.0, 10.0,  10.0,  10.0,   10.0,   10.0,   10.0,   10.0,   10.0,   10.0,    10.0,    10.0,     10.0,    10.0,      10.0,    10.0,      10.0,  10.0,   10.0,   10.0,  10.0, 10.0,   ]) 
    initialStdsOpt = np.array([2.0,  2.0, 2.0,  2.0,  2.0,   2.0,   2.0,   2.0,   2.0,   2.0,   2.0,    2.0,    2.0,     2.0,    2.0,      2.0,    2.0,      2.0,  2.0,   2.0,   2.0,  2.0, 2.0,   ]) 
    minStdsOpt = np.array([ 0.030,  0.40, 0.030,  0.40,  0.020,   0.020,   0.020,   0.020,   0.020,   0.020,   0.020,    0.020,    0.020,     0.020,    0.020,      0.020,    0.020,      0.020,  0.050,   0.050,   0.020,  0.40, 0.020,   ]) 

    options = {'bounds':[LBOpt,UBOpt], 'CMA_stds':initialStdsOpt, 'minstd':minStdsOpt, 'popsize':numCores} 
    es = cma.CMAEvolutionStrategy(P0, 1, options) 

    pool = mp.Pool(numCores) 

    partial_function_to_optmize = functools.partial(myFuncs.func1, input_A=input_A_Opt, input_B=input_B_Opt) 

    while not es.stop(): 
     solutions = es.ask(es.popsize)    
     f_values = pool.map_async(partial_function_to_optmize, solutions).get() 
     es.tell(solutions, f_values) 
     es.disp(1) 
     es.logger.add() 

    return es.result_pretty() 

Wszelkie sugestie, jak rozwiązać ten problem? czy nie koduję poprawnie (nowość w Pythonie) czy powinienem użyć innego pakietu wieloprocesowego, takiego jak miarka?

+0

Używasz zbyt dużo pamięci! Sprawdź w pamięci współdzielonej wartości, które nie muszą być kopiowane (https://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes). – tcooc

+0

Powiązane: [Użyj numpy array we wspólnej pamięci do wieloprocesowości] (https://stackoverflow.com/questions/7894791/use-numpy-array-in-shared-memory-for-multiprocessing) – robyschek

Odpowiedz

0

Twoje obiekty są zbyt duże, aby przejść między procesami. Przekazujesz więcej niż 2147483647 bajtów - to ponad 2 GB! Protokół nie jest do tego stworzony, a zwykłe obciążenie serializacją i deserializacją tak dużych porcji danych może być poważnym obciążeniem dla wydajności.

Zmniejsz rozmiar danych przekazywanych do poszczególnych procesów. Jeśli pozwala ci na to przepływ pracy, przeczytaj dane w oddzielnym procesie i przekazuj tylko wyniki.

+0

Dzięki MisterMiyagi. Tak, każda mijana macierz ma więcej niż 2 GB. Te dane są jednak potrzebne. Spróbuję zasugerować odczytanie danych w każdym procesie, tak naprawdę można to zrobić. Zwiększy to czas potrzebny na każdą iterację, ponieważ zamiast czytania jednego na początku procesu będę musiał czytać za każdym razem, gdy wykonam jedną ocenę (tysiące razy). Prawdopodobnie wydłużony czas wynosi ~ 25%, rodzaj akceptowalny. – Joe

+0

btw, czy wiesz, że istnieje jakaś inna równoległa struktura przetwarzania, która może pracować z tą ilością danych (np. Miarka)? – Joe

+0

pytanie uzupełniające - czy problem może wynikać z tego, że 20 równoległych procesów czyta ten sam plik w tym samym czasie? – Joe