2017-09-01 75 views
7

Mój kod wywołuje wiele "funkcji różnicowych", aby obliczyć "Yin algorithm" (podstawowy ekstraktor częstotliwości).Optymalizacja obliczeń "funkcji różnicowej"

Funkcja różnica (równ. 6 w papier) jest zdefiniowany jako:

enter image description here

A to my realizacja funkcji różnicą

def differenceFunction(x, W, tau_max): 
    df = [0] * tau_max 
    for tau in range(1, tau_max): 
     for j in range(0, W - tau): 
      tmp = long(x[j] - x[j + tau]) 
      df[tau] += tmp * tmp 
    return df 

na przykład w:

x = np.random.randint(0, high=32000, size=2048, dtype='int16') 
W = 2048 
tau_max = 106 
differenceFunction(x, W, tau_max) 

Czy istnieje sposób na optymalizację tego podwójnego komp utation (tylko z pythonem, najlepiej bez innych bibliotek niż numpy)?

EDIT: Zmieniono kod, aby uniknąć Index Error (pętla j odpowiedź @Elliot)

EDIT2: Kod Zmieniono używać X [0] (pętla j, @hynekcer komentarz)

+1

Chcesz szybsze podejście? lub Szukasz wektoryzowanego rozwiązania wykorzystującego matryce. – Dark

+1

Szukam najszybszego podejścia przy użyciu python – PatriceG

+3

Jednym ze sposobów jest użycie 'numba', jeśli chcesz zachować pętle i uzyskać maksymalną prędkość. Czy korzystasz z innych bibliotek? – Dark

Odpowiedz

6

EDIT: Poprawiona prędkość do 220 mikrosekund - patrz na końcu zmienił - bezpośredni wersję

Wymagane obliczenia można łatwo ocenione przez Autocorrelation function lub podobnie przez splot. Twierdzenie Wiener-Khinchin pozwala na obliczanie autokorelacji z dwiema szybkimi transformacjami Fouriera (FFT), ze złożonością czasową O (n   log   n). Używam funkcji zwiniętego splotu fftconvolve z pakietu Scipy. Zaletą jest to, że łatwo jest wyjaśnić, dlaczego to działa. Wszystko jest wektoryzowane, bez pętli na poziomie interpretera Pythona.

from scipy.signal import fftconvolve 

def difference_by_convol(x, W, tau_max): 
    x = np.array(x, np.float64) 
    w = x.size 
    x_cumsum = np.concatenate((np.array([0.]), (x * x).cumsum())) 
    conv = fftconvolve(x, x[::-1]) 
    df = x_cumsum[w:0:-1] + x_cumsum[w] - x_cumsum[:w] - 2 * conv[w - 1:] 
    return df[:tau_max + 1] 
  • W porównaniu z differenceFunction_1loop funkcji w Elliot's answer: jest szybciej FFT: 430   mikrosekundy stosunku do pierwotnego 1170 mikrosekund.Zaczyna być szybszy dla około tau_max >= 40. Dokładność liczbowa jest świetna. Najwyższy błąd względny jest mniejszy niż 1E-14 w porównaniu do dokładnego wyniku całkowitego. (W związku z tym można go łatwo zaokrąglić do dokładnego długiego rozwiązania integer).
  • Parametr tau_max nie ma znaczenia dla algorytmu. Ostatecznie ogranicza tylko wyjście. Element zerowy w indeksie 0 jest dodawany do wyjścia, ponieważ indeksy powinny zaczynać się od 0 w Pythonie.
  • Parametr W nie jest ważny w języku Python. Rozmiar lepiej jest introspekcji.
  • Dane są najpierw konwertowane na np.float64, aby zapobiec powtarzającym się konwersjom. Jest o pół procenta szybsza. Dowolny typ mniejszy niż np.int64 byłby nie do zaakceptowania z powodu przepełnienia.
  • Wymagana funkcja różnicowa to funkcja podwójnej energii minus autokorelacja. Można to ocenić za pomocą splotu: correlate(x, x) = convolve(x, reversed(x).
  • "Od Scipy v0.19 normalna convolve automatycznie wybiera tę metodę lub metodę bezpośrednią opartą na estymacji, która jest szybsza." Ta heurystyka nie jest adekwatna do tej sprawy, ponieważ splot ocenia o wiele więcej tau niż tau_max i musi być przeważony o znacznie szybszą FFT niż metodą bezpośrednią.
  • Można go również obliczyć za pomocą modułu Numpy ftp bez Scipy, przepisując odpowiedź Calculate autocorrelation using FFT in matlab na Python (poniżej na końcu). Myślę, że powyższe rozwiązanie może być łatwiejsze do zrozumienia.

Dowód: (dla Pythonistas :-)

Oryginalny naiwny realizacja może być zapisany jako:

df = [sum((x[j] - x[j + t]) ** 2 for j in range(w - t)) for t in range(tau_max + 1)] 

gdzie tau_max < w.

wywodzą z reguły (a - b)**2 == a**2 + b**2 - 2 * a * b

df = [ sum(x[j] ** 2 for j in range(w - t)) 
     + sum(x[j] ** 2 for j in range(t, w)) 
     - 2 * sum(x[j] * x[j + t] for j in range(w - t)) 
     for t in range(tau_max + 1)] 

podstawić dwa pierwsze elementy z pomocą x_cumsum = [sum(x[j] ** 2 for j in range(i)) for i in range(w + 1)], który może być łatwo obliczona w czasie liniowo. Zamień sum(x[j] * x[j + t] for j in range(w - t)) przez splot conv = convolvefft(x, reversed(x), mode='full'), który ma wyjście o rozmiarze len(x) + len(x) - 1.

df = [x_cumsum[w - t] + x_cumsum[w] - x_cumsum[t] 
     - 2 * convolve(x, x[::-1])[w - 1 + t] 
     for t in range(tau_max + 1)] 

optymalizują ekspresji wektora:

df = x_cumsum[w:0:-1] + x_cumsum[w] - x_cumsum[:w] - 2 * conv[w - 1:] 

Każdy krok może być testowane i porównywane z danymi testowymi numerycznie


Edycja: wdrożone rozwiązanie bezpośrednio NumPy FFT.

def difference_fft(x, W, tau_max): 
    x = np.array(x, np.float64) 
    w = x.size 
    tau_max = min(tau_max, w) 
    x_cumsum = np.concatenate((np.array([0.]), (x * x).cumsum())) 
    size = w + tau_max 
    p2 = (size // 32).bit_length() 
    nice_numbers = (16, 18, 20, 24, 25, 27, 30, 32) 
    size_pad = min(x * 2 ** p2 for x in nice_numbers if x * 2 ** p2 >= size) 
    fc = np.fft.rfft(x, size_pad) 
    conv = np.fft.irfft(fc * fc.conjugate())[:tau_max] 
    return x_cumsum[w:w - tau_max:-1] + x_cumsum[w] - x_cumsum[:tau_max] - 2 * conv 

To ponad dwa razy szybciej niż w moim poprzednim rozwiązaniem, ponieważ długość splotu jest ograniczony do najbliższego „nice” numer z małymi czynniki pierwsze po W + tau_max, nie ocenia pełny 2 * W. Nie jest również konieczne przekształcanie tych samych danych dwukrotnie, jak w przypadku `fftconvolve (x, odwrócony (x)).

In [211]: %timeit differenceFunction_1loop(x, W, tau_max) 
1.1 ms ± 4.51 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) 

In [212]: %timeit difference_by_convol(x, W, tau_max) 
431 µs ± 5.69 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) 

In [213]: %timeit difference_fft(x, W, tau_max) 
218 µs ± 685 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each) 

Najnowszym rozwiązaniem jest szybszy niż difference_by_convol Eliota dla tau_max> = 20. Ten wskaźnik nie zależy znacznie od wielkości danych ze względu na podobny stosunek kosztów ogólnych.

+0

Dzięki. Edytowałem kod. – PatriceG

+0

@PatriceGuyot Dodano dwukrotnie szybszą metodę. – hynekcer

0

Nie wiem, jak znaleźć alternatywę dla problemu zagnieżdżonych pętli, ale dla funkcji arytmetycznych można użyć biblioteki numpy. jest szybszy niż operacje ręczne.

import numpy as np 
tmp = np.subtract(long(x[j] ,x[j + tau]) 
0

chciałbym zrobić coś takiego:

>>> x = np.random.randint(0, high=32000, size=2048, dtype='int16') 
>>> tau_max = 106 
>>> res = np.square((x[tau_max:] - x[:-tau_max])) 

jednak jestem przekonany, nie jest to najszybszy sposób, aby to zrobić.

6

Przede wszystkim należy wziąć pod uwagę granice tablicy. Twój kod, jak pierwotnie napisano, dostanie IndexError. można dostać o znaczącej przyspieszenie przez Wektoryzacja pętli wewnętrznej

import numpy as np 

# original version 
def differenceFunction_2loop(x, W, tau_max): 
    df = np.zeros(tau_max, np.long) 
    for tau in range(1, tau_max): 
     for j in range(0, W - tau): # -tau eliminates the IndexError 
      tmp = np.long(x[j] -x[j + tau]) 
      df[tau] += np.square(tmp) 
    return df 

# vectorized inner loop 
def differenceFunction_1loop(x, W, tau_max): 
    df = np.zeros(tau_max, np.long) 
    for tau in range(1, tau_max): 
     tmp = (x[:-tau]) - (x[tau:]).astype(np.long) 
     df[tau] = np.dot(tmp, tmp) 
    return df 

x = np.random.randint(0, high=32000, size=2048, dtype='int16') 
W = 2048 
tau_max = 106 
twoloop = differenceFunction_2loop(x, W, tau_max) 
oneloop = differenceFunction_1loop(x, W, tau_max) 

# confirm that the result comes out the same. 
print(np.all(twoloop == oneloop)) 
# True 

Teraz przez jakiś benchmarkingu. W ipython otrzymuję następujący

In [103]: %timeit twoloop = differenceFunction_2loop(x, W, tau_max) 
1 loop, best of 3: 2.35 s per loop 

In [104]: %timeit oneloop = differenceFunction_1loop(x, W, tau_max) 
100 loops, best of 3: 8.23 ms per loop 

Więc, około 300 krotne przyspieszenie.

+0

Uwaga: wektorizacja jest techniką, gdy procesory przetwarzają fragmenty danych jednocześnie. Eliminacja kontroli zasięgu nazywa się dzieleniem iteracji. – egorlitvinenko

+1

@egorlitvinenko Istnieje więcej mechanizmów, dlaczego operacje Numpy na wektorach są szybsze niż pętla w języku interpretowanym. Jedną z nich są instrukcje SSE (Streaming SIMD Extensions (Single Instruction Multiple Data)) na rejestrach wektorowych używanych w niektórych przypadkach. Przydatny prosty termin [wektoryzacja] (https://en.wikipedia.org/wiki/Vectorization) powyżej odpowiada Wikipedii. – hynekcer

+0

@hynekcer Będę wyglądać. Dziękuję za wyjaśnienie. – egorlitvinenko

1

W przeciwieństwie do optymalizacji algorytmu można zoptymalizować tłumacza z numba.jit:

import timeit 

import numpy as np 
from numba import jit 


def differenceFunction(x, W, tau_max): 
    df = [0] * tau_max 
    for tau in range(1, tau_max): 
     for j in range(0, W - tau): 
      tmp = int(x[j] - x[j + tau]) 
      df[tau] += tmp * tmp 
    return df 


@jit 
def differenceFunction2(x, W, tau_max): 
    df = np.ndarray(shape=(tau_max,)) 
    for tau in range(1, tau_max): 
     for j in range(0, W - tau): 
      tmp = int(x[j] - x[j + tau]) 
      df[tau] += tmp * tmp 
    return df 


x = np.random.randint(0, high=32000, size=2048, dtype='int16') 
W = 2048 
tau_max = 106 
differenceFunction(x, W, tau_max) 


print('old', 
     timeit.timeit('differenceFunction(x, W, tau_max)', 'from __main__ import differenceFunction, x, W, tau_max', 
        number=20)/20) 
print('new', 
     timeit.timeit('differenceFunction2(x, W, tau_max)', 'from __main__ import differenceFunction2, x, W, tau_max', 
        number=20)/20) 

Wynik:

old 0.18265145074453273 
new 0.016223197058214667 

Można łączyć optymalizacji algorytmu i numba.jit dla lepszego rezultatu.

1

Oto inne podejście wykorzystujące rozumienie list. Zajmuje to mniej więcej jedną dziesiątą czasu zajmowanego przez oryginalną funkcję, ale nie pokonuje Elliot's answer. Po prostu go tam wypuszczę.

import numpy as np 
import time 

# original version 
def differenceFunction_2loop(x, W, tau_max): 
    df = np.zeros(tau_max, np.long) 
    for tau in range(1, tau_max): 
     for j in range(0, W - tau): # -tau eliminates the IndexError 
      tmp = np.long(x[j] -x[j + tau]) 
      df[tau] += np.square(tmp) 
    return df 

# vectorized inner loop 
def differenceFunction_1loop(x, W, tau_max): 
    df = np.zeros(tau_max, np.long) 
    for tau in range(1, tau_max): 
     tmp = (x[:-tau]) - (x[tau:]).astype(np.long) 
     df[tau] = np.dot(tmp, tmp) 
    return df 

# with list comprehension 
def differenceFunction_1loop_listcomp(x, W, tau_max): 
    df = [sum(((x[:-tau]) - (x[tau:]).astype(np.long))**2) for tau in range(1, tau_max)] 
    return [0] + df[:] 

x = np.random.randint(0, high=32000, size=2048, dtype='int16') 
W = 2048 
tau_max = 106 

s = time.clock() 
twoloop = differenceFunction_2loop(x, W, tau_max) 
print(time.clock() - s) 

s = time.clock() 
oneloop = differenceFunction_1loop(x, W, tau_max) 
print(time.clock() - s) 

s = time.clock() 
listcomprehension = differenceFunction_1loop_listcomp(x, W, tau_max) 
print(time.clock() - s) 

# confirm that the result comes out the same. 
print(np.all(twoloop == listcomprehension)) 
# True 

wyniki wydajności (w przybliżeniu):

differenceFunction_2loop() = 0.47s 
differenceFunction_1loop() = 0.003s 
differenceFunction_1loop_listcomp() = 0.033s