Oto, jak możesz to zrobić z Mongo.
UWAGA: Musiałem uczynić projekt nieco bardziej wyrozumiałym, ponieważ zadania Celery nie gwarantują wykonania dokładnie momentu, w którym eta
jest spełniony lub skończy się countdown
.
Ponadto, wygasające indeksy Mongo są czyszczone co minutę; Nie można opierać projektu wokół rekordów, które są usuwane, gdy tylko pojawi się eta
.
Tak czy inaczej, przepływ jest coś takiego:
- Kod Klient zwraca
my_task
.
preflight
zwiększa licznik połączeń i zwraca go jako flight_id
_my_task
ma zostać wykonany po TTL
sekund.
- Po uruchomieniu
_my_task
sprawdza, czy numer flight_id
jest nadal aktualny. Jeśli nie, przerwie się.
- ... Jakiś czas później ... Mongo sprząta nieaktualne wpisy w kolekcji via an expiring index.
@celery.task(track_started=False, ignore_result=True)
def my_task(my_arg):
flight_id = preflight(inflight_collection, 'my_task', HASH(my_arg), TTL)
_my_task.apply_async((my_arg,), {'flight_id':flight_id}, countdown=TTL)
@celery.task(track_started=False, ignore_result=True)
def _my_task(my_arg, flight_id=None):
if not check_for_takeoff(inflight_collection, 'my_task', HASH(my_arg), flight_id):
return
# ... actual work ... #
kod Biblioteka:
TTL = 5 * 60 # Run tasks after 5 minutes
EXPIRY = 6 * TTL # This needs to be much larger than TTL.
# We need to store a list of task-executions currently pending
inflight_collection = db['celery_In_Flight']
inflight_collection.create_index([('fn', pymongo.ASCENDING,),
('key', pymongo.ASCENDING,)])
inflight_collection.create_index('eta', expiresAfterSeconds=EXPIRY)
def preflight(collection, fn, key, ttl):
eta = datetime.datetime.now() + datetime.timedelta(seconds=ttl)
result = collection.find_one_and_update({
'fn': fn,
'key': key,
}, {
'$set': {
'eta': eta
},
'$inc': {
'flightId': 1
}
}, upsert=True, return_document=pymongo.ReturnDocument.AFTER)
print 'Preflight[{}][{}] = {}'.format(fn, key, result['flightId'])
return result['flightId']
def check_for_takeoff(collection, fn, key, flight_id):
result = collection.find_one({
'fn': fn,
'key': key
})
ready = result is None or result['flightId'] == flight_id
print 'Check[{}][{}] = {}, {}'.format(fn, key, result['flightId'], ready)
return ready
Możesz użyć pamięci podręcznej. Wiele magazynów kluczy ma zapisy czasowe, spróbuj pobrać wynik ze sklepu, jeśli nie ma wyniku, uruchom zadanie i zapisz wynik z czasem wygaśnięcia przed powrotem. Użyj tylko jednego pracownika, aby zadania były wykonywane sekwencyjnie. Unikaj schematów blokowania, chyba że chcesz uporać się z nieaktualnymi blokadami. –
Och, absolutnie. Ale wolałbym uniknąć skomplikowanych fragmentów implementacji ujawniania siebie (sprawdzanie argumentów, wyników śledzenia itp.) I zastanawiam się, czy istnieje jakikolwiek standardowy sposób robienia tego. –
Pisanie dekoratora pamięci podręcznej w Pythonie jest bardzo proste (może to być 4 linie), szkoda, że nie miałem czasu opublikować pełnej odpowiedzi. –