2017-06-14 65 views
6

Mam plik CSV zawierający wartości elementów dla pozycji: każdy wiersz jest potrójny (id_item, id_feature, value) reprezentujący wartość określonej funkcji dla określonego elementu. Dane są bardzo rzadkie.Oblicz macierz parowania równoległego: czy skalowalne podejście do obsługi gotowych danych jest dostępne w języku Python?

Muszę obliczyć dwie macierze odległości, jedna z wykorzystaniem korelacji Pearsona jako metryki, a druga z użyciem indeksu Jaccard.

W tej chwili wdrażane rozwiązania w pamięci i zrobić coś takiego:

import numpy as np 
from numpy import genfromtxt 
from scipy.sparse import coo_matrix 
from scipy.sparse import csr_matrix 
from scipy.stats.stats import pearsonr 
import sklearn.metrics.pairwise 
import scipy.spatial.distance as ds 
import scipy.sparse as sp 

# read the data 
my_data = genfromtxt('file.csv', delimiter=',') 
i,j,value=my_data.T 

# create a sparse matrix 
m=coo_matrix((value,(i,j))) 

# convert in a numpy array 
m = np.array(m.todense()) 

# create the distance matrix using pdist 
d = ds.pdist(m.T, 'correlation') 

d= ds.squareform(d) 

to działa dobrze i to dość szybko, ale to nie jest skalowalne w poziomie. Chciałbym móc zwiększyć wydajność, dodając węzły do ​​klastra i wszystko może działać nawet w scenariuszu dużych zbiorów danych, ponownie przez dodanie węzłów. Nie obchodzi mnie, czy proces ten zajmuje godziny; odległości należy aktualizować raz dziennie.

Jakie jest najlepsze podejście?

1) Skalarne pairwise_distances ma parametr n_jobs, który pozwala wykorzystać zalety obliczeń równoległych (http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.pairwise_distances.html), ale z tego co wiem, obsługuje wiele rdzeni na tym samym komputerze, a nie w klastrze. To jest powiązane pytanie Easy way to use parallel options of scikit-learn functions on HPC, ale nie dostałem, co jest najlepszym rozwiązaniem w moim konkretnym przypadku i jeśli Joblib rzeczywiście ma problemy.

Ponadto, ta część, która brzmi w pamięci CSV nadal będzie wąskim gardłem: można przechowywać w HDFS CSV i odczytać to robi coś takiego:

import subprocess 
cat = subprocess.Popen(["hadoop", "fs", "-cat", "data.csv"], stdout=subprocess.PIPE) 

a następnie pętli cat.stdout:

for line in cat.stdout: 
    .... 

, ale nie jestem pewien, czy to dobre rozwiązanie.

dane 2) przechowywać w HDFS, wdrożyć obliczeń w mapie zmniejszyć modę i wykonać zadanie poprzez mrjob

3) Przechowywanie danych w HDFS, realizacji obliczeń w sposób podobny do SQL (nie wiem jeśli jest to łatwe i wykonalne, muszę o tym pomyśleć) i uruchomić go przy użyciu PyHive

Oczywiście chciałbym zachować jak najwięcej aktualnego kodu, więc wariant rozwiązania 1) jest najlepszy dla mnie.

+0

Chciałbym wypróbować dystrybucję Pythona Intela i MPI dla Pythona. Możesz rzucić okiem w tym [problem GoParallel] (https://goparallel.sourceforge.net/wp-content/uploads/2016/07/intel-parallel-universe-issue-25.compressed.pdf). – rll

+0

Jaki jest rozmiar pliku data.csv (liczba linii, MB ...)? – glegoux

+0

@glegoux problem nie jest teraz wielkości pliku CSV, ale możliwość skalowania w przyszłości – Eugenio

Odpowiedz

0

W prototypie:

Proponuję użyć Pyro4 i wdrożyć że z divide and conquer paradygmatu węzła głównego i kilku węzłów podrzędnych.

Jeśli masz n pozycji, masz pary n(n-1)/2, używasz odległości parach sklear z maksymalną liczbą zadań (parametr n_jobs) na każdym węźle.

Podział zestawu par w zadaniach a i wykonanie tego na węzłach b i przegrupowanie wyniku na węźle głównym.

Do produkcji:

radzę PySpark 2.1.1.Zmniejszenie mapy staje się przestarzałe.

+0

O ile widziałem, nie ma prostej metody w iskrze, aby po prostu obliczyć odległości (co było moim pytaniem); Twoja odpowiedź kieruje mnie jednak we właściwym kierunku, więc postanowiłem to zaakceptować. – Eugenio