2015-02-19 20 views
5

próbuję enqueue wiadomości w Azure kolejek asynchronicznie tak:asynchronicznie delegowania Azure kolejek

private async Task EnqueueItemsAsync(IEnumerable<string> messages) { 
      var tasks = messages.Select(msg => _queue.AddMessageAsync(new CloudQueueMessage(msg), 
       null, null, null, null)); 

      await Task.WhenAll(tasks); 
     } 

Jeśli mogę to prawo to mówi „start enqueuing jedną pozycję po drugiej bez nich, aby dostać zamieszczonych oczekiwania, utrzymać odniesienie dla każdego zadania, a następnie poczekać, aż wszystkie zostaną opublikowane ".

Ten kod działa prawidłowo w większości przypadków, ale w przypadku dużej liczby elementów (5000) zaczyna się podkręcać, a następnie zgłasza wyjątek limitu czasu (po zagnieżdżeniu ~ 3500 elementów).

Rozwiązałem go czeka każdy z nich, aby zakończyć przed kontynuowaniem następny

private async Task EnqueueItemsAsync(IEnumerable<string> messages) { 
      foreach (var message in messages) { 
       await _queue.AddMessageAsync(new CloudQueueMessage(message), null, null, null, null); 
      } 
     } 

Czy ktoś może wyjaśnić, dlaczego tak się stało?

Wyjątek:

System.AggregateException który zawija wiele takich wyjątków: Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass4.<CreateCallbackVoid>b__3(IAsyncResult ar) Prośba o informacje RequestID: RequestDate: Komunikat stanu: < --- ---> (Wewnętrzna Wyjątek # 1) Microsoft.WindowsAzure.Storage. StorageException: Klient nie mógł zakończyć operacji w określonym czasie. ---> System.TimeoutException: Klient nie mógł zakończyć operacji w określonym czasie. --- Koniec wewnętrznego śledzenia stosu wyjątku --- Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync [T] (Wynik IAsyncResult ) `.

+0

Która linia zgłasza wyjątek? i która metoda? Opublikuj nazwę wyjątku, wiadomość i stacktrace –

+0

@SriramSakthivel dzięki za wzmiankę, właśnie zaktualizowałem mój post –

Odpowiedz

6

Zaprojektowano kolejkę na Azure o przepustowości 2000 wiadomości na sekundę.

Patrz: Azure Storage Scalability and Performance Targets

Gdy aplikacja osiągnie granicę co partycja może obsługiwać do obciążenia, Azure Storage rozpocznie powrót kod błędu 503 (Serwer zajęty) lub kod błędu 500 (Operation Timeout) odpowiedzi. Gdy to nastąpi, aplikacja powinna zastosować politykę wykładniczą do ponowienia. Exponential backoff pozwala na zmniejszenie obciążenia na partycji i zmniejszenie skoków ruchu do tej partycji.

+0

Dzięki, myślę, że to rozwiązuje zagadkę. Tak więc mój pierwszy kod uruchamia pętlę for w czasie krótszym niż sekunda, rozpoczynając 5000 żądań post w mniej niż sekundę, co daje odpowiedź na timeout ... wydaje się uzasadnione –

1

Wygląda na to, że można stworzyć bardziej solidny mechanizm, przekazując QueryRequestOptions do AddMessageAsync.

Przed wysłaniem zapytania komunikat żądania dodaje te właściwości do polecenia.

Chciałbym spróbować przekazać QueryRequestOptions i ustawić wartość na MaximumExecutionTime i ServerTimeout o większej wartości.

ten sposób wniosek jest wypełniony przed wysłaniem:

// Microsoft.WindowsAzure.Storage.Queue.QueueRequestOptions 
internal void ApplyToStorageCommand<T>(RESTCommand<T> cmd) 
{ 
    if (this.LocationMode.HasValue) 
    { 
     cmd.LocationMode = this.LocationMode.Value; 
    } 
    if (this.ServerTimeout.HasValue) 
    { 
     cmd.ServerTimeoutInSeconds = new int?((int)this.ServerTimeout.Value.TotalSeconds); 
    } 
    if (this.OperationExpiryTime.HasValue) 
    { 
     cmd.OperationExpiryTime = this.OperationExpiryTime; 
     return; 
    } 
    if (this.MaximumExecutionTime.HasValue) 
    { 
     cmd.OperationExpiryTime = new DateTime?(DateTime.Now + this.MaximumExecutionTime.Value); 
    } 
} 

I to jest, jak to się wysłania:

rESTCommand.PreProcessResponse = delegate(RESTCommand<NullType> cmd, HttpWebResponse resp, Exception ex, OperationContext ctx) 
{ 
    HttpResponseParsers.ProcessExpectedStatusCodeNoException<NullType>(HttpStatusCode.Created, resp, NullType.Value, cmd, ex); 
    return NullType.Value; 
}; 
+0

Dzięki, ciekawe podejście do zminimalizowania szansy na timeout, jednak chciałem zrozumieć co spowodowało przekroczenie limitu czasu. –