2015-07-01 18 views
9

File używam następujący kod, aby podzielić plik CSV w wielu kawałkach (pozyskiwane z here)Python Chunking CSV Multiproccessing

def worker(chunk): 
    print len(chunk) 

def keyfunc(row): 
    return row[0] 

def main(): 
    pool = mp.Pool() 
    largefile = 'Counseling.csv' 
    num_chunks = 10 
    start_time = time.time() 
    results = [] 
    with open(largefile) as f: 
     reader = csv.reader(f) 
     reader.next() 
     chunks = itertools.groupby(reader, keyfunc) 
     while True: 
      # make a list of num_chunks chunks 
      groups = [list(chunk) for key, chunk in 
         itertools.islice(chunks, num_chunks)] 
      if groups: 
       result = pool.map(worker, groups) 
       results.extend(result) 
      else: 
       break 
    pool.close() 
    pool.join() 

Wydaje się jednak, że liczba kawałków zawsze pozostaje stała niezależnie od liczba porcji, które wybieram. Na przykład, czy zdecyduję się na 1 lub 10 porcji, zawsze otrzymam to wyjście podczas przetwarzania przykładowego pliku. Idealnie chciałbym porcjować plik tak, aby był równomiernie rozłożony.

Uwaga, rzeczywisty plik, który kroję, ma ponad 13 milionów wierszy, dlatego przetwarzam go kawałek po kawałku. To musi!

6 
7 
1 
... 
1 
1 
94 
--- 0.101687192917 seconds --- 
+1

Powiedzmy wybrać, aby podzielić plik na 10 kawałków. Czy chcesz, aby jeden proces roboczy obsłużył 1 porcję pliku, czy chcesz równomiernie rozłożyć ten 1 kawałek między robotników w puli, poczekać, aż wszystkie skończą się, a następnie wysłać pulę następną porcją? – unutbu

+1

@HappyLeapSecond \t 1 porcja na proces roboczy byłaby bardziej wydajna (więc nie muszę blokować i czekać na zakończenie każdego innego procesu) Przed zadaniem tego pytania, przejrzałem dosyć obszernie dokumentację Pythona. Rozumiem, że używasz groupby do odwzorowania każdej wartości z rzędu na klucz (odpowiednia kolumna). Zwraca to iterator. Następnie przekazujesz to do islice, które zaczyna się od 0, a następnie wyrywa num_chunks (które będzie 10). To byłaby poprawna liczba wierszy? Idealnie chciałbym, aby procesy działały z 10 000 części wierszy. –

+1

W drugim problemie * "jest kolumna, która musi być [pogrupowana] przez ... i wszystkie wiersze o tej nazwie nie mogą być podzielone" *. Z tego powodu użyto polecenia 'itertools.groupby'. Tutaj nie ma wymogu grupowania wierszy według wartości określonej kolumny, więc możemy pominąć użycie 'itertools.groupby'. – unutbu

Odpowiedz

10

Per the comments, chcemy mieć każdy proces pracy na 10000 rzędzie kawałek. To nie jest trudne do wykonania; zobacz poniżej przepis iter/islice. Jednak problem z użyciem

pool.map(worker, ten_thousand_row_chunks) 

że pool.map będzie próbował umieścić wszystkie kawałki w kolejce zadań naraz. Jeśli wymaga to więcej pamięci niż jest dostępna, otrzymuje się MemoryError. (Uwaga: pool.imapsuffers from the same problem.)

Zamiast tego musimy wywoływać iteracyjnie po pool.map na kawałkach każdego kawałka.

import itertools as IT 
import multiprocessing as mp 
import csv 

def worker(chunk): 
    return len(chunk) 

def main(): 
    # num_procs is the number of workers in the pool 
    num_procs = mp.cpu_count() 
    # chunksize is the number of lines in a chunk 
    chunksize = 10**5 

    pool = mp.Pool(num_procs) 
    largefile = 'Counseling.csv' 
    results = [] 
    with open(largefile, 'rb') as f: 
     reader = csv.reader(f) 
     for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []): 
      chunk = iter(chunk) 
      pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), [])) 
      result = pool.map(worker, pieces) 
      results.extend(result) 
    print(results) 
    pool.close() 
    pool.join() 

main() 

Każdy chunk będzie składać się z maksymalnie chunksize*num_procs linii z pliku. To wystarczająca ilość danych, aby dać wszystkim pracownikom w puli coś do pracy, ale nie za duże, aby spowodować błąd MemoryError - pod warunkiem, że chunksize nie jest ustawiony zbyt duży.

Każdy egzemplarz chunk jest następnie dzielony na części, z których każdy składa się z do chunksize wierszy z pliku. Te części są następnie przesyłane do pool.map.


Jak iter(lambda: list(IT.islice(iterator, chunksize)), []) pracę:

jest to idiom grupowania iterator na kawałki o długości chunksize. Zobaczmy, jak to działa na przykładzie:

In [111]: iterator = iter(range(10)) 

Zauważ, że za każdym razem IT.islice(iterator, 3) nazywa się nowy kawałek z 3 produktów kroi off iteracyjnej:

In [112]: list(IT.islice(iterator, 3)) 
Out[112]: [0, 1, 2] 

In [113]: list(IT.islice(iterator, 3)) 
Out[113]: [3, 4, 5] 

In [114]: list(IT.islice(iterator, 3)) 
Out[114]: [6, 7, 8] 

kiedy jest mniej niż 3 przedmioty pozostawione w iteracyjnej, tylko to, co pozostaje jest zwracany:

In [115]: list(IT.islice(iterator, 3)) 
Out[115]: [9] 

A jeśli nazywają go ponownie, masz pustą listę:

In [116]: list(IT.islice(iterable, 3)) 
Out[116]: [] 

lambda: list(IT.islice(iterator, chunksize)) jest funkcją, która zwraca list(IT.islice(iterator, chunksize)) kiedy dzwonił. Jest to „jedna wkładka”, która jest równoważna

def func(): 
    return list(IT.islice(iterator, chunksize)) 

Wreszcie iter(callable, sentinel) powraca jeden iteracyjnej. Wartości uzyskane przez ten iterator są wartościami zwracanymi przez wywoływalny. Utrzymuje wartości plonowania do momentu, gdy wywoływacz zwróci wartość równą wartownikowi. Więc

iter(lambda: list(IT.islice(iterator, chunksize)), []) 

zachowa po powrocie wartości list(IT.islice(iterator, chunksize)) dopóki ta wartość jest pusta lista:

In [121]: iterator = iter(range(10)) 

In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), [])) 
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]] 
+0

Wow! Świetna i opisowa odpowiedź. Dziękuję bardzo. Teraz rozumiem to znacznie lepiej. Jeśli mogę zadać ci pytanie, w jaki sposób zdobyłeś się na takie rzeczy i intuicyjnie rozumiesz te pythonicowe zasady? Czy masz książkę lub źródło, które możesz polecić? –

+0

Jest wielu innych, którzy wiedzą o wiele więcej niż ja, więc utożsamiam się z tobą, tym, który zadaje pytanie, niż próbując odpowiedzieć na to pytanie. Ponadto może nie być [droga królewska] (https://en.wikipedia.org/wiki/Royal_Road#A_metaphorical_.E2.80.9CRoyal_Road.E2.80.9D_in_famous_quotations). Jedna rzecz, być może, naprawdę mi pomogła - zbieram krótkie, proste przykłady demonstrujące użycie każdej funkcji i funkcji w Pythonie. – unutbu

+0

Nie sądzę, aby miało to znaczenie, jaką dokumentację przeczytasz. W sieci jest wiele świetnych bezpłatnych dokumentów i samouczków. Liczy się to, że ćwiczysz i bawisz się językiem. Konkretne przykłady jasno określają znaczenie i zachowanie języka. Najlepszą radą, jaką mogę dać, jest czerpanie przyjemności z programowania i angażowanie się w [dużo praktyki/zabawy] (http://norvig.com/21-days.html). – unutbu

6

Po pierwsze, itertools.groupby nie ma żadnego sensu, jeśli rekordy nie są już posortowane w kolumnie klucza. Co więcej, jeśli chcesz tylko podzielić plik CSV na wcześniej określoną liczbę wierszy i przekazać go pracownikowi, nie musisz wykonywać wszystkich tych czynności.

Prosta implementacja będzie:

import csv 
from multiprocessing import Pool 


def worker(chunk): 
    print len(chunk) 

def emit_chunks(chunk_size, file_path): 
    lines_count = 0 
    with open(file_path) as f: 
     reader = csv.reader(f) 
     chunk = [] 
     for line in reader: 
      lines_count += 1 
      chunk.append(line) 
      if lines_count == chunk_size: 
       lines_count = 0 
       yield chunk 
       chunk = [] 
      else: 
       continue 
     if chunk : yield chunk 

def main(): 
    chunk_size = 10 
    gen = emit_chunks(chunk_size, 'c:/Temp/in.csv') 
    p = Pool(5) 
    p.imap(worker, gen) 
    print 'Completed..' 

* Edycja: zmieniono pool.imap zamiast pool.map

+1

Nie byłoby "pool.imap" być lepszą pamięcią i jeśli ta kolumna jest posortowana 'jeśli lines_count == chunk_size' jest poprawiona, aby upewnić się, że konkretna kolumna ma różne wartości – deinonychusaur

+0

@deinonychusaur Absolutnie, pool.imap jest poprawnym sposobem aby to zrobić, zajmiemy się problemem pamięci. Zmieniam swoją odpowiedź, aby to wykorzystać. Dzięki. – gipsy

+0

Rozumiem. Nie przechowujesz ich w pamięci, ale używasz wydajności, aby wygenerować te wartości z generatora? Wybrałem drugą odpowiedź, ponieważ słowo kluczowe "yield" jest nieco skomplikowane i trochę zajęło mi zrozumienie tego, co robiłeś. Bezwzględnie przegłosowałem twoją odpowiedź i naprawdę doceniam twoją pomoc Rób dalej to, co robisz :-)! –