2012-08-23 14 views
6

Jeśli jest to idiotyczne pytanie, przepraszam i pójdzie ukryć moją głowę ze wstydu, ale:Python/RQ - monitorowanie statusu pracownika

Używam RQ kolejce do pracy w Pythonie. Chcę, aby działało tak:

  1. Zadanie A rozpoczyna się. Zadanie A pobiera dane za pośrednictwem interfejsu API sieci Web i przechowuje je.
  2. Zadanie A działa.
  3. Zadanie zakończone.
  4. Po zakończeniu A rozpoczyna się zadanie B. Zlecenie B sprawdza każdy rekord przechowywany przez zadanie A i dodaje dodatkowe dane odpowiedzi.
  5. Po zakończeniu zadania B użytkownik otrzymuje szczęśliwy e-mail z informacją, że raport jest gotowy.

Mój kod do tej pory:

redis_conn = Redis() 
use_connection(redis_conn) 
q = Queue('normal', connection=redis_conn) # this is terrible, I know - fixing later 
w = Worker(q) 
job = q.enqueue(getlinksmod.lsGet, theURL,total,domainid) 
w.work() 

Przypuszczałem moim najlepszym rozwiązaniem było mieć 2 pracowników, jeden dla pracy A i jeden dla B. Praca robotnik B może monitorować zadania A, a gdy praca Zrobiono A, zacznij pracę B.

Czego nie mogę wymyślić, aby uratować moje życie, to to, jak mogę zatrudnić jednego pracownika do monitorowania statusu innego. Mogę pobrać identyfikator pracy z zadania A z job.id. Mogę pobrać nazwę pracownika z w.name. Ale nie myśl o tym, jak przekazać tę informację innemu pracownikowi.

Czy jest o wiele prostszy sposób na zrobienie tego, czego zupełnie brakuje?

+1

Jeśli zlecenie B nie może być wykonane do czasu zakończenia zadania A (sugerując, że nie mogą być uruchomione równolegle), dlaczego w ogóle używać funkcji rq? Po prostu wykonaj je sekwencyjnie (w oddzielnym wątku lub procesie, jeśli nie chcesz blokować aplikacji). –

+0

Każde zadanie dla A i B zajmuje bardzo dużo czasu i może się zdarzyć osobno, więc chciałbym móc utrzymuj dużo pracy A niezależnie od pracy B. Jeśli jest to zbyt trudne, mogę się jednak poddać. – user1066609

+0

Czy masz pary A i B, które pasują do siebie, czy też dowolne B może być zależne od dowolnego A? Ponieważ w tym drugim przypadku masz jeden problem z synchronizacją. :-) –

Odpowiedz

0

Prawdopodobnie jesteś zbyt głęboko zaangażowany w projekt, aby go zmienić, ale jeśli nie, spójrz na numer Twisted. http://twistedmatrix.com/trac/ Używam go teraz do projektu, który trafia w interfejsy API, niszczy zawartość internetową itp. Wykonuje wiele zadań równolegle, a także porządkuje określone zadania, więc zadanie B nie zostanie wykonane, dopóki zadanie A nie zostanie wykonane.

To jest najlepszy samouczek do nauki Twisted, jeśli chcesz spróbować. http://krondo.com/?page_id=1327

0

Połączyć rzeczy, które zadanie A i zadanie B wykonują w jednej funkcji, a następnie użyć np. multiprocessing.Pool (to jest metoda map_async) w celu wyodrębnienia różnych procesów.

Nie znam rq, ale multiprocessing jest częścią standardowej biblioteki. Domyślnie używa tyle procesów, ile procesorów ma rdzeń, co z mojego doświadczenia jest zwykle wystarczające do nasycenia maszyny.

2

Od this page na rq docs, wygląda jak każdy obiekt job posiada atrybut result, wpłacone przez job.result, co można sprawdzić. Jeśli zadanie nie zostało zakończone, będzie to None, ale jeśli upewnisz się, że twoje zadanie zwraca pewną wartość (nawet tylko "Done"), możesz poprosić innego pracownika, aby sprawdził wynik pierwszej pracy, a następnie rozpocznij pracę tylko wtedy, gdy job.result ma wartość, co oznacza, że ​​pierwszy pracownik został ukończony.

6

Aktualizacja Styczeń 2015, to prośba przyciąganie jest teraz połączone, a parametr zostanie zmieniona na depends_on, tj:

second_job = q.enqueue(email_customer, depends_on=first_job) 

oryginalny post pozostaje nienaruszony dla osób prowadzących starsze wersje i tak:

Przesłałem żądanie pobrania (https://github.com/nvie/rq/pull/207) w celu obsługi zależności zadań w RQ. Gdy ta prośba przyciąganie zostanie scalona, ​​będziesz w stanie to zrobić:

def generate_report(): 
    pass 

def email_customer(): 
    pass 

first_job = q.enqueue(generate_report) 
second_job = q.enqueue(email_customer, after=first_job) 
# In the second enqueue call, job is created, 
# but only moved into queue after first_job finishes 

Na razie Proponuję napisanie funkcji otoki do kolejno uruchomić swoje miejsca pracy. Na przykład:

def generate_report(): 
    pass 

def email_customer(): 
    pass 

def generate_report_and_email(): 
    generate_report() 
    email_customer() # You can also enqueue this function, if you really want to 

# Somewhere else 
q.enqueue(generate_report_and_email)