2014-06-17 16 views
6

Używam pakietu SDK AWS dla języka Java i używam klienta asynchronizującego asynchroniczny sqs do żądania wsadowego, aby zmniejszyć koszty.Jak zmusić klienta AmazonSQSBufferedAsyncClient do opróżnienia wiadomości?

Gdy moja aplikacja zostanie wyłączona, chcę się upewnić, że żadne wiadomości nie czekają w buforze, ale nie ma żadnej metody, którą widzę na kliencie.

Czy AmazonSQSBufferedAsyncClient.shutdown() opróżnia moje wiadomości po wywołaniu? Spojrzałem na source code i jest niejasne. Metoda nazywa shutdown() na każdym QueueBuffer że ma, ale w środku QueueBuffer.shutdown() mówi

public void shutdown() { 
    //send buffer does not require shutdown, only 
    //shut down receive buffer 
    receiveBuffer.shutdown(); 
} 

Ponadto dokumentacja .shutdown() mówi:

Wyłącza tego obiektu klienta, uwalniając wszelkie zasoby, które mogą być trzymany otwarty. Jest to metoda opcjonalna, a osoby dzwoniące nie są proszone o numer , aby ją wywołać, ale mogą, jeśli chcą jawnie zwolnić wszystkie otwarte zasoby . Po wyłączeniu klienta nie należy go używać do tworzenia kolejnych żądań.

W przypadku tej aplikacji muszę zagwarantować, że podczas buforowania wiadomości nie zostaną utracone. Czy muszę obsługiwać to ręcznie, używając normalnego AmazonSQSClient zamiast buforowania/asynchronicznego?

+1

znalazłem to, co wydaje się być to samo pytanie, a obecnie bez odpowiedzi: https://forums.aws.amazon.com/thread.jspa?threadID=122189 – Daenyth

Odpowiedz

1

Wersja 1.11.37 zestawu SDK ma w tym celu parametr konfiguracyjny w wersji QueueBufferConfig.

AmazonSQSBufferedAsyncClient bufClient = 
    new AmazonSQSBufferedAsyncClient(
     realAsyncClient, 
     new QueueBufferConfig() 
      .withFlushOnShutdown(true) 
    ); 
1

Istnieje metoda jawnego wywoływania koloru, ale nie jest dostępna i faktycznie nie mogłem znaleźć żadnego połączenia z tą metodą w kodzie amazońskim. Wygląda na to, że czegoś brakuje.

Po wywołaniu zamknięcie na kliencie asynchroniczny to wykonuje następujący kod:

public void shutdown() { 
    for(QueueBuffer buffer : buffers.values()) { 
     buffer.shutdown(); 
    } 
    realSQS.shutdown(); 
} 

I QueueBuffer # shutdown() wygląda następująco:

/** 
* Shuts down the queue buffer. Once this method has been called, the 
* queue buffer is not operational and all subsequent calls to it may fail 
* */ 
public void shutdown() { 
    //send buffer does not require shutdown, only 
    //shut down receive buffer 
    receiveBuffer.shutdown(); 
} 

Więc wydaje się, że nie są one celowo wywołanie metody sendBuffer.shutdown(), która wypróżnia wszystkie wiadomości w buforze, które nie są jeszcze wysyłane.

Czy znalazłeś przypadek po zamknięciu klienta SQS i zgubieniu wiadomości? Wygląda na to, że są tego świadomi i ta sprawa nie powinna się zdarzyć, ale jeśli chcesz mieć pewność, że możesz nazwać tę metodę refleksją, która jest naprawdę nieprzyjemna, ale zaspokoi twoje potrzeby.

AmazonSQSBufferedAsyncClient asyncSqsClient = <your initialization code of the client>; 
    Field buffersField = ReflectionUtils.findField(AmazonSQSBufferedAsyncClient.class, "buffers"); 
    ReflectionUtils.makeAccessible(buffersField); 
    LinkedHashMap<String, Object> buffers = (LinkedHashMap<String, Object>) ReflectionUtils.getField(buffersField, asyncSqsClient); 
    for (Object buffer : buffers.values()) { 
     Class<?> clazz = Class.forName("com.amazonaws.services.sqs.buffered.QueueBuffer"); 
     SendQueueBuffer sendQueueBuffer = (SendQueueBuffer) ReflectionUtils.getField(ReflectionUtils.findField(clazz, "sendBuffer"), buffer); 
     sendQueueBuffer.flush();//finally 
    } 

Coś takiego powinno zadziałać, tak myślę. Daj mi znać!