2016-09-21 12 views
8

Mam zdefiniowaną funkcję, która renderuje tablicę MxN. Macierz jest bardzo duża, dlatego chcę użyć tej funkcji do utworzenia małych macierzy (M1xN, M2xN, M3xN --- MixN. M1 + M2 + M3 + --- + Mi = M) jednocześnie wykorzystując przetwarzanie wielokrotne/gwintowanie i ewentualnie dołącz te tablice, aby utworzyć tablicę mxn. Jak Pan Boardrider słusznie zasugerował, aby zapewnić realną przykład, następujący przykład będzie szeroko przekazać to, co mam zamiar zrobićUżyj wielo-przetwarzania/wątków do przerwania operacji numpy na kawałki

import numpy as n 
def mult(y,x): 
    r = n.empty([len(y),len(x)]) 
    for i in range(len(r)): 
     r[i] = y[i]*x 
    return r 
x = n.random.rand(10000) 
y = n.arange(0,100000,1) 
test = mult(y=y,x=x) 

jako długościach x i y wzrostem system zajmie więcej i więcej czasu. W odniesieniu do tego przykładu, chcę uruchomić ten kod tak, że jeśli mam 4 rdzenie, mogę podać jedną czwartą zadania każdemu, tj. Dać zadanie do obliczenia elementów r[0] na r[24999] na 1 rdzeń, r[25000] na r[49999] na 2 rdzeń, r[50000] na r[74999] na trzeci rdzeń i r[75000] na na 4. rdzeń. Ostatecznie zmień wyniki, dodaj je, aby uzyskać jedną tablicę: r[0] do r[99999].

Mam nadzieję, że ten przykład wyjaśnia. Jeśli mój problem nadal nie jest jasny, proszę powiedz.

+1

Co powiedzieli na [mcve]? – boardrider

+0

nigdy nie będziesz programować czegoś szybciej w Pythonie niż wewnętrzne mechanizmy rozgłaszania numpy, nawet jeśli jest to wielowątkowe/procesowe ... niech numpy zrobi to wewnętrznie – Aaron

+0

Uważaj, aby nie używać wielu wątków/procesów dla niego . Wykonanie niewielkiej ilości pracy na ogromnej ilości danych spowoduje, że procesor zostanie zatrzymany przez prędkość magistrali pamięci (jest powolny w porównaniu do pamięci podręcznej procesora itp.). Jeśli więc twój algorytm jest powiązany we/wy, dodanie kolejnych wątków nie spowoduje poprawy szybkości. – bazza

Odpowiedz

6

Pierwsza rzecz do powiedzenia to: jeśli chodzi o wielu rdzeni na tym samym procesorze, numpy jest już zdolny do parallelizing operację lepiej niż kiedykolwiek moglibyśmy zrobić ręcznie (patrz omówienie w multiplication of large arrays in python)

W ten sprawa klucz byłoby po prostu upewnić się, że mnożenie jest wykonywane w hurtowych operacji tablicy zamiast Python for -loop:

test2 = x[n.newaxis, :] * y[:, n.newaxis] 

n.abs(test - test2).max() # verify equivalence to mult(): output should be 0.0, or very small reflecting floating-point precision limitations 

[Jeśli rzeczywiście chciał szerzyć to w wielu oddzielnych procesorów, to już zupełnie inna ale wydaje się, że to pytanie Proponuję jeden (multi-core) CPU]


OK, mając powyższe na uwadze. załóżmy chcesz parallelize operacja bardziej skomplikowane niż tylko mult(). Załóżmy, że starałeś się zoptymalizować swoją operację do operacji macierzy hurtowych, które mogą się same łączyć, ale twoja operacja po prostu nie jest na to podatna. W takim przypadku można użyć pamięci współużytkowanej multiprocessing.Array utworzonej za pomocą lock=False i multiprocessing.Pool, aby przypisać procesy do adresowania niepokrywających się jej fragmentów, podzielonych na wymiary y (a także, jeśli chcesz, jednocześnie ponad x). Przykładowy wykaz znajduje się poniżej. Zauważ, że to podejście nie robi dokładnie tego, co określasz (łącz wyniki razem i dołącz je do jednej tablicy). Raczej robi coś bardziej wydajnego: wiele procesów jednocześnie składa swoje części odpowiedzi w nienakładających się częściach pamięci współdzielonej. Po zakończeniu nie jest konieczne sortowanie/dołączanie: po prostu odczytujemy wynik.

import os, numpy, multiprocessing, itertools 

SHARED_VARS = {} # the best way to get multiprocessing.Pool to send shared multiprocessing.Array objects between processes is to attach them to something global - see http://stackoverflow.com/questions/1675766/ 

def operate(slices): 
    # grok the inputs 
    yslice, xslice = slices 
    y, x, r = get_shared_arrays('y', 'x', 'r') 
    # create views of the appropriate chunks/slices of the arrays: 
    y = y[yslice] 
    x = x[xslice] 
    r = r[yslice, xslice] 
    # do the actual business 
    for i in range(len(r)): 
     r[i] = y[i] * x # If this is truly all operate() does, it can be parallelized far more efficiently by numpy itself. 
         # But let's assume this is a placeholder for something more complicated. 

    return 'Process %d operated on y[%s] and x[%s] (%d x %d chunk)' % (os.getpid(), slicestr(yslice), slicestr(xslice), y.size, x.size) 

def check(y, x, r): 
    r2 = x[numpy.newaxis, :] * y[:, numpy.newaxis] # obviously this check will only be valid if operate() literally does only multiplication (in which case this whole business is unncessary) 
    print('max. abs. diff. = %g' % numpy.abs(r - r2).max()) 
    return y, x, r 

def slicestr(s): 
    return ':'.join('' if x is None else str(x) for x in [s.start, s.stop, s.step]) 

def m2n(buf, shape, typecode, ismatrix=False): 
    """ 
    Return a numpy.array VIEW of a multiprocessing.Array given a 
    handle to the array, the shape, the data typecode, and a boolean 
    flag indicating whether the result should be cast as a matrix. 
    """ 
    a = numpy.frombuffer(buf, dtype=typecode).reshape(shape) 
    if ismatrix: a = numpy.asmatrix(a) 
    return a 

def n2m(a): 
    """ 
    Return a multiprocessing.Array COPY of a numpy.array, together 
    with shape, typecode and matrix flag. 
    """ 
    if not isinstance(a, numpy.ndarray): a = numpy.array(a) 
    return multiprocessing.Array(a.dtype.char, a.flat, lock=False), tuple(a.shape), a.dtype.char, isinstance(a, numpy.matrix) 

def new_shared_array(shape, typecode='d', ismatrix=False): 
    """ 
    Allocate a new shared array and return all the details required 
    to reinterpret it as a numpy array or matrix (same order of 
    output arguments as n2m) 
    """ 
    typecode = numpy.dtype(typecode).char 
    return multiprocessing.Array(typecode, int(numpy.prod(shape)), lock=False), tuple(shape), typecode, ismatrix 

def get_shared_arrays(*names): 
    return [m2n(*SHARED_VARS[name]) for name in names] 

def init(*pargs, **kwargs): 
    SHARED_VARS.update(pargs, **kwargs) 

if __name__ == '__main__': 

    ylen = 1000 
    xlen = 2000 

    init(y=n2m(range(ylen))) 
    init(x=n2m(numpy.random.rand(xlen))) 
    init(r=new_shared_array([ylen, xlen], float)) 

    print('Master process ID is %s' % os.getpid()) 

    #print(operate([slice(None), slice(None)])); check(*get_shared_arrays('y', 'x', 'r')) # local test 

    pool = multiprocessing.Pool(initializer=init, initargs=SHARED_VARS.items()) 
    yslices = [slice(0,333), slice(333,666), slice(666,None)] 
    xslices = [slice(0,1000), slice(1000,None)] 
    #xslices = [slice(None)] # uncomment this if you only want to divide things up in the y dimension 
    reports = pool.map(operate, itertools.product(yslices, xslices)) 
    print('\n'.join(reports)) 
    y, x, r = check(*get_shared_arrays('y', 'x', 'r'))