2014-07-14 12 views
8

Mam źródło przedmiotów i chcę oddzielnie przetwarzać je z tymi samymi wartościami funkcji klucza. W Pythonie będzie to wyglądaćLazy partition-by

for key_val, part in itertools.groupby(src, key_fn): 
    process(key_val, part) 

Takie rozwiązanie jest całkowicie leniwy, to znaczy jeśli process nie próbuje zapisać zawartość całego part, kod zostanie uruchomiony w O(1) pamięci.

roztwór Clojure

(doseq [part (partition-by key-fn src)] 
    (process part)) 

mniej leniwa: realizuje każdą część końca. Problem polega na tym, że src może mieć bardzo długie serie przedmiotów o tej samej wartości key-fn, a ich realizacja może doprowadzić do OOM.

Znalazłem this discussion gdzie twierdził, że następująca funkcja (lekko zmodyfikowane do nazywania spójności wewnątrz słupka) jest na tyle leniwy

(defn lazy-partition-by [key-fn coll] 
    (lazy-seq 
    (when-let [s (seq coll)] 
     (let [fst (first s) 
      fv (key-fn fst) 
      part (lazy-seq (cons fst (take-while #(= fv (key-fn %)) (next s))))] 
     (cons part (lazy-partition-by key-fn (drop-while #(= fv (key-fn %)) s))))))) 

Jednak nie rozumiem, dlaczego nie cierpią na OOM : obie części komórki cons posiadają odniesienie do s, więc podczas gdy process zużywa part, realizuje się s, ale nie zbiera śmieci. Będzie kwalifikować się do GC tylko wtedy, gdy drop-while będzie przemierzać part.

Więc moje pytania to:

  1. jestem skorygować o lazy-partition-by nie jest na tyle leniwy?
  2. Czy jest implementacja partition-by z gwarantowanymi wymaganiami dotyczącymi pamięci, pod warunkiem, że nie mam żadnych odniesień do poprzedniego part, zanim zacznę realizować następny?

EDIT: Oto dość leniwy realizacja w Haskell:

lazyPartitionBy :: Eq b => (a -> b) -> [a] -> [[a]] 
lazyPartitionBy _ [] = [] 
lazyPartitionBy keyFn [email protected](x:_) = let 
    fv = keyFn x 
    (part, rest) = span ((== fv) . keyFn) xl 
    in part : lazyPartitionBy keyFn rest 

Jak widać z span implementation, part i rest niejawnie udziałem skarbu państwa. Zastanawiam się, czy tę metodę można przełożyć na Clojure.

Odpowiedz

3

Mimo że to pytanie wywołuje bardzo interesującą kontemplację na temat projektowania języka, praktycznym problemem jest to, że chcesz przetworzyć na partycje w stałej pamięci. A praktyczny problem można rozwiązać za pomocą niewielkiej inwersji.

Zamiast przetwarzania wyniku funkcji, która zwraca sekwencję partycji, należy przekazać funkcję przetwarzania do funkcji, która tworzy partycje. Następnie możesz kontrolować stan w ograniczony sposób.

Najpierw przedstawimy sposób zsumowania zużycia sekwencji ze stanem ogona.

(defn fuse [coll wick] 
    (lazy-seq 
    (when-let [s (seq coll)] 
    (swap! wick rest) 
    (cons (first s) (fuse (rest s) wick))))) 

Następnie zmodyfikowana wersja partition-by

(defn process-partition-by [processfn keyfn coll] 
    (lazy-seq 
    (when (seq coll) 
     (let [tail (atom (cons nil coll)) 
      s (fuse coll tail) 
      fst (first s) 
      fv (keyfn fst) 
      pred #(= fv (keyfn %)) 
      part (take-while pred s) 
      more (lazy-seq (drop-while pred @tail))] 
     (cons (processfn part) 
       (process-partition-by processfn keyfn more)))))) 

Uwaga: W przypadku O (1) Zużycie pamięci processfn musi być chętny konsumentów! Tak więc podczas gdy (process-partition-by identity key-fn coll) jest taki sam jak (partition-by key-fn coll), ponieważ identity nie pobiera partycji, zużycie pamięci nie jest stałe.


Testy

(defn heavy-seq [] 
    ;adjust payload for your JVM so only a few fit in memory 
    (let [payload (fn [] (long-array 20000000))] 
    (map #(vector % (payload)) (iterate inc 0)))) 

(defn my-process [s] (reduce + (map first s))) 

(defn test1 [] 
    (doseq [part (partition-by #(quot (first %) 10) (take 50 (heavy-seq)))] 
    (my-process part))) 

(defn test2 [] 
    (process-partition-by 
    my-process #(quot (first %) 20) (take 200 (heavy-seq)))) 

so.core=> (test1) 
OutOfMemoryError Java heap space [trace missing] 

so.core=> (test2) 
(190 590 990 1390 1790 2190 2590 2990 3390 3790) 
+0

Czy poprawiam to, jeśli zamieniam '(lazy-seq @tail)' na '(lazy-seq (drop-while # (= fv (keyfn%)) @tail))', które zadziała również w przypadku kiedy 'processfn' nie pochłania całkowicie argumentu? – tempestadept

+0

Wygląda również na to, że wersja bez 'drop-while' traci pierwszy element każdej części poza pierwszą. – tempestadept

+0

@tempestadept Dzięki za debugowanie, naprawiono te problemy z edycjami powyżej. (1) 'drop-while' musi być' (lazy-seq (drop-while ...)), aby zapobiec pochopnej ocenie argumentów 'drop-while'. (2) Błąd off-by-one był spowodowany tym, że 'take-while' musi sprawdzić pierwszą fałszywą wartość. W związku z tym musimy podeprzeć ogon z pierwszą obojętną wartością, aby pozostać na jednym z przodu. –

1

Czy mam rację co do leniwego podziału, ponieważ nie jest on wystarczająco leniwy?

Cóż, jest różnica między lenistwem a wykorzystaniem pamięci. Sekwencja może być leniwą i nadal wymagać dużej ilości pamięci - patrz na przykład implementacja clojure.core/distinct, która używa zestawu do zapamiętania wszystkich wcześniej obserwowanych wartości w sekwencji. Ale tak, twoja analiza wymagań pamięciowych lazy-partition-by jest poprawna - wywołanie funkcji do obliczenia głowicy drugiej partycji zatrzyma głowicę pierwszej partycji, co oznacza, że ​​wykonanie pierwszej partycji powoduje jej zachowanie w pamięci. Można to sprawdzić za pomocą następującego kodu:

user> (doseq [part (lazy-partition-by :a 
         (repeatedly 
         (fn [] {:a 1 :b (long-array 10000000)})))] 
     (dorun part)) 
; => OutOfMemoryError Java heap space 

Ponieważ ani doseq ani dorun zachowuje głowy, to po prostu uruchomić zawsze, jeśli lazy-partition-by były O (1) w pamięci.

Czy istnieje implementacja partycji-by z wymaganiami pamięci gwarancją, pod warunkiem, że nie posiada żadnych odniesień do poprzedniej części do czasu rozpoczęcia realizacji I następny?

Byłoby bardzo trudno, jeśli nie niemożliwe, napisać taką implementację w czysto funkcjonalny sposób, który działałby w ogólnym przypadku. Należy wziąć pod uwagę, że ogólna implementacja lazy-partition-by nie może przyjąć żadnych założeń dotyczących tego, kiedy (lub jeśli) partycja zostanie zrealizowana. Jedynym gwarantowanym poprawnym sposobem znalezienia początku drugiej partycji, poza wprowadzeniem jakiejś nieprzyjemnej odrobiny statefulness, aby śledzić, jak duża część pierwszej partycji została zrealizowana, jest zapamiętywanie, gdzie rozpoczęła się pierwsza partycja i skanowanie na żądanie.

Dla szczególnego przypadku, gdy przetwarzasz rekordy pojedynczo dla efektów ubocznych i chcesz je pogrupować według klucza (jak sugeruje twoje użycie powyższego doseq), możesz rozważyć coś podobnego do linii loop/recur, który utrzymuje stan i ponownie ustawia go po zmianie klucza.

+0

Dobrze, rozumiem różnicę między lenistwo i pamięci gwarancji. Widzę, że nie wybrałem najlepszych słów na moje pytania. Chciałbym użyć metody 'loop/recur', ale piszę funkcję biblioteki, która zajmie' proces' jako argument. Nie widzę oczywistego sposobu, aby to zrobić. Co do unikania jawnego utrzymywania stanu - dodam do pierwotnego pytania implementację Haskella, która, jak sądzę, jest wystarczająco leniwa. – tempestadept

+0

Moja pierwsza próba przeniesienia funkcji Haskella do Clojure cierpiała na ten sam problem z OOM, ale wtedy nie jestem biegły w Haskell. – Alex

7

Reguła, z której korzystam w tych scenariuszach (tj. Tych, w których chcesz, aby pojedyncza sekwencja wejściowa tworzyła wiele sekwencji wyjściowych) jest taka, że ​​z następujących trzech pożądanych właściwości, możesz ogólnie mieć tylko dwa:

  1. Sprawność (przemierzać sekwencję wejściową tylko raz, więc nie trzymać głowę)
  2. Lenistwo (produkować elementy tylko na żądanie)
  3. nie podzielił państwowych zmienny

Wersja w clojure.core wybiera (1,3), ale rezygnuje z (2), tworząc jednocześnie całą partycję. Python i Haskell wybierają (1,2), chociaż nie jest to od razu oczywiste: czy Haskell w ogóle nie ma stanu zmiennego? Cóż, leniwy pomiar wszystkiego (nie tylko sekwencji) oznacza, że ​​wszystkie wyrażenia są thunks, które rozpoczynają się jako puste pola i są zapisywane tylko wtedy, gdy ich wartość jest potrzebna; implementacja span, jak mówisz, dzieli ten sam thunk z span p xs' w obu swoich sekwencjach wyjściowych, tak że to, co jest potrzebne, najpierw "wysyła" je do wyniku drugiej sekwencji, wykonując akcję z odległości niezbędnej do zachowaj inne miłe właściwości.

Alternatywna implementacja Clojure powiązana z wybranymi (2,3), jak zauważyłeś.

Problem polega na tym, że dla partition-by, spadku albo (1) lub (2) oznacza, że ​​trzymasz głowę niektórych sekwencji: albo wejście lub jedno z wyjść. Jeśli więc potrzebujesz rozwiązania, w którym można obsłużyć dowolnie duże partycje arbitralnie dużego wejścia, musisz wybrać (1,2). Istnieje kilka sposobów można to zrobić w Clojure:

  1. Weź podejście Python: return coś bardziej jak iterator niż seq - seqs zrobić mocniejsze gwarancje o braku mutacji, a obiecuję, że bezpiecznie można je przemierzać wiele razy, itd. Jeśli zamiast seqs zwrócisz iterator iteratorów, to konsumowanie elementów z dowolnego iteratora może dowolnie mutować lub unieważniać inne. To gwarantuje, że konsumpcja odbywa się w porządku i że pamięć może zostać uwolniona.
  2. Podejście Haskella: ręcznie odfaj wszystko, z mnóstwem połączeń do delay i wymagaj od klienta dzwonienia pod numer force tak często, jak to konieczne, aby uzyskać dane. To będzie dużo brzydsze w Clojure i znacznie zwiększy twoją głębię stosu (użycie tego na nietrywialnym wejściu prawdopodobnie zepsułoby stos), ale teoretycznie jest to możliwe.
  3. Napisz coś więcej smaku Clojure (ale wciąż całkiem niezwykłego) dzięki kilku zmiennym obiektom danych, które są skoordynowane pomiędzy kolejnymi wyjściowymi numerami, z których każdy jest aktualizowany w razie potrzeby, gdy coś jest wymagane od żadnego z nich.

Jestem pewien, że każde z tych trzech podejść jest możliwe, ale szczerze mówiąc, wszystkie są dość trudne i wcale nie są naturalne. Abstrakcja sekwencji Clojure po prostu nie ułatwia tworzenia struktury danych, którą chcesz. Moja rada jest taka, że ​​jeśli potrzebujesz czegoś takiego, a partycje mogą być zbyt duże, aby zmieścić się wygodnie, po prostu zaakceptuj nieco inny format i wykonaj nieco więcej czynności związanych z księgowaniem: unikaj dylematu (1,2,3), nie produkując wielu sekwencje wyjściowe!

Więc zamiast ((2 4 6 8) (1 3 5) (10 12) (7)) jest Twój format czegoś podobnego (partition-by even? [2 4 6 8 1 3 5 10 12 7]), można przyjąć nieco brzydsze format: ([::key true] 2 4 6 8 [::key false] 1 3 5 [::key true] 10 12 [::key false] 7). Nie jest to ani trudne do wyprodukowania, ani trudne do konsumpcji, chociaż pisanie jest nieco zbyt długie i żmudne.

Oto jeden rozsądny realizacja funkcji produkującej:

(defn lazy-partition-by [f coll] 
    (lazy-seq 
    (when (seq coll) 
     (let [x (first coll) 
      k (f x)] 
     (list* [::key k] x 
     ((fn part [k xs] 
      (lazy-seq 
       (when (seq xs) 
       (let [x (first xs) 
         k' (f x)] 
        (if (= k k') 
        (cons x (part k (rest xs))) 
        (list* [::key k'] x (part k' (rest xs)))))))) 
      k (rest coll))))))) 

A oto jak je konsumować, najpierw definiowania rodzajowe reduce-grouped który ukrywa szczegóły formatu grupowania, a następnie przykładem funkcja count-partition-sizes do wyjścia klucz i rozmiar każdej partycji bez zachowania jakichkolwiek sekwencji w pamięci:

(defn reduce-grouped [f init groups] 
    (loop [k nil, acc init, coll groups] 
    (if (empty? coll) 
     acc 
     (if (and (coll? (first coll)) (= ::key (ffirst coll))) 
     (recur (second (first coll)) acc (rest coll)) 
     (recur k (f k acc (first coll)) (rest coll)))))) 

(defn count-partition-sizes [f coll] 
    (reduce-grouped (fn [k acc _] 
        (if (and (seq acc) (= k (first (peek acc)))) 
         (conj (pop acc) (update-in (peek acc) [1] inc)) 
         (conj acc [k 1]))) 
        [] (lazy-partition-by f coll))) 

user> (lazy-partition-by even? [2 4 6 8 1 3 5 10 12 7]) 
([:user/key true] 2 4 6 8 [:user/key false] 1 3 5 [:user/key true] 10 12 [:user/key false] 7) 
user> (count-partition-sizes even? [2 4 6 8 1 3 5 10 12 7]) 
[[true 4] [false 3] [true 2] [false 1]] 

Edit: znowu Patrząc na to, że nie jestem prawdziwym Jestem przekonany, że mój reduce-grouped jest o wiele bardziej przydatny niż (reduce f init (map g xs)), ponieważ tak naprawdę nie daje wyraźnego wskazania, kiedy klucz się zmienia. Więc jeśli potrzebujesz wiedzieć, kiedy grupa się zmieni, będziesz potrzebować mądrzejszej abstrakcji lub użyć mojego oryginalnego lazy-partition-by bez "inteligentnego" owijania jej.

+0

Chciałbym wystawić na kod klienta bibliotecznego interfejs, który umożliwia zapisywanie funkcji 'process', które pobierają kolejne elementy (z tym samym kluczem). "proces" jest następnie wywoływany w przypadku skutków ubocznych. Sądzę, że trzecia droga jest dla mnie najbardziej odpowiednia. Czy możesz podać zarys tego, jak go wdrożyć? – tempestadept

+0

Niezupełnie. Tak jak powiedziałem, to naprawdę trudne. Podczas pisania oryginalnej odpowiedzi otrzymałem 15 lub 20 linii, a potem zrozumiałem, że wciąż nie mam pojęcia, co piszę. – amalloy