2012-01-21 18 views
13

Zauważyłem, że zeromq gniazdo PUB będzie buforuje wszystkie dane wychodzące jeśli jest podłączenie npZeroMQ gniazdo PUB buforuje całe wychodzącej danych, gdy jest łączący

import zmq 
import time 
context = zmq.Context() 

# create a PUB socket 
pub = context.socket (zmq.PUB) 
pub.connect("tcp://127.0.0.1:5566") 
# push some message before connected 
# they should be dropped 
for i in range(5): 
    pub.send('a message should not be dropped') 

time.sleep(1) 

# create a SUB socket 
sub = context.socket (zmq.SUB) 
sub.bind("tcp://127.0.0.1:5566") 
sub.setsockopt(zmq.SUBSCRIBE, "") 

time.sleep(1) 

# this is the only message we should see in SUB 
pub.send('hi') 

while True: 
    print sub.recv() 

Sub wiąże po tych wiadomościach, że powinien zostać usunięty, ponieważ PUB powinien usuwać wiadomości, jeśli nikt do niego nie łączy. Ale zamiast upuszczać wiadomości, buforuje wszystkie wiadomości.

a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
hi 

Jak widać, te „wiadomości nie powinny zostać pominięte” są buforowane przez gniazdo, raz robi podłączeniu go opróżnić je do gniazda SUB. Jeśli zwiążę gniazdo PUB i połączę się z gniazdem SUB, to działa poprawnie.

import zmq 
import time 
context = zmq.Context() 

# create a PUB socket 
pub = context.socket (zmq.PUB) 
pub.bind("tcp://127.0.0.1:5566") 
# push some message before connected 
# they should be dropped 
for i in range(5): 
    pub.send('a message should not be dropped') 

time.sleep(1) 

# create a SUB socket 
sub = context.socket (zmq.SUB) 
sub.connect("tcp://127.0.0.1:5566") 
sub.setsockopt(zmq.SUBSCRIBE, "") 

time.sleep(1) 

# this is the only message we should see in SUB 
pub.send('hi') 

while True: 
    print repr(sub.recv()) 

i można zobaczyć tylko wyjście

'hi' 

Takie dziwne zachowanie powodować problemu, to buforuje wszystkie dane na gnieździe łączącym, mam dwa serwery, Serwer publikuje danych na serwerze B

Server A -- publish --> Server B 

Działa dobrze, jeśli serwer B zostanie podłączony do trybu online. Ale co jeśli uruchomię serwer A i nie uruchomię serwera B?

W rezultacie, łączące gniazdo PUB na serwerze A przechowuje wszystkie te dane, zużycie pamięci staje się coraz wyższe.

Oto problem, czy tego rodzaju zachowanie jest błędem lub funkcją? Jeśli jest to funkcja, gdzie mogę znaleźć dokument wspominający to zachowanie? I w jaki sposób mogę przerwać podłączanie gniazda PUB do wszystkich danych?

Dzięki.

Odpowiedz

6

czy bloków gniazd lub spadnie wiadomości zależy od typu gniazda, jak to opisano w poniższym ZMQ::Socket documentation (podkreślenie moje):

ZMQ :: HWM: Pobierz high water mark

ZMQ: : Opcja HWM pobiera znacznik wysokiej wody dla określonego gniazda. Znak wysokiej wody jest twardym ograniczeniem maksymalnej liczby niezapisanych komunikatów 0MQ, które będzie umieszczać w kolejce w pamięci dla dowolnego pojedynczego elementu równorzędnego, z którym komunikuje się określone gniazdo.

Jeśli został osiągnięty limit ten gniazdo wchodzi wyjątkowy stan iw zależności od typu gniazda, 0MQ podejmuje odpowiednie działania takie jak blokowanie lub upuszczenie wysłane wiadomości. Odwołaj się do indywidualnych opisów gniazd w ZMQ :: Socket, aby uzyskać szczegółowe informacje na temat dokładnej operacji podejmowanej dla każdego typu gniazda.

Domyślna wartość ZMQ :: HWM wynosząca zero oznacza "brak limitu".

Można zobaczyć, czy będzie blokować lub wpaść pominie dokumentacji typu gniazdo dla ZMQ::HWM option action który będzie albo Block lub Drop.

Akcja dla ZMQ::PUB jest Drop, więc jeśli nie jest upuszczenie należy sprawdzić HWM (High Water Mark) wartość i zważali na ostrzeżenia, że ​​Domyślna wartość ZMQ :: HWM od zera oznacza „bez limitu”, oznacza to, że nie wejdzie w stan wyjątkowy, dopóki systemowi nie zabraknie pamięci (w którym to momencie nie wiem, jak się zachowuje).

+0

Wiem, że mogę ustawić HWM, aby ograniczyć liczbę wiadomości w buforze. Ale to nie rozwiązuje problemu, ale sposób, w jaki PUB obsługuje stan HWM, to zrzucanie nowych wiadomości. Oznacza to, że jeśli ustawisz HWM, tylko wiodące wiadomości będą przechowywane w buforze. To, co piszę, to system przesyłania strumieniowego audio. Tego rodzaju zachowanie sprawia, że ​​korzystanie z niego jest bardzo irytujące. Powiedzmy, że wysyłasz wiadomości [1, 2, 3, 4], a następnie HWM ustawiono na 2, wtedy gniazdo buforuje [1, 2] dla ciebie, wszystkie nowe wiadomości są usuwane. Ale w przypadku przesyłania strumieniowego dźwięku najważniejszą częścią są nowe nadchodzące dane. Czy istnieje sposób na dostosowanie sposobu, w jaki HWM odrzuca wiadomość? –

+0

Ach, więc masz na myśli zachowanie, które chciałbyś, żeby HWM było ustawione na 2 i wysyłałeś [1, 2, 3, 4], wtedy powinno spaść [1, 2] i zachować [3, 4], ale potem jeśli wysłałeś 5, powinien upuścić 3, a skończysz z [4, 5]? Nie sądzę, że takie zachowanie istnieje w ZMQ. – aculich

+0

To jest bardzo interesujące. Z pewnością możliwość odrzucenia "starszych" wiadomości byłaby konieczna w przypadku niektórych aplikacji (typowa jest telefonia IP). –

0

Tak więc bind() i connect() powodują dwa różne zachowania. Dlaczego po prostu nie wybierzesz tego, który preferujesz (wygląda na to, że bind()) i używasz tego?

Zasadniczo jest to funkcja ZeroMQ, która buforuje wychodzące wiadomości aż do nawiązania połączenia.

+0

Ponieważ mam wiele węzłów, które chcą publikować dane na jeden znany serwer. Oczywiście mogę wiązać po stronie PUB, ale w rezultacie potrzebuję adresu N dla każdego węzła, serwer nie wie, ile węzłów będzie. Myślę, że bind i connect nie powinny wpływać na zachowanie, kiedy połączenie jest nawiązywane, nie ma różnicy między bind i connect, to dlaczego robi różnicę? Nie rozumiem: S –

+0

Oh OK. Cóż, myślę, że ZeroMQ zachowuje się zgodnie z oczekiwaniami i zgodnie z założeniami, więc możesz przesłać zapytanie o połączenie przed wysłaniem danych. –

+0

@JohnZwinck Wybór 'bind()' vs 'connect()' nie jest oparty na preferencjach, ale powinien być oparty na sposobie jego użycia. Używa go poprawnie z 'bind()' na serwerze (wydawca) i 'connect()' na kliencie (subskrybencie). I nie zawsze buforuje wiadomości wychodzące, ale jest to określane przez typ gniazda i wartość znaku wysokiej wody jako [wyjaśnione tutaj w odniesieniu do dokumentacji] (http://stackoverflow.com/a/8958699/462302). – aculich

0

Powinieneś być w stanie ustawić wysoki znak wody w gnieździe za pomocą hwm ustawiania gniazda pub. Umożliwia zdefiniowanie liczby wiadomości.

4

Czuję, że to zachowanie jest semantyczną funkcją zmq_connect(). To znaczy: , gdy zmq_connect() zwraca sukces, a następnie połączenie jest konceptualnie ustanowione, a tym samym połączenie-PUB rozpoczyna wyświetlanie w kolejce wiadomości zamiast zrzucania.

Poniższy fragment z „ZMQ Guide” to aluzja do tego:

Teoretycznie z gniazd ØMQ, to nie ma znaczenia, którą stroną łączy, a który wiąże koniec. Jednak w przypadku gniazd PUB-SUB, jeśli podłączysz gniazdo SUB i połączysz gniazdo PUB, gniazdo SUB może odbierać stare wiadomości, tj. Wiadomości wysłane przed uruchomieniem SUB. To jest artefakt sposobu wiązania/łączenia działa. Najlepiej powiązać PUB i połączyć SUB, jeśli możesz.

dalszych częściach zmq_connect() podano wskazówek, jak pokazano poniżej:

Główne różnice w stosunku do konwencjonalnych gniazd

Ogólnie rzecz biorąc, w konwencjonalne oprawki stanowią synchronicznego interfejs albo połączeniowego niezawodny bajtowe strumienie (SOCK_STREAM) lub bezsolidne datagramy (SOCK_DGRAM). Dla porównania, gniazda ØMQ przedstawiają abstrakcję asynchronicznej kolejki komunikatów w postaci , z dokładną semantyką kolejkowania w zależności od używanego typu gniazda . W przypadku konwencjonalnych strumieni transferu gniazd bajtów lub dyskretnych datagramów, gniazda ØMQ przesyłają dyskretne wiadomości.

Gniazda ØMQ będące asynchroniczny oznacza, że ​​czasy fizycznej konfiguracji połączenie i zburzyć, podłącz i skuteczne dostawy są przezroczysty dla użytkownika i zorganizowany przez samego ØMQ. Ponadto, komunikaty mogą być umieszczane w kolejce w przypadku, gdy element równorzędny jest niedostępny do ich odbierania.

0

Oto hack, które mogą pomóc ...

Określ ZMQ::HWM do określonej liczby, powiedzmy 10.Po nawiązaniu połączenia wywołaj metodę recv gniazda w gnieździe, aż odrzuci wszystkie buforowane wiadomości, a dopiero NA uruchomi twoją główną pętlę odbiorczą.