2016-12-14 49 views
7

Próbuję zaimplementować serwer tcp 'echo. Proste rzeczy:Python multiprocessing and networking on Windows

  1. Klient wysyła wiadomość na serwer.
  2. Server odbiera komunikat
  3. Serwer przetwarza wiadomość na wielkie
  4. Server wysyła wiadomość do zmodyfikowanego klienta
  5. Klient drukuje odpowiedzi.

To działało dobrze, więc postanowiłem zrównoleglić serwer; sprawiają, że może obsługiwać wielu klientów w czasie. Ponieważ większość interpreterów Pythona ma GIL, wielowątkowość go nie wycina. Musiałem użyć wieloprocesorowych ... I chłopcze, to tam wszystko poszło w dół.

Używam systemu Windows 10 x64 i kombinezonu WinPython z Pythonem 3.5.2 x64.

Mój pomysł polega na utworzeniu gniazda, zainicjowaniu go (powiązanie i nasłuchiwanie), utworzeniu podprocesów i przekazaniu gniazda dzieciom. Ale z miłości do mnie ... Nie mogę wykonać tej pracy, moje podprocesy umierają niemal natychmiast. Początkowo miałem problemy z "wytrawianiem" gniazda ... Więc trochę goograłem i pomyślałem, że to jest problem. Próbowałem więc przekazać moje gniazdo przez kolejkę do przetwarzania wieloprocesowego, przez rurkę, a moją ostatnią próbą było "rozwidlenie" i przekazanie go jako obiektu bajtów podczas tworzenia przetwarzania. Nic nie działa.

Czy ktoś może rzucić trochę światła tutaj? Powiedz mi co jest nie tak? Może cały pomysł (dzielenie gniazd) jest zły ... A jeśli tak, PROSZĘ powiedzieć mi, w jaki sposób mogę osiągnąć mój pierwotny cel: umożliwienie mojemu serwerowi RZECZYWISTEGO obsługiwania wielu klientów jednocześnie (na Windows) (nie mów ja o wątkach, wszyscy wiemy, że nici Pythona nie zmniejszą ¬)

Warto również zauważyć, że żadne pliki nie są tworzone przez funkcję debugowania. Według mnie żaden proces nie trwał wystarczająco długo, by go uruchomić.

Typowa moc mojego kodu serwera jest (jedyna różnica między tras jest liczba procesów):

Server is running... 
Degree of parallelism: 4 
Socket created. 
Socket bount to: ('', 0) 
Process 3604 is alive: True 
Process 5188 is alive: True 
Process 6800 is alive: True 
Process 2844 is alive: True 

Press ctrl+c to kill all processes. 

Process 3604 is alive: False 
Process 3604 exit code: 1 
Process 5188 is alive: False 
Process 5188 exit code: 1 
Process 6800 is alive: False 
Process 6800 exit code: 1 
Process 2844 is alive: False 
Process 2844 exit code: 1 
The children died... 
Why god? 
WHYYyyyyy!!?!?!? 

Kod serwera:

# Imports 
import socket 
import packet 
import sys 
import os 
from time import sleep 
import multiprocessing as mp 
import pickle 
import io 

# Constants 
DEGREE_OF_PARALLELISM = 4 
DEFAULT_HOST = "" 
DEFAULT_PORT = 0 

def _parse_cmd_line_args(): 
    arguments = sys.argv 
    if len(arguments) == 1: 
     return DEFAULT_HOST, DEFAULT_PORT 
    else: 
     raise NotImplemented() 

def debug(data): 
    pid = os.getpid() 
    with open('C:\\Users\\Trauer\\Desktop\\debug\\'+str(pid)+'.txt', mode='a', 
       encoding='utf8') as file: 
     file.write(str(data) + '\n') 

def handle_connection(client): 
    client_data = client.recv(packet.MAX_PACKET_SIZE_BYTES) 
    debug('received data from client: ' + str(len(client_data))) 
    response = client_data.upper() 
    client.send(response)  
    debug('sent data from client: ' + str(response)) 

def listen(picklez):  
    debug('started listen function') 

    pid = os.getpid() 
    server_socket = pickle.loads(picklez) 
    debug('acquired socket') 

    while True: 
     debug('Sub process {0} is waiting for connection...'.format(str(pid))) 

     client, address = server_socket.accept() 
     debug('Sub process {0} accepted connection {1}'.format(str(pid), 
       str(client))) 

     handle_connection(client)   
     client.close() 
     debug('Sub process {0} finished handling connection {1}'. 
       format(str(pid),str(client))) 

if __name__ == "__main__":  
# Since most python interpreters have a GIL, multithreading won't cut 
# it... Oughta bust out some process, yo! 
    host_port = _parse_cmd_line_args() 
    print('Server is running...') 
    print('Degree of parallelism: ' + str(DEGREE_OF_PARALLELISM)) 

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
    print('Socket created.') 

    server_socket.bind(host_port) 
    server_socket.listen(DEGREE_OF_PARALLELISM) 
    print('Socket bount to: ' + str(host_port))   

    buffer = io.BytesIO() 
    mp.reduction.ForkingPickler(buffer).dump(server_socket) 
    picklez = buffer.getvalue() 

    children = [] 
    for i in range(DEGREE_OF_PARALLELISM):   
     child_process = mp.Process(target=listen, args=(picklez,)) 
     child_process.daemon = True 
     child_process.start() 
     children.append(child_process) 

     while not child_process.pid: 
      sleep(.25) 

     print('Process {0} is alive: {1}'.format(str(child_process.pid), 
       str(child_process.is_alive())))  
    print()  

    kids_are_alive = True 
    while kids_are_alive: 
     print('Press ctrl+c to kill all processes.\n') 
     sleep(1) 

     exit_codes = [] 
     for child_process in children: 
      print('Process {0} is alive: {1}'.format(str(child_process.pid), 
       str(child_process.is_alive()))) 
      print('Process {0} exit code: {1}'.format(str(child_process.pid), 
       str(child_process.exitcode))) 
      exit_codes.append(child_process.exitcode) 

     if all(exit_codes): 
      # Why do they die so young? :(
      print('The children died...') 
      print('Why god?') 
      print('WHYYyyyyy!!?!?!?') 
      kids_are_alive = False 

edit: stały podpis „słuchać” . Moje procesy wciąż umierają natychmiast.

edit2: Użytkownik cmidi zwrócił uwagę, że ten kod działa w systemie Linux; więc moje pytanie brzmi: Jak mogę "wykonać tę pracę" w systemie Windows?

+1

Procesory wieloprocesorowe przekazują gniazda do procesów potomnych automatycznie. Implementacja systemu Windows wykorzystuje metodę 'share' oraz' shareshare'. – eryksun

+1

Wywołaj 'accept()' w procesie głównym i przekaż wynikową krotkę '(conn, addr)' do pracownika używającego współużytkowanego 'multiprocessing.Queue'. – eryksun

+0

Bez zmian, eryksun. Moje procesy wciąż umierają natychmiast. – Trauer

Odpowiedz

3

Można bezpośrednio przekazać gniazdo do procesu potomnego. wieloprocesorowe rejestruje zmniejszenie dla tego, dla których realizacja systemu Windows wykorzystuje następujące DupSocket klasę z multiprocessing.resource_sharer:

class DupSocket(object): 
    '''Picklable wrapper for a socket.''' 
    def __init__(self, sock): 
     new_sock = sock.dup() 
     def send(conn, pid): 
      share = new_sock.share(pid) 
      conn.send_bytes(share) 
     self._id = _resource_sharer.register(send, new_sock.close) 

    def detach(self): 
     '''Get the socket. This should only be called once.''' 
     with _resource_sharer.get_connection(self._id) as conn: 
      share = conn.recv_bytes() 
      return socket.fromshare(share) 

To wywołuje metodę Okna gniazdo share, która zwraca bufor z informacji protokołu wywoływania WSADuplicateSocket. Rejestruje się z zasobnikiem zasobów, aby wysłać ten bufor przez połączenie z procesem podrzędnym. Dziecko z kolei dzwoni pod numer detach, który odbiera bufor informacji o protokole i rekonstruuje gniazdo przez socket.fromshare.

Nie jest to bezpośrednio związane z twoim problemem, ale zalecam przeprojektowanie serwera, aby zamiast tego zadzwonił pod numer accept w procesie głównym, co jest normalne (np. W module Pythona socketserver.ForkingTCPServer). Przekaż wynikową krotkę (conn, address) pierwszemu dostępnemu pracownikowi ponad multiprocessing.Queue, który jest współużytkowany przez wszystkich pracowników w puli procesowej. Lub rozważ skorzystanie z multiprocessing.Pool z apply_async.

+0

Rozumiem. A co z [tym] (http://pastebin.com/uRjCBuaD) podejściem, czy jest to uzasadnione? – Trauer

+0

Nie sprawdziłem, jak przetwarza się zaległości, gdy gniazdo nasłuchu jest kopiowane do innego procesu. Jeśli każda kopia ma swój własny backlog, to prawdopodobnie nie jest to, co chcesz. – eryksun

+0

Rozumiem. Ostatnie pytanie i przestaję ci przeszkadzać, hehe. Czy wiesz, gdzie mogę znaleźć dokumentację na ten temat? – Trauer

0

def listen() cel/start dla procesów potomnych nie ma żadnego argumentu, ale zapewniają gniazdo odcinkach jako argument args=(picklez,) do procesu potomnego to spowodowałoby wyjątek w procesie potomnym i wyjściu natychmiast.

TypeError: listen() takes no arguments (1 given) 

def listen(picklez) powinno rozwiązać problem ten zapewni jeden argument do celu swoich procesów potomnych.

+0

Rzeczywiście pierwotnie opublikowana funkcja nie była kompletna. Opublikowalem tu niekompletny/niepoprawny kod. Poprawnym podpisem jest listen (picklez). Chodzi mi o to: niestety to nie jest problem; oryginalny kod ma prawidłowy podpis, ale moje procesy potomne wciąż giną, ale dziękuję za odpowiedź. – Trauer

+0

ściśle mówiąc 'accept' powinno być zablokowane pomiędzy dostępem do różnych procesów i kod wydaje się działać w środowisku Linux nawet bez serializacji gniazda – cmidi

+0

Obawiałem się tego ... Miałem przeczucie, że to komplikacja Windows. Dodam tę informację do mojego quesitonu. Dziękuję cmidi. – Trauer