Per the comments, chcemy mieć każdy proces pracy na 10000 rzędzie kawałek. To nie jest trudne do wykonania; zobacz poniżej przepis iter/islice
. Jednak problem z użyciem
pool.map(worker, ten_thousand_row_chunks)
że pool.map
będzie próbował umieścić wszystkie kawałki w kolejce zadań naraz. Jeśli wymaga to więcej pamięci niż jest dostępna, otrzymuje się MemoryError
. (Uwaga: pool.imap
suffers from the same problem.)
Zamiast tego musimy wywoływać iteracyjnie po pool.map
na kawałkach każdego kawałka.
import itertools as IT
import multiprocessing as mp
import csv
def worker(chunk):
return len(chunk)
def main():
# num_procs is the number of workers in the pool
num_procs = mp.cpu_count()
# chunksize is the number of lines in a chunk
chunksize = 10**5
pool = mp.Pool(num_procs)
largefile = 'Counseling.csv'
results = []
with open(largefile, 'rb') as f:
reader = csv.reader(f)
for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
chunk = iter(chunk)
pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
result = pool.map(worker, pieces)
results.extend(result)
print(results)
pool.close()
pool.join()
main()
Każdy chunk
będzie składać się z maksymalnie chunksize*num_procs
linii z pliku. To wystarczająca ilość danych, aby dać wszystkim pracownikom w puli coś do pracy, ale nie za duże, aby spowodować błąd MemoryError - pod warunkiem, że chunksize
nie jest ustawiony zbyt duży.
Każdy egzemplarz chunk
jest następnie dzielony na części, z których każdy składa się z do chunksize
wierszy z pliku. Te części są następnie przesyłane do pool.map
.
Jak iter(lambda: list(IT.islice(iterator, chunksize)), [])
pracę:
jest to idiom grupowania iterator na kawałki o długości chunksize. Zobaczmy, jak to działa na przykładzie:
In [111]: iterator = iter(range(10))
Zauważ, że za każdym razem IT.islice(iterator, 3)
nazywa się nowy kawałek z 3 produktów kroi off iteracyjnej:
In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]
In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]
In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]
kiedy jest mniej niż 3 przedmioty pozostawione w iteracyjnej, tylko to, co pozostaje jest zwracany:
In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]
A jeśli nazywają go ponownie, masz pustą listę:
In [116]: list(IT.islice(iterable, 3))
Out[116]: []
lambda: list(IT.islice(iterator, chunksize))
jest funkcją, która zwraca list(IT.islice(iterator, chunksize))
kiedy dzwonił. Jest to „jedna wkładka”, która jest równoważna
def func():
return list(IT.islice(iterator, chunksize))
Wreszcie iter(callable, sentinel)
powraca jeden iteracyjnej. Wartości uzyskane przez ten iterator są wartościami zwracanymi przez wywoływalny. Utrzymuje wartości plonowania do momentu, gdy wywoływacz zwróci wartość równą wartownikowi. Więc
iter(lambda: list(IT.islice(iterator, chunksize)), [])
zachowa po powrocie wartości list(IT.islice(iterator, chunksize))
dopóki ta wartość jest pusta lista:
In [121]: iterator = iter(range(10))
In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Powiedzmy wybrać, aby podzielić plik na 10 kawałków. Czy chcesz, aby jeden proces roboczy obsłużył 1 porcję pliku, czy chcesz równomiernie rozłożyć ten 1 kawałek między robotników w puli, poczekać, aż wszystkie skończą się, a następnie wysłać pulę następną porcją? – unutbu
@HappyLeapSecond \t 1 porcja na proces roboczy byłaby bardziej wydajna (więc nie muszę blokować i czekać na zakończenie każdego innego procesu) Przed zadaniem tego pytania, przejrzałem dosyć obszernie dokumentację Pythona. Rozumiem, że używasz groupby do odwzorowania każdej wartości z rzędu na klucz (odpowiednia kolumna). Zwraca to iterator. Następnie przekazujesz to do islice, które zaczyna się od 0, a następnie wyrywa num_chunks (które będzie 10). To byłaby poprawna liczba wierszy? Idealnie chciałbym, aby procesy działały z 10 000 części wierszy. –
W drugim problemie * "jest kolumna, która musi być [pogrupowana] przez ... i wszystkie wiersze o tej nazwie nie mogą być podzielone" *. Z tego powodu użyto polecenia 'itertools.groupby'. Tutaj nie ma wymogu grupowania wierszy według wartości określonej kolumny, więc możemy pominąć użycie 'itertools.groupby'. – unutbu