2012-03-06 8 views
10

Używam selera z django i rabbitmq do utworzenia kolejki wiadomości. Mam też pracownika, który pochodzi z innej maszyny. W widoku django Zaczynam proces takiego:selera - funkcja połączenia w zadaniu wykonanym

def processtask(request, name): 
    args = ["ls", "-l"] 
    MyTask.delay(args) 
    return HttpResponse("Task set to execute.") 

Moim zadaniem jest skonfigurowany tak:

class MyTask(Task): 
    def run(self, args): 
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
    (out, err) = p.communicate() 
    return out 

Moje pytanie brzmi, jak można broker (mój projekt Django) teraz otrzymać wyjście z polecenia "ls -l", które pracownik wykonał na swoim komputerze. Sądzę, że najlepszą rzeczą dla pracownika byłoby wywołanie funkcji w brokerze, gdy jest ona gotowa do wysłania danych wyjściowych z wykonanego polecenia.

Chciałbym otrzymywać asynchronicznie dane wyjściowe z procesu roboczego, a następnie aktualizować stronę z danymi wyjściowymi, ale to na inny czas. Na razie chciałbym tylko otrzymać dane wyjściowe od pracownika.

Aktualizacja

Teraz Dodałem żądanie HTTP GET, który jest uruchamiany po zakończeniu zadania powiadamiania aplikacji internetowej, że zadanie zostało wykonane - Ja też wysyłając task_id w http GET . Metoda HTTP GET wywołuje widok Django, który tworzy AsyncResult i dostaje wynik, ale problemem jest to, że podczas wywoływania result.get() pojawia się następujący błąd:

/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration. 
    "Polling results with transaction isolation level" 

jakieś pomysły dlaczego? Nie używam bazy danych, ponieważ używam rabbitmq z AMQP.

Aktualizacja.

Bardzo chciałbym użyć trzeciej opcji, która wydaje się najlepszą opcją - dla małych i dużych wartości zwracanych. Całe moje zadanie wygląda następująco:

class MyTask(Task): 
    def __call__(self, *args, **kwargs): 
    return self.run(*args, **kwargs) 

    def after_return(self, status, retval, task_id, args, kwargs, einfo): 
    if self.webhost is not None: 
     conn = httplib.HTTPConnection(self.webhost, self.webport) 
     conn.request("HEAD", "/vuln/task/output/"+task_id) 

    def run(self, args, webhost=None, webport=None): 
    self.webhost = webhost 
    self.webport = webport 
    r = "This is a basic result string used for code clarity" 
    return r 

Tak już nadpisane na after_return funkcję, która powinna również zwolnić blokadę na moim zadaniem, ponieważ funkcja Uruchom zadanie jest() już zwrócone wartości. W żądaniu HEAD zasadniczo nazywam funkcję django, która wywołuje AsyncResult na task_id, która powinna dostarczyć wynik zadania. W moim przypadku użyłem arbitralnego wyniku do celów testowych, ponieważ jest on przeznaczony tylko do testowania.

Chciałbym wiedzieć, dlaczego powyższy kod nie działa. Mogę używać on_success, ale nie sądzę, że to coś zmieni - czy też nie?

+0

Czy można zapisać dane wyjściowe polecenia w bazie danych? – jpic

+0

Cześć, nie, ponieważ pracownicy nie mają dostępu do bazy danych brokera i nie chcę, aby mieli dostęp. Zdecydowanie muszę odesłać wynik, a następnie przetworzyć go w brokerze. – eleanor

+1

Może mógłbyś stworzyć interfejs API HTTP, aby odesłać wynik? Jest kilka łatwych sposobów na zrobienie tego w Django. – jpic

Odpowiedz

14

Jeśli spojrzeć here znajdziesz następujące:

Django seler wykorzystuje MySQL śledzić wszystkie zadania/wyników, królika-MQ jest używany jako magistrali komunikacyjnej zasadzie.

To, co naprawdę się dzieje, to próba pobrania ASyncResult pracownika podczas wykonywania zadania (zadanie wywołało żądanie HTTP na serwerze, a ponieważ jeszcze nie powrócił, sesja blokowania db z pracownik jest nadal aktywny, a wiersz wyniku wciąż jest zablokowany). Kiedy Django spróbuje odczytać wynik zadania (jego stan i rzeczywistą wartość zwracaną przez funkcję run), znajdzie wiersz zablokowany i wyświetli ostrzeżenie.

Istnieje kilka sposobów, aby przejść o rozwiązywaniu w tym:

  1. Konfigurowanie innego zadania seler czerpać wynik i łańcuch to do zadania przetwarzania. W ten sposób oryginalne zadanie zostanie zakończone, zwolnij blokadę na db, a nowa ją zdobędzie, przeczytaj wynik w django i wykonaj to, co potrzebujesz. Popatrz na dokumenty selera na ten temat.

  2. Nie przejmuj się i po prostu wykonaj POST na Django z pełnym wynikiem przetwarzania dołączonym jako ładunek, zamiast próbować go pobrać przez db.

  3. Zastąpienie on_success w klasie zadań i POST żądanie powiadomienia do Django, a następnie w którym momencie blokada powinna zostać zwolniona w tabeli db.

Zauważ, że musisz zachować cały wynik przetwarzania (niezależnie od tego, jaki jest duży) w metodzie zwrotu (prawdopodobnie marynowany). Nie wspomniałeś, jak duży może być wynik, więc może być sens, aby po prostu zrobić scenariusz nr 2 powyżej (co zrobię). Ewentualnie pójdę z # 3. Nie zapomnij również o obsłudze metody on_failure w swoim zadaniu.

+0

Dziękuję za komentarz. Zaktualizowałem swoją odpowiedź, aby zadać dodatkowe pytanie, na które muszę odpowiedzieć, zanim zaakceptuję twoją odpowiedź, która jest naprawdę dobra. – eleanor