2017-02-10 64 views
7

Próbuję zalogować się do witryny jednocześnie, używając wielu poświadczeń z aiohttp i asyncio. W funkcji create_tasks generuję listę sesji, które będą używane dla każdego z nich. Powodem, dla którego nie mogę po prostu utworzyć sesji w ramach funkcji login jest to, że ten sam obiekt sesji będzie używany w całym kodzie. Próbuję stworzyć sposób, w jaki mogę użyć menedżera kontekstu do obsługi zamykania sesji (aby uniknąć błędów środowiska wykonawczego, pozostawiając je otwarte).Jak zmienić ten kod, aby używać menedżerów kontekstów?

Następujący kod działa zgodnie z przeznaczeniem (równoczesne gromadzenie strony logowania i analizowanie tokenu w puli procesowej), ale generuje sesje oddzielnie od zadań i wymaga ich zamknięcia na końcu.

from bs4 import BeautifulSoup 
from concurrent.futures import ProcessPoolExecutor 
import aiohttp 
import asyncio 

#TODO: make this safe, handle exceptions 

LOGIN_URL = "http://example.com/login" 
CLIENT_CNT = 10 
proc_pool = ProcessPoolExecutor(CLIENT_CNT) 

def get_key(text): 
    soup = BeautifulSoup(text, "html.parser") 
    form = soup.find("form") 
    key = form.find("input", attrs={"type": "hidden", "name": "authenticityToken"}) 
    return key.get("value", None) 

async def login(username:str, password:str, session:aiohttp.ClientSession, sem:asyncio.BoundedSemaphore, loop:asyncio.AbstractEventLoop=None): 
    loop = loop or asyncio.get_event_loop() 
    async with sem: 
     async with session.get(LOGIN_URL) as resp: 
      x = await asyncio.ensure_future(loop.run_in_executor(proc_pool, get_key, await resp.text())) 
      print(x) 

def create_tasks(usernames, passwords, sem:asyncio.BoundedSemaphore, loop:asyncio.AbstractEventLoop=None): 
    loop = loop or asyncio.get_event_loop() 
    tasks = [] 
    sessions = [] 
    for u, p in zip(usernames, passwords): 
     session = aiohttp.ClientSession(loop=loop) 
     sessions.append(session) 
     tasks.append(login(u, p, session, sem, loop)) 
    return tasks, sessions 

if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    sem = asyncio.BoundedSemaphore(CLIENT_CNT) 
    usernames = ("a", "b", "c", "d", "e", "f", "g") 
    passwords = ("a", "b", "c", "d", "e", "f", "g") 
    tasks, sessions = create_tasks(usernames, passwords, sem, loop) 
    loop.run_until_complete(asyncio.gather(*tasks, loop=loop)) 
    for session in sessions: 
     session.close() 

ja poprzednio wykonane create_tasks się współprogram napisał klasy otoki, aby iterables async i próbuje za pomocą

async with aiohttp.ClientSession() as session: 
    tasks.append(login(u, p, session, sem, loop) 

Ale jak się obawiałem, to powiedział, że sesja została już zamknięta przez czas został uruchomiony.

+1

nie powinien "def create_tasks()' być 'async def create_tasks()'? – Juggernaut

+0

To było, gdy użyłem asynchronicznego menedżera kontekstów dla każdej sesji, ale zmieniłem to ponieważ ta metoda nie działała i 'asynchroniczna def' była niepotrzebna. – Goodies

+0

Dlaczego zadanie nie może utworzyć sesji, wykonać proces logowania (uzyskanie pliku cookie, tokenu itd.) I samemu zamknąć sesję? Nie rozumiem, dlaczego sesja, a nie sesja _factory_, taka jak 'aiohttp.ClientSession', powinna zostać przypisana do zadania. – 9000

Odpowiedz

0

Naprawdę nie wyjaśniłeś, jakiego rodzaju zadań potrzebujesz, a czego potrzebujesz?

Coś bardziej skomplikowanego?

Czy chcesz, aby była ona specyficzna dla nazwy użytkownika/hasła?

Czy na koniec chcesz zapisać wszystkie odpowiedzi?

Dla tego kodu zakładam, że nazwa użytkownika/hasło nie ma znaczenia, ale może się szybko zmienić.

Zamiast sposobu inicjowania sesji osobno użyłem wzorca konsument/producent.

Każdy konsument sesję z menedżerem kontekstów, również nie potrzebuje semafora (z powodu kolejki).

import asyncio 
from concurrent.futures import ProcessPoolExecutor 

from aiohttp import ClientSession 
from bs4 import BeautifulSoup 

LOGIN_URL = "http://example.com/login" 
CLIENT_CNT = 10 
proc_pool = ProcessPoolExecutor(CLIENT_CNT) 


def get_key(text): 
    soup = BeautifulSoup(text, "html.parser") 
    form = soup.find("form") 
    key = form.find("input", attrs={"type": "hidden", "name": "authenticityToken"}) 
    return key.get("value", None) 


async def init_consumer(username: str, password: str, loop, queue): 
    loop = loop or asyncio.get_event_loop() 
    async with ClientSession(loop=loop) as session: 
     # init the session with creds? i you didn't use the username/password 
     async with session.get(LOGIN_URL) as login_resp: 
      x = await asyncio.ensure_future(loop.run_in_executor(proc_pool, get_key, await login_resp.text())) 
      print(x) 
     url = await queue.get() 
     while url is not None: 
      # Do things with session and queue 
      async with session.get(url) as resp: 
       rsp_as_txt = await resp.text() 
      queue.task_done() 
      url = await queue.get() 


async def generate_tasks(queue): 
    tasks = ["http://www.example.com" for i in range(20)] 
    # putting all tasks in queue 
    for task in tasks: 
     await queue.put(task) 
    # waiting for all tasks to finish 
    queue.join() 
    # Telling consumer to finish process 
    for i in range(queue.maxsize): 
     queue.put(None) 


async def run(loop): 
    queue = asyncio.Queue(CLIENT_CNT) 
    usernames = ("a", "b", "c", "d", "e", "f", "g") 
    passwords = ("a", "b", "c", "d", "e", "f", "g") 
    consumers = [asyncio.ensure_future(init_consumer(u, p, loop, queue)) for u, p in zip(usernames, passwords)] 
    return await generate_tasks(queue) 


if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(run(loop=loop)) 
4

Oto struktura sprawia, że ​​rozumowanie łatwiejszy:

async def user(u, p, ...): 
    """Everything a single user does""" 
    auth = await login(u, p) 
    await download_something(auth, ...) 
    await post_something(auth, ...) 

async def login(u, p): ... 
    async with aiohttp.ClientSession() as session: 
     async with session.get("http://xxx/login", ...) as r: 
      data = await r.json() 
      return data["something"] 

async def download_xxx(...): ... 
async def post_xxx(...): ... 

async def everything(): 
    creds = [("u1", "p1"), ...] 
    flows = [asyncio.ensure_future(user(cred)) for cred in creds] 
    for flow in flows: 
     await flow 

Caveat programatora: aiohttp domyślnie pojawia się do przechowywania plików cookie, upewnij się, że nie cross-zapylają płynie twoja użytkownika.

Punkty premiowe za: poprawne użycie asyncio.gather() w ostatniej funkcji asynchronicznej.

+0

Jeśli chcesz przechowywać dane uwierzytelniające w 'aiohttp.ClientSession' (jawne lub pliki cookie), może być najbardziej elegancka podklasa (ale jest to ścisłe powiązanie). Inną opcją jest wyrwanie danych auth (tokenu jak powyżej lub cookie) i przekazanie tego wprost. –

1

Użyj ExitStack.

from contextlib import ExitStack 

def create_tasks(..., context): 
    tasks = [] 
    for username in usernames: 
     session = aiohttp.ClientSession() 
     tasks.append(...) 
     context.enter_context(session) 
    return tasks 

if __name__ == "__main__": 
    context = ExitStack() 
    tasks = create_tasks(..., context) 
    with context: 
     loop.run_until_complete(asyncio.gather(*tasks)) 
+0

Do tej pory może to być najlepszy kandydat, chociaż będę musiał przeprowadzić pewne testy. – Goodies

+0

Wygląda bardzo elegancko! Bardzo ... "funkcjonalny". Jeśli to działa dla OP, oznacza to nieco więcej bałaganu, na przykład przekazywanie sesji dookoła, ale wciąż może być tego warte. –

+0

Tutaj alternatywą dla przekazywania sesji wokół (jak w pytaniu) jest przekazywanie kontekstu. Musisz przesłać odwołanie do sesji w jakiejś formie, jeśli chcesz je później zamknąć. – Alvra