2017-09-24 63 views
6

Mam następujący kod:jak wdrożyć ograniczoną rozmowę z modernizacją blokowania klienta i współprogram

val context = newFixedThreadPoolContext(nThreads = 10, name="myThreadPool") 
val total = 1_000_000 //can be other number as well 
val maxLimit = 1_000 
return runBlocking { 
    (0..total step maxLimit).map { 
    async(context) { 
     val offset = it 
     val limit = it + maxLimit 
     blockingHttpCall(offset, limit) 
    } 
    }.flatMap { 
    it.await() 
    }.associateBy { 
    ... 
    }.toMutableMap() 
} 

chciałbym tylko 10 połączeń wydarzy równocześnie API blokującym. Wygląda jednak na to, że powyższy kod nie działa tak, jak się spodziewam (myślę, że wszystkie połączenia rozpoczynają się od razu) lub przynajmniej nie rozumiem, jeśli tak się dzieje.
Jaki jest prawidłowy sposób wdrożenia? Czy to samo rozwiązanie działa, jeśli używam async api z retrofit?

+0

Co sprawia, że ​​myślisz, że wszystkie połączenia odbywają się jednocześnie? –

Odpowiedz

2

Nie znam dokładnie sprawy, ale najprostszy sposób - użyć OkHttp API skonfigurować poziom współbieżności, na przykład, jest to default concurrency strategy of OkHttp

Ale można mieć własną strategię, jeśli ustawisz własne Dispatcher instancji do OkHttpClient.Builder

oczywiście, można użyć również współprogram

aktualna implementacja jest nieprawidłowa, ponieważ tworzenie współprogram dyspozytora dla każdej pozycji, ale mają wspólne pulę wątków wszystkie współprogram należy używać tego samego wysyłającego, wystarczy przesunąć newFixedThreadPoolContext tworzenie poza pętlą (teraz masz 1000 dyspozytorów, każdy z 10 wątkami).

Ale nie polecam używania połączeń typu coroutine + blokujących, lepiej skonfigurować współbieżność OkHttp (jest bardziej elastyczny) i używać coroutines z połączeniami bez blokowania (można napisać własny adapter lub użyć istniejącej biblioteki, takiej jak kotlin-coroutines-retrofit) . Umożliwi to miksowanie żądań HTTP i kodu interfejsu użytkownika lub innych zadań.

Więc jeśli używasz wewnętrznej API bez blokowania i OkHttp, nie potrzebujesz specjalnego kodu do kontrolowania współbieżności, możesz oczywiście ograniczyć liczbę równoczesnych połączeń, tak jak w powyższym przykładzie (ze stałym dyspozytorem konstrukcja), ale naprawdę nie sądzę, że ma to sens, ponieważ można obniżyć poziom współbieżności, a nie go zwiększać.

Po przejściu do nieblokującego API możesz po prostu uruchamiać wszystkie swoje coroutines w dowolnym kontrolerze coroutines równolegle (nawet w wątku UI) i czekać na wyniki bez blokowania.

Ponadto niejawna kontrola współbieżności przy użyciu konfiguracji OkHttpClient wygląda lepiej pod względem architektury (można mieć kod DI, który konfiguruje Retrofit + OkHttp i dostarcza go do kodu klienta z wstępnie skonfigurowaną polityką współbieżności). Oczywiście możesz to osiągnąć za pomocą innych podejść, ale ten wygląda dla mnie bardziej naturalnie.

+0

Edytowałem implementację, aby mieć pulę wątków, faktycznie podkreśliłem, że 'val' tylko upraszcza pytanie, więc to był błąd w pytaniu. więc w takim razie czy naprawdę ograniczam poziom współbieżności? Czy to dlatego, że moje wątki w puli są blokowane podczas wykonywania połączenia http, więc nie można ich zawiesić? – oshai

+1

Tak, ograniczasz poziom współbieżności w swoim przykładzie, wszystkie wątki są blokowane i nie możesz uruchomić więcej niż żądań "liczba wątków". Możesz to łatwo sprawdzić, jeśli zamienisz 'blockingHttpCall' na' Thread.sleep() '+' println' Ale nie możesz użyć tego podejścia do połączeń bez blokowania (możesz zastąpić 'Thread.sleep()' opcją 'delay () 'i sprawdź jak zmieniło się zachowanie po tym) – gildor

+1

Ale nadal możesz ograniczyć współbieżność, jeśli przepisałeś swój przykład, gdy następna część żądań została uruchomiona (w twoim przykładzie żądanie rozpoczęło się od skonstruowania coroutine za pomocą' async') po wcześniejszym zakończeniu. Ale znowu to zależy od twojej sprawy (na przykład wielką rzeczą jest to, jak sobie radzisz z wynikami), a w przypadku Retrofitu wciąż uważam, że lepiej pozwolić OkHttpowi kontrolować współbieżność. – gildor