2015-10-16 28 views
6

Próbuję zrobić program, aby dużo połączeń internetowych gniazd do serwera Mam Utworzono:Stopniowo tworzyć zadania async i czekać na wszystkie z nich, aby zakończyć

class WebSocketClient(): 

    @asyncio.coroutine 
    def run(self): 
     print(self.client_id, 'Connecting') 
     ws = yield from aiohttp.ws_connect(self.url) 
     print(self.client_id, 'Connected') 
     print(self.client_id, 'Sending the message') 
     ws.send_str(self.make_new_message()) 

     while not ws.closed: 
      msg = yield from ws.receive() 

      if msg.tp == aiohttp.MsgType.text: 
       print(self.client_id, 'Received the echo') 
       yield from ws.close() 
       break 

     print(self.client_id, 'Closed') 


@asyncio.coroutine 
def make_clients(): 

    for client_id in range(args.clients): 
     yield from WebSocketClient(client_id, WS_CHANNEL_URL.format(client_id=client_id)).run() 


event_loop.run_until_complete(make_clients()) 

Problem polega na tym, że wszyscy klienci wykonywać swoją pracę jeden po drugim:

0 Connecting 
0 Connected 
0 Sending the message 
0 Received the echo 
0 Closed 
1 Connecting 
1 Connected 
1 Sending the message 
1 Received the echo 
1 Closed 
... 

Próbowałem użyć asyncio.wait, ale wszyscy klienci zaczynają razem. Chcę, aby były tworzone stopniowo i połączone z serwerem natychmiast po utworzeniu każdego z nich. W tym samym czasie kontynuując tworzenie nowych klientów.

Jakie podejście należy zastosować, aby to osiągnąć?

+0

można [użyć semafora, aby ograniczyć liczbę jednoczesnych połączeń] (http://stackoverflow.com/a/20722204/4279) – jfs

+0

Proszę ozdobić WebSocketClient jako ' @ coroutine' –

+0

@AndrewSvetlov tak, jest udekorowany - błąd kopiowania/wklejania – warvariuc

Odpowiedz

7

Stosowanie asyncio.wait jest dobrym podejściem. Można łączyć je z asyncio.ensure_future i asyncio.sleep aby utworzyć zadania stopniowo:

@asyncio.coroutine 
def make_clients(nb_clients, delay): 
    futures = [] 
    for client_id in range(nb_clients): 
     url = WS_CHANNEL_URL.format(client_id=client_id) 
     coro = WebSocketClient(client_id, url).run() 
     futures.append(asyncio.ensure_future(coro)) 
     yield from asyncio.sleep(delay) 
    yield from asyncio.wait(futures) 

EDIT: I wdrożone FutureSet klasy, który powinien robić, co chcesz. Ten zestaw może być wypełniony kontraktami futures i usuwa je automatycznie po zakończeniu. Możliwe jest również czekanie na zakończenie wszystkich kontraktów terminowych.

class FutureSet: 

    def __init__(self, maxsize, *, loop=None): 
     self._set = set() 
     self._loop = loop 
     self._maxsize = maxsize 
     self._waiters = [] 

    @asyncio.coroutine 
    def add(self, item): 
     if not asyncio.iscoroutine(item) and \ 
      not isinstance(item, asyncio.Future): 
      raise ValueError('Expecting a coroutine or a Future') 
     if item in self._set: 
      return 
     while len(self._set) >= self._maxsize: 
      waiter = asyncio.Future(loop=self._loop) 
      self._waiters.append(waiter) 
      yield from waiter 
     item = asyncio.async(item, loop=self._loop)  
     self._set.add(item) 
     item.add_done_callback(self._remove) 

    def _remove(self, item): 
     if not item.done(): 
      raise ValueError('Cannot remove a pending Future') 
     self._set.remove(item) 
     if self._waiters: 
      waiter = self._waiters.pop(0) 
      waiter.set_result(None) 

    @asyncio.coroutine 
    def wait(self): 
     return asyncio.wait(self._set) 

Przykład:

@asyncio.coroutine 
def make_clients(nb_clients, limit=0): 
    futures = FutureSet(maxsize=limit) 
    for client_id in range(nb_clients): 
     url = WS_CHANNEL_URL.format(client_id=client_id) 
     client = WebSocketClient(client_id, url) 
     yield from futures.add(client.run()) 
    yield from futures.wait() 
+0

'asyncio.Queue' jest * końcową * klasą nie przeznaczoną do dziedziczenia. Zatem użytkownicy nie powinni wyprowadzać własnych klas z 'asyncio.Queue' nawet jeśli jest to technicznie możliwe. –

+0

@AndrewSvetlov Przypuszczam, że użytkownicy mogą chcieć dziedziczyć po 'asyncio.Queue', aby utworzyć inny rodzaj kolejek (np.' Asyncio.PriorityQueue' lub 'asyncio.LifoQueue'), ale w tym przypadku byłem po prostu leniwy: p I i tak go pozbyłem. – Vincent

+0

Nie, użytkownik nie może (przynajmniej nie powinien). 'LifoQueue' i' PriorityQueue' to klasy 'asyncio', nieprzeznaczone do dziedziczenia. Jedyne klasy "asyncio" przeznaczone do dziedziczenia to 'Protocol' i family. Państwo zostało wymówione przez Guido van Rossuma kilka razy, kiedy zaprojektowaliśmy bibliotekę. –