2012-06-18 3 views
14

Co chciałbym zrobić, to mieć zestaw producentów goroutin (z których niektórzy mogą lub nie muszą się zakończyć) i rutyny konsumenckiej. Problem polega na tym zastrzeżeniu w nawiasach - nie znamy całkowitej liczby, która zwróci odpowiedź.Jaki jest najładniejszy idiom dla producenta/konsumenta w Go?

Więc to, co chcę zrobić to w ten sposób:

package main 

import (
    "fmt" 
    "math/rand" 
) 

func producer(c chan int) { 
    // May or may not produce. 
    success := rand.Float32() > 0.5 
    if success { 
    c <- rand.Int() 
    } 
} 

func main() { 
    c := make(chan int, 10) 
    for i := 0; i < 10; i++ { 
    go producer(c, signal) 
    } 

    // If we include a close, then that's WRONG. Chan will be closed 
    // but a producer will try to write to it. Runtime error. 
    close(c) 

    // If we don't close, then that's WRONG. All goroutines will 
    // deadlock, since the range keyword will look for a close. 
    for num := range c { 
    fmt.Printf("Producer produced: %d\n", num) 
    } 
    fmt.Println("All done.") 
} 

Więc problem jest, jeśli zamknę to się stało, gdybym nie blisko - to jeszcze nie tak (patrz komentarze w kodzie).

Teraz rozwiązaniem byłoby out-of-band kanałowy sygnał, że wszyscy producenci pisać na adres:

package main 

import (
    "fmt" 
    "math/rand" 
) 

func producer(c chan int, signal chan bool) { 
    success := rand.Float32() > 0.5 
    if success { 
    c <- rand.Int() 
    } 
    signal <- true 
} 

func main() { 
    c := make(chan int, 10) 
    signal := make(chan bool, 10) 
    for i := 0; i < 10; i++ { 
    go producer(c, signal) 
    } 

    // This is basically a 'join'. 
    num_done := 0 
    for num_done < 10 { 
    <- signal 
    num_done++ 
    } 
    close(c) 

    for num := range c { 
    fmt.Printf("Producer produced: %d\n", num) 
    } 
    fmt.Println("All done.") 
} 

I to zupełnie nie to, co chcę! Ale dla mnie wydaje się, że to kęs. Moje pytanie brzmi: czy istnieje jakiś idiom/sztuczka, która pozwala mi zrobić coś podobnego w łatwiejszy sposób?

miałem okiem tutaj: http://golang.org/doc/codewalk/sharemem/ I wydaje się, że complete chan (zainicjowanego na początku main) stosuje się w zakresie, ale nigdy nie została zamknięta. Nie rozumiem jak.

Jeśli ktoś ma jakiekolwiek spostrzeżenia, byłbym bardzo wdzięczny. Twoje zdrowie!


Edit: fls0815 ma odpowiedzi, a także odpowiedzi na pytanie, w jaki sposób działa zasięg kanał zbliżenie mniej.

Mój kod powyżej zmodyfikowaliśmy do pracy (zrobione przed fls0815 kodu uprzejmie dostarczonego):

package main 

import (
    "fmt" 
    "math/rand" 
    "sync" 
) 

var wg_prod sync.WaitGroup 
var wg_cons sync.WaitGroup 

func producer(c chan int) { 
    success := rand.Float32() > 0.5 
    if success { 
    c <- rand.Int() 
    } 
    wg_prod.Done() 
} 

func main() { 
    c := make(chan int, 10) 
    wg_prod.Add(10) 
    for i := 0; i < 10; i++ { 
    go producer(c) 
    } 

    wg_cons.Add(1) 
    go func() { 
    for num := range c { 
     fmt.Printf("Producer produced: %d\n", num) 
    } 
    wg_cons.Done() 
    }() 

    wg_prod.Wait() 
    close(c) 
    wg_cons.Wait() 
    fmt.Println("All done.") 
} 

Odpowiedz

14

tylko producenci powinni zamknąć kanały. Możesz osiągnąć swój cel, wywołując konsumenta (ów), który iteruje (range) nad kanałem wynikowym po rozpoczęciu produkcji. W swoim głównym wątku czekasz (patrz: sync.WaitGroup), dopóki twoi konsumenci/producenci nie zakończą swojej pracy. Po zakończeniu produkcji zamknij wynikowy kanał, który zmusi twoich klientów do wyjścia (range zakończy działanie po zamknięciu kanałów i braku zbuforowanego elementu).

Przykład Kod:

package main 

import (
    "log" 
    "sync" 
    "time" 
    "math/rand" 
    "runtime" 
) 

func consumer() { 
    defer consumer_wg.Done() 

    for item := range resultingChannel { 
     log.Println("Consumed:", item) 
    } 
} 

func producer() { 
    defer producer_wg.Done() 

    success := rand.Float32() > 0.5 
    if success { 
     resultingChannel <- rand.Int() 
    } 
} 

var resultingChannel = make(chan int) 
var producer_wg sync.WaitGroup 
var consumer_wg sync.WaitGroup 

func main() { 
    rand.Seed(time.Now().Unix()) 

    for c := 0; c < runtime.NumCPU(); c++ { 
     producer_wg.Add(1) 
     go producer() 
    } 

    for c := 0; c < runtime.NumCPU(); c++ { 
     consumer_wg.Add(1) 
     go consumer() 
    } 

    producer_wg.Wait() 

    close(resultingChannel) 

    consumer_wg.Wait() 
} 

Powodem umieścić close -statement do podstawowej funkcji, ponieważ mamy więcej niż jednego producenta. Zamknięcie kanału u jednego producenta w powyższym przykładzie doprowadziłoby do problemu, na który już natknąłeś się (pisanie na zamkniętych kanałach, z tego powodu, że mógł zostać jeden producent, który nadal tworzy dane). Kanały powinny być zamykane tylko wtedy, gdy nie ma już producenta (dlatego moja sugestia o zamknięciu kanału tylko przez producenta). W ten sposób konstruowane są kanały w Go. Here znajdziesz więcej informacji na temat zamykania kanałów.


Podobne do przykładu sharemem: AFAICS przykład ten biegnie przez niekończące się ponownie i ponownie-kolejkowanie Resources (od oczekiwaniu -> pełna -> oczekiwaniu -> pełna ... i tak dalej). To właśnie robi iteracja pod koniec głównego func. Odbiera ukończone zasoby i kolejkuje je ponownie za pomocą funkcji Resource.Sleep() do oczekujących. Kiedy nie ma ukończonego zasobu, czeka i blokuje nowe zasoby. Dlatego nie ma potrzeby zamykania kanałów, ponieważ są one cały czas używane.

+0

Cześć, dzięki za odpowiedź - to rzeczywiście to, czego szukasz. Czy możesz rozszerzyć swoją sugestię, że "Tylko producenci powinni zamknąć kanały". - Brzmi to jak zasada zdrowego rozsądku/zasady kodowania, ale zastanawiałem się, czy był też jakiś techniczny powód (ponieważ przykładowy kod, którego lista zawiera główną funkcję zamykającą kanał). Dzięki jeszcze raz! – Will

+1

Dodałem trochę więcej informacji. HTH. – fls0815

+0

Ahhh, to ma sens. Pomyślałem, że może to być trudna zasada - w której każdy producent musiałby sprawdzić, czy można zamknąć kanał (dlatego ostatni, który kończy, zamyka go). To oczywiście dużo bardziej nieprzyjemne (z bardziej niepotrzebnymi kontrolami) niż zamykanie go w main() w naszych przykładach, ale martwiłem się, że to jest sposób na robienie rzeczy (z jakiegoś powodu byłem nieświadomy). Nadal staram się poznać styl najlepszych praktyk. – Will

0

Zawsze istnieje wiele sposobów rozwiązania tych problemów. Oto rozwiązanie wykorzystujące proste kanały synchroniczne, które są fundamentalne w Go. Bez zbuforowanych kanałów, bez kanałów zamykających, bez grup WaitGroups.

To naprawdę nie jest tak daleko od twojego "kęgo" rozwiązania, i - przykro mi rozczarować - nie o wiele mniejszy. Wprowadza konsumenta w jego własną goroutine, aby konsument mógł konsumować numery, jak produkuje je producent. Rozróżnia także, że "próba" produkcji może zakończyć się sukcesem lub porażką. Jeśli produkcja się nie powiedzie, próba zostanie wykonana natychmiast. Jeśli się powiedzie, próba nie zostanie zakończona, dopóki liczba nie zostanie zużyta.

package main 

import (
    "fmt" 
    "math/rand" 
) 

func producer(c chan int, fail chan bool) { 
    if success := rand.Float32() > 0.5; success { 
     c <- rand.Int() 
    } else { 
     fail <- true 
    } 
} 

func consumer(c chan int, success chan bool) { 
    for { 
     num := <-c 
     fmt.Printf("Producer produced: %d\n", num) 
     success <- true 
    } 
} 

func main() { 
    const nTries = 10 
    c := make(chan int) 
    done := make(chan bool) 
    for i := 0; i < nTries; i++ { 
     go producer(c, done) 
    } 
    go consumer(c, done) 

    for i := 0; i < nTries; i++ { 
     <-done 
    } 
    fmt.Println("All done.") 
} 
0

Dodaję to dlatego, że istniejące odpowiedzi nie zrobić kilka rzeczy oczywiste. Po pierwsze, pętla zasięgu w przykładzie kodu źródłowego jest po prostu nieskończoną pętlą zdarzeń, aby na nowo sprawdzać i aktualizować tę samą listę adresów URL.

Następnie kanał, sam w sobie, już jest idiomatyczną kolejką konsumenta-producenta w Go. Rozmiar bufora asynchronicznego wspierającego kanał określa, ile producenci mogą wyprodukować przed uzyskaniem przeciwciśnienia. Ustaw N = 0 poniżej, aby zobaczyć konsumenta z blokadą kroku, bez nikogo, kto będzie ścigał się z wyprzedzeniem lub wyprzedzał. Jak to jest, N = 10 pozwoli producentowi wyprodukować do 10 produktów przed zablokowaniem.

Wreszcie, istnieje kilka fajnych idiomów do pisania, które komunikują sekwencyjne procesy w Go (np. Funkcje, które uruchamiają dla ciebie procedury i używają wzoru for/select do komunikowania się i akceptowania poleceń sterujących). Uważam, że WaitGroups jest niezgrabne i chciałbym zamiast tego zobaczyć przykłady idiomatyczne.

package main 

import (
    "fmt" 
    "time" 
) 

type control int 
const (
    sleep control = iota 
    die // receiver will close the control chan in response to die, to ack. 
) 

func (cmd control) String() string { 
    switch cmd { 
    case sleep: return "sleep" 
    case die: return "die" 
    } 
    return fmt.Sprintf("%d",cmd) 
} 

func ProduceTo(writechan chan<- int, ctrl chan control, done chan bool) { 
    var product int 
    go func() { 
     for { 
      select { 
     case writechan <- product: 
      fmt.Printf("Producer produced %v\n", product) 
      product++ 
     case cmd:= <- ctrl: 
      fmt.Printf("Producer got control cmd: %v\n", cmd) 
      switch cmd { 
      case sleep: 
       fmt.Printf("Producer sleeping 2 sec.\n") 
       time.Sleep(2000 * time.Millisecond) 
      case die: 
       fmt.Printf("Producer dies.\n") 
       close(done) 
       return 
      } 
      } 
     } 
    }() 
} 

func ConsumeFrom(readchan <-chan int, ctrl chan control, done chan bool) { 
    go func() { 
     var product int 
     for { 
      select { 
      case product = <-readchan: 
       fmt.Printf("Consumer consumed %v\n", product) 
      case cmd:= <- ctrl: 
       fmt.Printf("Consumer got control cmd: %v\n", cmd) 
       switch cmd { 
       case sleep: 
        fmt.Printf("Consumer sleeping 2 sec.\n") 
        time.Sleep(2000 * time.Millisecond) 
       case die: 
        fmt.Printf("Consumer dies.\n") 
        close(done) 
        return 
       } 

      } 
     } 
    }() 
} 

func main() { 

    N := 10 
    q := make(chan int, N) 

    prodCtrl := make(chan control) 
    consCtrl := make(chan control) 

    prodDone := make(chan bool) 
    consDone := make(chan bool) 


    ProduceTo(q, prodCtrl, prodDone) 
    ConsumeFrom(q, consCtrl, consDone) 

    // wait for a moment, to let them produce and consume 
    timer := time.NewTimer(10 * time.Millisecond) 
    <-timer.C 

    // tell producer to pause 
    fmt.Printf("telling producer to pause\n") 
    prodCtrl <- sleep 

    // wait for a second 
    timer = time.NewTimer(1 * time.Second) 
    <-timer.C 

    // tell consumer to pause 
    fmt.Printf("telling consumer to pause\n") 
    consCtrl <- sleep 


    // tell them both to finish 
    prodCtrl <- die 
    consCtrl <- die 

    // wait for that to actually happen 
    <-prodDone 
    <-consDone 
} 
0

Można używać prostych kanałów niebuforowane bez grup poczekać, jeśli użyć wzorca generator z funkcją Fanin.

W schemacie generatora każdy producent zwraca kanał i odpowiada za jego zamknięcie. Funkcja fanIn następnie iteruje na tych kanałach i przekazuje wartości zwrócone na nich w dół do pojedynczego kanału, który zwraca.

Problem oczywiście polega na tym, że funkcja fanIn przekazuje wartość zerową typu kanału (int), gdy każdy kanał jest zamknięty.

Można obejść go, używając wartości zerowej typu kanału jako wartości wskaźnika i wykorzystując tylko wyniki z kanału FanIn, jeśli nie są one wartością zerową.

Oto przykład:

package main 

import (
    "fmt" 
    "math/rand" 
) 

const offset = 1 

func producer() chan int { 
    cout := make(chan int) 
    go func() { 
     defer close(cout) 
     // May or may not produce. 
     success := rand.Float32() > 0.5 
     if success { 
      cout <- rand.Int() + offset 
     } 
    }() 
    return cout 
} 

func fanIn(cin []chan int) chan int { 
    cout := make(chan int) 
    go func() { 
     defer close(cout) 
     for _, c := range cin { 
      cout <- <-c 
     } 
    }() 
    return cout 
} 

func main() { 
    chans := make([]chan int, 0) 
    for i := 0; i < 10; i++ { 
     chans = append(chans, producer()) 
    } 

    for num := range fanIn(chans) { 
     if num > offset { 
      fmt.Printf("Producer produced: %d\n", num) 
     } 
    } 
    fmt.Println("All done.") 
}