2013-08-27 9 views
14

Powiedzmy, że mam prosty model producenta/konsumenta, w którym konsument chce przekazać producentowi pewne państwo. Na przykład, niech obiekty przepływające dalej będą obiektami, które chcemy zapisać do pliku, a obiektami upstream będzie jakiś token reprezentujący miejsce, w którym obiekt został zapisany w pliku (na przykład przesunięcie).Idiomatyczne lampy dwukierunkowe z dalszym stanem bez utraty

Te dwa procesy może wyglądać następująco (z pipes-4.0)

{-# LANGUAGE GeneralizedNewtypeDeriving #-} 

import Pipes 
import Pipes.Core 
import Control.Monad.Trans.State  
import Control.Monad 

newtype Object = Obj Int 
       deriving (Show) 

newtype ObjectId = ObjId Int 
       deriving (Show, Num) 

writeObjects :: Proxy ObjectId Object() X IO r 
writeObjects = evalStateT (forever go) (ObjId 0) 
    where go = do i <- get 
       obj <- lift $ request i 
       lift $ lift $ putStrLn $ "Wrote "++show obj 
       modify (+1) 

produceObjects :: [Object] -> Proxy X() ObjectId Object IO() 
produceObjects = go 
    where go [] = return() 
     go (obj:rest) = do 
      lift $ putStrLn $ "Producing "++show obj 
      objId <- respond obj 
      lift $ putStrLn $ "Object "++show obj++" has ID "++show objId 
      go rest 

objects = [ Obj i | i <- [0..10] ] 

Proste jak to może być, miałem uczciwej trochę trudności rozumowanie o tym, jak je skomponować. Idealnie, którą chcemy przepływ push-oparty kontroli jak na poniższej ilustracji,

  1. writeObjects rozpoczyna się od blokowania na request, wysławszy początkową ObjId 0 upstream.
  2. produceObjects wysyła pierwszy obiekt, Obj 0, dalszego
  3. writeObjects pisze przedmiotu i zwiększa swój stan, i czeka na request, tym razem wysyłając ObjId 1 upstream
  4. respond w produceObjects zwrotów z ObjId 0
  5. produceObjects kontynuuje w kroku (2) z drugim obiektem, Obj 1

Moja pierwsza próba to ze składu push-oparty następująco,

main = void $ run $ produceObjects objects >>~ const writeObjects 

Uwaga wykorzystanie const aby obejść inaczej niezgodnych typów (jest to prawdopodobne, gdzie leży problem). W tym przypadku jednak, okazuje się, że ObjId 0 zostanie zjedzone,

Producing Obj 0 
Wrote Obj 0 
Object Obj 0 has ID ObjId 1 
Producing Obj 1 
... 

Podejście oparte pociągowy,

main = void $ run $ const (produceObjects objects) +>> writeObjects 

cierpi podobny problem, tym razem spada Obj 0.

Jak można skomponować te elementy w pożądany sposób?

Odpowiedz

14

Wybór składu do użycia zależy od tego, który składnik powinien zainicjować cały proces. Jeśli chcesz, aby kolejna rura zainicjowała proces, chcesz użyć kompilacji opartej na ciągach (np. (>+>)/(+>>)), ale jeśli chcesz, aby rura przednia zainicjowała proces, powinieneś użyć kompozycji opartej na push (tj. (>>~)/(>~>)) . Błędy typu, które otrzymałeś, ostrzegały cię, że w twoim kodzie jest błąd logiczny: nie określiłeś wyraźnie, który komponent inicjuje proces jako pierwszy.

Z Twojego opisu oczywiste jest, że chcesz, aby przepływ sterowania rozpoczynał się od produceObjects, więc chcesz użyć kompozycji opartej na trybie push. Gdy użyjesz kompozycji opartej na push, typ operatora kompozycji powie Ci wszystko, co musisz wiedzieć o tym, jak naprawić swój kod.Wezmę jego typ i specjalizują się go do łańcucha składzie:

-- Here I'm using the `Server` and `Client` type synonyms to simplify the types 
(>>~) :: Server ObjectId Object IO() 
     -> (Object -> Client ObjectId Object IO()) 
     -> Effect IO() 

Jak już zauważyłem, błąd typu masz kiedy próbował użyć (>>~) mówiłem, że brakuje argumentu typu Object do listy writeObjects funkcja. To statycznie wymusza, że ​​nie można uruchomić żadnego kodu w writeObjects przed otrzymaniem pierwszego Object (przez początkowy argument).

Rozwiązaniem jest przepisać swoją funkcję writeObjects tak:

writeObjects :: Object -> Proxy ObjectId Object() X IO r 
writeObjects obj0 = evalStateT (go obj0) (ObjId 0) 
    where go obj = do i <- get 
        lift $ lift $ putStrLn $ "Wrote "++ show obj 
        modify (+1) 
        obj' <- lift $ request i 
        go obj' 

To wtedy daje poprawne zachowanie:

>>> run $ produceObjects objects >>~ writeObjects 
Producing Obj 0 
Wrote Obj 0 
Object Obj 0 has ID ObjId 0 
Producing Obj 1 
Wrote Obj 1 
Object Obj 1 has ID ObjId 1 
Producing Obj 2 
Wrote Obj 2 
Object Obj 2 has ID ObjId 2 
Producing Obj 3 
Wrote Obj 3 
Object Obj 3 has ID ObjId 3 
Producing Obj 4 
Wrote Obj 4 
Object Obj 4 has ID ObjId 4 
Producing Obj 5 
Wrote Obj 5 
Object Obj 5 has ID ObjId 5 
Producing Obj 6 
Wrote Obj 6 
Object Obj 6 has ID ObjId 6 
Producing Obj 7 
Wrote Obj 7 
Object Obj 7 has ID ObjId 7 
Producing Obj 8 
Wrote Obj 8 
Object Obj 8 has ID ObjId 8 
Producing Obj 9 
Wrote Obj 9 
Object Obj 9 has ID ObjId 9 
Producing Obj 10 
Wrote Obj 10 
Object Obj 10 has ID ObjId 10 

Można się zastanawiać, dlaczego ten wymóg, że jeden z dwóch rur trwa początkowy argument ma sens, poza abstrakcyjnym uzasadnieniem, że tak wymaga prawo kategorii. Zwykłe angielskie wyjaśnienie jest takie, że alternatywą jest to, że będziesz potrzebował bufora pierwszego przesłanego Object "pomiędzy" dwiema rurami zanim writeObjects osiągnie pierwsze oświadczenie request. Takie podejście prowadzi do wielu problematycznych zachowań i naruszeń, ale prawdopodobnie najważniejszym problemem jest to, że skład rur nie byłby już skojarzony, a kolejność efektów zmieniałaby się w zależności od kolejności, w jakiej komponowałeś.

Zaletą dwukierunkowych operatorów składu rur jest to, że typy działają tak, że zawsze można wywnioskować, czy dany składnik jest "aktywny" (tj. Inicjuje sterowanie), czy też "pasywny" (tzn. Oczekuje na dane wejściowe). przez badanie typu. Jeśli kompozycja mówi, że pewna rura (taka jak writeObjects) musi przyjąć argument, to jest pasywna. Jeśli nie przyjmuje żadnego argumentu (np. produceObjects), to jest aktywny i inicjuje kontrolę. Tak więc kompozycja zmusza cię do posiadania co najwyżej jednej aktywnej rury w twoim rurociągu (rura, która nie bierze początkowego argumentu) i to jest rura, która rozpoczyna kontrolę.

4

"Stały są tam, gdzie upuszczasz dane. W celu uzyskania wszystkich danych, prawdopodobnie chcesz zrobić push-workflow oparty następująco:

writeObjects :: Object -> Proxy ObjectId Object() X IO r 
writeObjects obj = go 0 obj 
    where 
    go objid obj = do 
     lift $ putStrLn $ "Wrote "++show obj 
     obj' <- request objid 
     go (objid + 1) obj' 

-- produceObjects as before 

main = void $ run $ produceObjects objects >>~ writeObjects 
2

Byliśmy dyskusji to na liście mailingowej, ale pomyślałem, rzucam go tutaj jako dobrze dla tych, którzy są zainteresowani.

Twój problem polega na tym, że masz dwóch uczestników, którzy są gotowi wypluć między sobą wartości. Żadne z nich nie potrzebuje wejścia drugiego w celu wytworzenia wartości. Więc kto pierwszy idzie? Dobrze Mówi się:

writeObjects rozpoczyna się od blokowania na żądanie, wysławszy początkowa ObjId 0 upstream

Dobrze więc, co oznacza, że ​​musimy opóźnić produceObjects tak, że czeka na sygnał ObjId przed wypluwanie odpowiedniego obiektu (nawet jeśli najwyraźniej nie wymaga tego ID).

Zanurzenie w wewnętrznych elementach proxy, tutaj jest magiczna inkantacja, której nie będę się starał wyjaśnić bardzo dokładnie w tym momencie.Podstawowym założeniem jest po prostu wziąć wejście przed jej potrzebujesz, a następnie zastosować wejście, gdy są potrzebne, ale potem udawać, że potrzebny jest nowy wkład (choć nie trzeba że jeden jeszcze):

delayD :: (Monad m) => Proxy a' a b' b m r -> b' -> Proxy a' a b' b m r 
delayD p0 b' = case p0 of 
    Request a' f -> Request a' (go . f) 
    Respond b g -> Respond b (delayD (g b')) 
    M m   -> M (liftM go m) 
    Pure r  -> Pure r 
    where 
    go p = delayD p b' 

teraz można to wykorzystać na produceObjects objects zamiast const, a druga próba działa zgodnie z oczekiwaniami:

delayD (produceObjects objects) +>> writeObjects 

Jesteśmy omawiania delayD na liście mailingowej, aby zobaczyć, czy włączenie zasługuje w standardowych rur repertuaru.