2016-12-23 69 views
16

Napisałem bibliotekę o nazwie amqp-worker, która udostępnia funkcję o nazwie worker, która odpytuje kolejkę wiadomości (np. RabbitMQ) dla wiadomości, wywołując procedurę obsługi po znalezieniu wiadomości. Następnie wraca do odpytywania.Usterka pamięci w rekurencyjnej funkcji IO - PAP

To cieknąca pamięć. Wyprofilowałem go, a na wykresie jest napisane, że sprawcą jest aplikacja o częściowym działaniu. Gdzie jest wyciek z mojego kodu? Jak mogę uniknąć nieszczelności podczas zapętlenia w IO z forever?

enter image description here

Oto kilka istotnych funkcji. The full source is here.

Example Program. Jest to wyciek

main :: IO() 
main = do 
    -- connect 
    conn <- Worker.connect (fromURI "amqp://guest:[email protected]:5672") 

    -- initialize the queues 
    Worker.initQueue conn queue 
    Worker.initQueue conn results 

    -- publish a message 
    Worker.publish conn queue (TestMessage "hello world") 

    -- create a worker, the program loops here 
    Worker.worker def conn queue onError (onMessage conn) 

worker

worker :: (FromJSON a, MonadBaseControl IO m, MonadCatch m) => WorkerOptions -> Connection -> Queue key a -> (WorkerException SomeException -> m()) -> (Message a -> m()) -> m() 
worker opts conn queue onError action = 
    forever $ do 
    eres <- consumeNext (pollDelay opts) conn queue 
    case eres of 
     Error (ParseError reason bd) -> 
     onError (MessageParseError bd reason) 

     Parsed msg -> 
     catch 
      (action msg) 
      (onError . OtherException (body msg)) 
    liftBase $ threadDelay (loopDelay opts) 

consumeNext

consumeNext :: (FromJSON msg, MonadBaseControl IO m) => Microseconds -> Connection -> Queue key msg -> m (ConsumeResult msg) 
consumeNext pd conn queue = 
    poll pd $ consume conn queue 

poll

poll :: (MonadBaseControl IO m) => Int -> m (Maybe a) -> m a 
poll us action = do 
    ma <- action 
    case ma of 
     Just a -> return a 
     Nothing -> do 
     liftBase $ threadDelay us 
     poll us action 
+0

Co twoja wersja ghc i jak się kompilujesz? – jberryman

+1

Jest ustawiony na lts-7.3, więc jest to GHC 8.0.1. Kompiluję z instalacją stosu --profile. Ale mam wyciek pamięci przy normalnej instalacji stosu. Używanie domyślnych opcji GHC z szablonu stosu: -threaded -rtsopts -with-rtsopts = -N –

+2

Ten przykład jest bardzo daleki od minimalnego - importujesz całą swoją bibliotekę ('Network.AMQP.Worker') w swoim przykładowym programie. W obecnej formie jest to zbyt szeroki zakres. – user2407038

Odpowiedz

14

Oto bardzo prosty przykład, który pokazuje problem:

main :: IO() 
main = worker 

{-# NOINLINE worker #-} 
worker :: (Monad m) => m() 
worker = 
    let loop = poll >> loop 
    in loop 

poll :: (Monad m) => m a 
poll = return() >> poll 
If you remove the `NOINLINE`, or specialize `m` to 
`IO` (while compiling with `-O`), the leak goes away. 

napisałem szczegółowy blog post o tym, dlaczego dokładnie ten kod przecieki pamięci. Szybkie podsumowanie jest, jak Reid zaznacza w swoim odpowiedź, że kod tworzy i zapamiętuje łańcuch częściowych zastosowań >> s.

ja również złożył ghc ticket na ten temat.

3

Wyciek pamięci była poll. Używając monad-loops, zmieniłem definicję na: Wygląda na to, że untilJust robi to samo co moja rekursja, ale naprawia przeciek.

Czy ktoś komentarz, dlaczego moja poprzednia definicja poll był wyciek pamięci?

{-# LANGUAGE FlexibleContexts #-} 

module Network.AMQP.Worker.Poll where 

import Control.Concurrent (threadDelay) 
import Control.Monad.Trans.Control (MonadBaseControl) 
import Control.Monad.Base (liftBase) 
import Control.Monad.Loops (untilJust) 

poll :: (MonadBaseControl IO m) => Int -> m (Maybe a) -> m a 
poll us action = untilJust $ do 
    ma <- action 
    case ma of 
     Just a -> return $ Just a 
     Nothing -> do 
     liftBase $ threadDelay us 
     return Nothing 
4

Może łatwiej przykład do zrozumienia jest to jeden

main :: IO() 
main = let c = count 0 
     in c >> c 

{-# NOINLINE count #-} 
count :: Monad m => Int -> m() 
count 1000000 = return() 
count n = return() >> count (n+1) 

Ocenianie f >> g dla działań IO rentowności pewnego rodzaju zamknięcie, które ma odniesienia do obu f i g (to w zasadzie skład f i g jako funkcji na żetony państwowe). count 0 zwraca thunk c, który oceni dużą strukturę zamknięć w postaci return() >> return() >> return() >> .... Kiedy wykonujemy c budujemy tę strukturę, a ponieważ musimy wykonać po raz drugi c cała struktura jest nadal aktywna. Tak więc ten program wycieka z pamięci (niezależnie od flag optymalizacyjnych).

Gdy count specjalizuje się w IO i włączone są optymalizacje, GHC udostępnia różne sztuczki, aby uniknąć budowania tej struktury danych; ale wszyscy polegają na tym, że wiedzą, że monada to IO.

Wracając do pierwotnego count :: Monad m => Int -> m(), możemy starać się unikać budowania tego wielkiego strukturę zmieniając ostatnią linię do

count n = return() >>= (\_ -> count (n+1)) 

Teraz wywołanie rekurencyjne jest ukryty wewnątrz lambda, więc c jest tylko niewielka struktura return() >>= (\_ -> BODY) . To faktycznie pozwala uniknąć wycieku przestrzeni podczas kompilacji bez optymalizacji. Jednak, gdy są włączone optymalizacje, GHC unosi się count (n+1) z ciała lambda (ponieważ nie zależy na argumencie) produkujące

count n = return() >>= (let body = count (n+1) in \_ -> body) 

i teraz c jest duża struktura ponownie ...

+0

W jaki sposób użycie 'NOINLINE' powoduje, że program jest porównywalny z oryginalnym, nieszczelnym? – Michael

+0

GHC nie oznacza ani nie specjalizuje się w ogólnym przypadku (gdy funkcja jest zdefiniowana w innym module, nie jest mała itp.) GHC zna wiele lew, a kiedy zminimalizujesz te sztuczki, może się pojawić. Używanie 'NOINLINE' zatrzymuje wiele z tych sztuczki pozwalają zminimalizować. –