2017-02-07 64 views
6

Niedawno robię eksperyment w projekcie GIT, aby zrozumieć ramy przetwarzania dużych danych.Seler i Rabbitmq: OSTRZEŻENIE/MainProcess] Otrzymano i usunięto nieznaną wiadomość. Niepoprawny cel?!? - eksperyment na GIT

1, projekt GIT: https://github.com/esperdyne/celery-message-processing

mamy następujące składniki:

1, AMPQ broker (RabbitMQ): działa jako bufor komunikatu, który pracuje jako skrzynkę pocztową do wymiany wiadomości dla różnych użytkowników!

2, pracownik: działa jako serwer usług w celu świadczenia usług dla różnych klientów usług. 3 kolejka („seler”: działa jako kontener multi-przetwórczego, który jest używany do obsługi różnych przypadkach pracownik jednocześnie

klucz konfiguracja może być postrzegane jako mieszkiem.

Używamy obiekt proj/celery.py zdefiniować aplikację, definicja może być postrzegane jak poniżej:

app = Celery('proj', 
     broker='amqp://', 
     backend='redis://localhost', 
     include=['proj.tasks']) 

wpisać kod tutaj

kiedy uruchomić aplikację:

1, kiedy uruchamiamy aplikację, widzieliśmy komunikat wygenerowany z rabbitmq, ale seler nie mógł obsłużyć wiadomości.

Plik Parse.log wygląda następująco: [2017-02-04 14: 28: 06,909: OSTRZEŻENIE/MainProcess] Otrzymano i usunięto nieznany komunikat. Zły cel podróży?!?

mamy następujące pytanie:

4.2.1 mechanizm AMQP enter image description here Widzimy, że AMQP działa jako bufor komunikatów, a następnie nie będzie nadawca wiadomości i fetcher wiadomość:

Na powyższym diagramie, kto jest nadawcą wiadomości i kto jest dostawcą wiadomości.

4.2.2 Definicja komunikatu W naszej aplikacji nie można znaleźć kodu do zdefiniowania komunikatu do wysłania lub otrzymania formularza AMQP.

4.2.3 Monitor wiadomości Jak możemy monitorować wysyłanie i odbieranie wiadomości w AMQP. Mam nadzieję, że nauczyciel poprowadzi nas do rozwiązania problemu i przedstawi nam szczegółowe wprowadzenie na temat interwencji maklera brokerskiego w celu uzyskania szczegółowych informacji na temat:

!

uwaga: dziennik błędów można zobaczyć tutaj

[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? 



The full contents of the message body was: body: [[u'maildir/allen-  p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'3cafda16-3e7c-44db-b05e-1327ef97ffc3'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'1f4c728b-680d-4dde-98b9-b153d5282780'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f21c911e-f2ac-462e-9662-2efbd27bcf91', u'root_id': None}}]}] (801b) 
{content_type:'application/json' content_encoding:'utf-8' 
    delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 623422L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', 'N\xfd\x17=\x00\x00': '[email protected]', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}} 


[2017-02-04 15:47:22,463: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// 
[2017-02-04 15:47:22,473: INFO/MainProcess] mingle: searching for neighbors 
[2017-02-04 15:47:23,503: INFO/MainProcess] mingle: sync with 2 nodes 
[2017-02-04 15:47:23,504: INFO/MainProcess] mingle: sync complete 
[2017-02-04 15:47:23,530: INFO/MainProcess] [email protected] ready. 
[2017-02-04 15:47:24,890: INFO/MainProcess] sync with [email protected] 
[2017-02-04 15:47:51,017: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? 

The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'765e5bbe-198f-405c-b10c-023d35e03981'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'7dacb897-d023-40b5-9874-e00b75107bbd'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f0d41289-33e2-4c8c-8d84-9d1d4c5a9c80', u'root_id': None}}]}] (801b) 
{content_type:'application/json' content_encoding:'utf-8' 
    delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 3L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', 'N\xfd\x17=\x00\x00': '[email protected]', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}} 

enter code here 

Odpowiedz

10

Dobrze byłoby dać wersje selera i librabbitmq używasz. Ponieważ miałem bardzo podobny problem, domyślam się, że używasz selera 4.0.2 i librabbitmq 1.6.1.

W takim przypadku jest to znany problem ze zgodnością, można zapoznać się z https://github.com/celery/celery/issues/3675 i https://github.com/celery/librabbitmq/issues/93.

Pierwszy link daje wam zalecenia, aby rozwiązać swój problem a mianowicie:

  • odinstalować librabbitmq pip uninstall librabbitmq (może trzeba zadzwonić tę komendę wielokrotnie)

  • zmianę wystąpień amqp do pyamqp w twoich adresach URL. (Chociaż nie znajduje się w twoim pliku konfiguracyjnym, jeśli używasz go, co nie działało dla mnie).

Aby odpowiedzieć dokładniej na inne pytania: masz rację, mówiąc, że istnieje nadawca i osoba pobierająca.

Rola nadawcy jest przejmowana przez aplikację utworzoną po wywołaniu Celery(...). Jedną z jego funkcji jest rejestrowanie zadań, a jeśli spojrzysz na jego implementację w app/base.py, zobaczysz, że implementuje ona metodę send_task, która jest bezpośrednio wywoływana przez metodę apply_async klasy Task. Rola tej metody polega na wysyłaniu opracowanej wersji zadania przez przewód do brokera, aby mógł go pobrać pracownik. Protokołem aplikacji używanym do przesyłania komunikatu jest amqp, dla którego implementacją jest librabbitmq.

Po drugiej stronie drutu znajduje się inna instancja uruchomiona przez pracownika, który wykonuje czynność pobierania. W języku selera nazywa się to Consumer. Jego implementację można znaleźć w pliku worker/consumer/consumer.py. Zobaczysz, że implementuje on kod create_task_handler, który z kolei definiuje funkcje on_task_received, które podnoszą wyświetlany błąd. Jest to funkcja wywoływana, gdy nowe zadanie jest pobierane od pracownika, a następnie przetwarzane.

Proponowane rozwiązanie polega zatem na zmianie implementacji protokołu amqp, tak aby TypeError nie został podniesiony w on_task_received (co wydaje mi się, że spowodowałoby to problem z kodowaniem).

Mam nadzieję, że odpowie na wszystkie Twoje pytania i zapewni lepszy wgląd w działanie selera. Powinienem zakończyć, mówiąc, że zgodnie z moją wiedzą "konwencjonalne" używanie Selera nigdy nie wymagałoby od ciebie manipulowania tymi rodzajami wnętrz, i że możesz osiągnąć 99% tego, czego możesz chcieć, zaimplementując niestandardowe klasy zadań i niestandardowe backendy, na przykład .

+0

Hi Anis: to jest naprawdę tak nic e ciebie, aby mi pomóc w tym pytaniu! Muszę nazywać cię Pan Fantastic! 1) pip2.7 zainstaluj librabbitmq-1.6.1.tar.gz 2) pip2.7 zainstaluj selera-4.0.2.tar.gz. To jest dokładnie ta wersja oprogramowania, którą zainstalowałem! Przestrzegam twoich rad! A teraz mój projekt działa teraz! Dzisiaj jestem taki szczęśliwy! Że miła przyjaciółka Anis pomoże mi w tej sprawie! – arthur

0

Po to, aby znaleźć odpowiedź również tutaj. W wątku Anis dotyczy 23doors mentions że nowy protokół domyślny seler 4 nie jest odtwarzany miłe z librabbitmq:

Widocznie librabbitmq problem jest związany z nowym domyślnym protokołem w selera 4.x

Wspomina również, że aby rozwiązać ten problem, można skorzystać ze starszych protokół ofert Seler przez ustawienie (jeśli używasz Django):

CELERY_TASK_PROTOCOL = 1 

W przeciwnym razie można ustawić następujące w swojej celeryconf.py plik

app.conf.task_protocol = 1 

Wszystko kredytowej 23doors :)