2017-08-30 70 views
11

Jestem w trakcie przenoszenia jakiegoś kodu synchronicznego do asyncio przy użyciu aiohttp. synchroniczny kod trwał 15 minut, więc mam nadzieję, że poprawi to.Python aiohttp/asyncio - jak przetwarzać zwrócone dane

Mam działający kod, który pobiera dane z niektórych adresów URL i zwraca treść każdego z nich. Ale jest to tylko przeciwko 1 stronie laboratorium, mam ponad 70 rzeczywistych stron.

Więc jeśli dostałem pętlę, aby utworzyć listę wszystkich adresów URL dla wszystkich witryn, które utworzyłyby 700 adresów URL na liście do przetworzenia. Czy przetwarzanie ich nie wydaje mi się problemem?

Ale robiąc "rzeczy" z wynikami, nie jestem pewien jak programować? Mam już kod, który zrobi "stuff" do każdego zwróconego wyniku, ale nie jestem pewien, jak zaprogramować odpowiedni wynik.

Po uruchomieniu kodu przetwarza wszystkie adresy URL i w zależności od czasu do uruchomienia zwraca nieznane zamówienie?

Czy potrzebuję funkcji, która przetworzy każdy rodzaj wyniku?

import asyncio, aiohttp, ssl 
from bs4 import BeautifulSoup 

def page_content(page): 
    return BeautifulSoup(page, 'html.parser') 


async def fetch(session, url): 
    with aiohttp.Timeout(15, loop=session.loop): 
     async with session.get(url) as response: 
      return page_content(await response.text()) 

async def get_url_data(urls, username, password): 
    tasks = [] 
    # Fetch all responses within one Client session, 
    # keep connection alive for all requests. 
    async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session: 
     for i in urls: 
      task = asyncio.ensure_future(fetch(session, i)) 
      tasks.append(task) 

     responses = await asyncio.gather(*tasks) 
     # you now have all response bodies in this variable 
     for i in responses: 
      print(i.title.text) 
     return responses 


def main(): 
    username = 'monitoring' 
    password = '*********' 
    ip = '10.10.10.2' 
    urls = [ 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.10.0.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.0.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'frontend.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'planner.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.10.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.11.11.1'), 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.12.12.60'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.12.12.60'), 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'lon-dc-01.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'lon-dc-01.domain.com'), 
     ] 
    loop = asyncio.get_event_loop() 
    future = asyncio.ensure_future(get_url_data(urls,username,password)) 
    data = loop.run_until_complete(future) 
    print(data) 

if __name__ == "__main__": 
    main() 

Odpowiedz

2

Twój kod nie jest daleko od znaku. asyncio.gather zwraca wyniki w kolejności argumentów, więc kolejność jest tutaj zachowywana, ale page_content nie zostanie wywołana w kolejności.

Kilka poprawek:

Przede wszystkim nie trzeba tutaj ensure_future. Tworzenie zadania jest potrzebne tylko wtedy, gdy starasz się, aby dany coroutine przeżył jego rodzica, tzn. Czy zadanie musi nadal działać, nawet jeśli została utworzona funkcja, która go utworzyła. Oto, co trzeba jest wywołanie asyncio.gather zamiast bezpośrednio ze swoimi współprogram:

async def get_url_data(urls, username, password): 
    async with aiohttp.ClientSession(...) as session: 
     responses = await asyncio.gather(*(fetch(session, i) for i in urls)) 
    for i in responses: 
     print(i.title.text) 
    return responses 

Ale nazywając to by zaplanować wszystko sprowadzić w tym samym czasie, a także z dużej liczby adresów URL, to jest daleka od optymalnej. Zamiast tego należy wybrać maksymalną współbieżność i upewnić się, że w danym momencie uruchomionych jest co najwyżej X pobierania. Aby to zaimplementować, możesz użyć atrybutu asyncio.Semaphore(20), semafor ten może zostać pobrany tylko przez maksymalnie 20 osób, więc pozostali będą czekać, aż uzyskają dostęp do miejsca.

CONCURRENCY = 20 
TIMEOUT = 15 

async def fetch(session, sem, url): 
    async with sem: 
     async with session.get(url) as response: 
      return page_content(await response.text()) 

async def get_url_data(urls, username, password): 
    sem = asyncio.Semaphore(CONCURRENCY) 
    async with aiohttp.ClientSession(...) as session: 
     responses = await asyncio.gather(*(
      asyncio.wait_for(fetch(session, sem, i), TIMEOUT) 
      for i in urls 
     )) 
    for i in responses: 
     print(i.title.text) 
    return responses 

W ten sposób wszystkie pobrania są uruchamiane natychmiast, ale tylko 20 z nich będzie mogło uzyskać semafor. Inne będą blokować przy pierwszej instrukcji async with i czekać, aż zostanie wykonane kolejne pobieranie.

Wymieniłem również aiohttp.Timeout na oficjalny odpowiednik asyncio.

Wreszcie, dla rzeczywistego przetwarzania danych, jeśli jesteś ograniczony czasem procesora, asyncio prawdopodobnie nie pomoże ci dużo. Będziesz musiał użyć tutaj ProcessPoolExecutor, aby dokonać równoległej pracy z innym procesorem. run_in_executor będzie prawdopodobnie używać.

+0

Dzięki, rozumiem wszystko, co powiedziałeś, ale straciłeś mnie w części ProcessPoolExecutor. Muszę mieć oddzielny procesor CPU wyniki? jak mam to zrobic? i jak mogę je przetworzyć w kolejności lub czy potrzebuję funkcji, która przetwarza wszystkie wyniki bez względu na rodzaj? – AlexW

2

Oto przykład z concurrent.futures.ProcessPoolExecutor. Jeśli zostanie utworzony bez określenia max_workers, implementacja będzie zamiast tego używać os.cpu_count. Zauważ też, że asyncio.wrap_future jest publiczne, ale nieudokumentowane. Alternatywnie istnieje AbstractEventLoop.run_in_executor.

import asyncio 
from concurrent.futures import ProcessPoolExecutor 

import aiohttp 
import lxml.html 


def process_page(html): 
    '''Meant for CPU-bound workload''' 
    tree = lxml.html.fromstring(html) 
    return tree.find('.//title').text 


async def fetch_page(url, session): 
    '''Meant for IO-bound workload''' 
    async with session.get(url, timeout = 15) as res: 
     return await res.text() 


async def process(url, session, pool): 
    html = await fetch_page(url, session) 
    return await asyncio.wrap_future(pool.submit(process_page, html)) 


async def dispatch(urls): 
    pool = ProcessPoolExecutor() 
    async with aiohttp.ClientSession() as session: 
     coros = (process(url, session, pool) for url in urls) 
     return await asyncio.gather(*coros) 


def main(): 
    urls = [ 
     'https://stackoverflow.com/', 
     'https://serverfault.com/', 
     'https://askubuntu.com/', 
     'https://unix.stackexchange.com/' 
    ] 
    result = asyncio.get_event_loop().run_until_complete(dispatch(urls)) 
    print(result) 

if __name__ == '__main__': 
    main()