2014-04-09 18 views
6

Mam aplikację, która używa funkcji Boost.Asio do komunikacji z gniazdami TCP i UDP. Rozumiem, że "A" w "Asio" oznacza Asynchronous, więc biblioteka jest skłonna do zachęcania użytkownika do korzystania z asynchronicznych operacji we/wy, gdy jest to możliwe. Mam kilka przypadków, w których synchroniczne odczyty gniazd są preferowane. Jednocześnie jednak chciałbym ustawić limit czasu dla tych połączeń odbierających, więc nie ma możliwości zablokowania odczytu w nieskończoność.Czy można odczytać z gniazda synchronicznie za pomocą funkcji Boost.Asio z przekroczeniem czasu w wielowątkowej usłudze we-wy?

To wydaje się być dość powszechnym problemem wśród użytkowników Boost.Asio, z następującymi pytaniami przeszłości przepełnienie stosu na temat:

Może wystąpić błąd pl więcej. Istnieje nawet examples in the documentation, jak zaimplementować operacje synchroniczne z limitami czasu. Sprowadzają się do konwersji operacji synchronicznej na asynchroniczną, a następnie uruchomienia jej równolegle z asio::deadline_timer. Program obsługi wygaśnięcia timera może następnie anulować odczyt asynchroniczny w przypadku wygaśnięcia limitu czasu. Wygląda to mniej więcej tak (fragment zaczerpnięty z powyższym połączonego przykład):

std::size_t receive(const boost::asio::mutable_buffer& buffer, 
     boost::posix_time::time_duration timeout, boost::system::error_code& ec) 
    { 
    // Set a deadline for the asynchronous operation. 
    deadline_.expires_from_now(timeout); 

    // Set up the variables that receive the result of the asynchronous 
    // operation. The error code is set to would_block to signal that the 
    // operation is incomplete. Asio guarantees that its asynchronous 
    // operations will never fail with would_block, so any other value in 
    // ec indicates completion. 
    ec = boost::asio::error::would_block; 
    std::size_t length = 0; 

    // Start the asynchronous operation itself. The handle_receive function 
    // used as a callback will update the ec and length variables. 
    socket_.async_receive(boost::asio::buffer(buffer), 
     boost::bind(&client::handle_receive, _1, _2, &ec, &length)); 

    // Block until the asynchronous operation has completed. 
    do io_service_.run_one(); while (ec == boost::asio::error::would_block); 

    return length; 
    } 

to rzeczywiście jest stosunkowo czyste rozwiązanie: uruchomić asynchroniczne operacje, a następnie ręcznie sondować asio::io_service do wykonywania asynchronicznych teleskopowe jeden naraz aż albo async_receive() kończy się lub upłynie czas timera.

Co jednak z przypadkiem, w którym podstawowa usługa wejścia/wyjścia jest już uruchamiana w jednym lub kilku wątkach tła? W takim przypadku nie ma gwarancji, że moduły obsługi dla operacji asynchronicznych będą uruchamiane przez wątek pierwszoplanowy w powyższym fragmencie, więc run_one() nie powróci, dopóki nie zostanie wykonany jakiś późniejszy, prawdopodobnie niepowiązany, program obsługi. To sprawi, że gniazdo będzie raczej nie odpowiadać.

asio::io_service ma funkcję poll_one(), która sprawdzi kolejkę usługi bez blokowania, ale nie widzę dobrego sposobu blokowania wątku pierwszego planu (emulowanie synchronicznego zachowania wywołania) do momentu uruchomienia programu obsługi, z wyjątkiem przypadku, gdy nie są już wątkami tła, które wykonują już asio::io_service::run().

widzę dwóch możliwych rozwiązań, z których żadne nie lubię:

  1. Wykorzystanie zmiennej stanu lub podobną konstrukcję, aby planie bloku gwintu po rozpoczęciu operacji asynchronicznych. W module obsługi połączenia async_receive() zasygnalizuj zmienną warunku, aby odblokować wątek. To powoduje pewne blokowanie dla każdego odczytu, którego chciałbym uniknąć, ponieważ chciałbym osiągnąć maksymalną możliwą przepustowość na podstawie odczytów z UDP. W przeciwnym razie jest to wykonalne i prawdopodobnie zrobię to, o ile nie przedstawi się lepsza metoda.

  2. Upewnij się, że gniazdo ma swój własny asio::io_service, który nie jest uruchamiany przez wątki tła. Utrudnia to używanie asynchronicznych operacji we/wy z gniazdem w przypadkach, w których jest to pożądane.

Wszelkie pomysły na inne sposoby osiągnięcia tego w bezpieczny sposób?


marginesie: Istnieje kilka odpowiedzi na poprzednie pytania, które opowiadają SO używając opcji SO_RCVTIMEO gniazda wdrożyć gniazda czytać limit czasu. Brzmi świetnie w teorii, ale nie wydaje się działać na mojej platformie co najmniej (Ubuntu 12.04, Boost v1.55). Mogę ustawić limit czasu gniazda, ale nie zapewni to pożądanego efektu w Asio. Odpowiedni kod jest /boost/asio/detail/impl/socket_ops.ipp:

size_t sync_recvfrom(socket_type s, state_type state, buf* bufs, 
    size_t count, int flags, socket_addr_type* addr, 
    std::size_t* addrlen, boost::system::error_code& ec) 
{ 
    if (s == invalid_socket) 
    { 
    ec = boost::asio::error::bad_descriptor; 
    return 0; 
    } 

    // Read some data. 
    for (;;) 
    { 
    // Try to complete the operation without blocking. 
    signed_size_type bytes = socket_ops::recvfrom(
     s, bufs, count, flags, addr, addrlen, ec); 

    // Check if operation succeeded. 
    if (bytes >= 0) 
     return bytes; 

    // Operation failed. 
    if ((state & user_set_non_blocking) 
     || (ec != boost::asio::error::would_block 
      && ec != boost::asio::error::try_again)) 
     return 0; 

    // Wait for socket to become ready. 
    if (socket_ops::poll_read(s, 0, ec) < 0) 
     return 0; 
    } 
} 

Jeśli gniazdo czytać razy się, wywołanie recvfrom() powyżej zwróciłby EAGAIN lub EWOULDBLOCK, które są tłumaczone na boost::asio::error::try_again lub boost::asio::error::would_block. W tym przypadku, powyższy kod będzie wywołać funkcję poll_read(), która dla mojego platformy wygląda następująco:

int poll_read(socket_type s, state_type state, boost::system::error_code& ec) 
{ 
    if (s == invalid_socket) 
    { 
    ec = boost::asio::error::bad_descriptor; 
    return socket_error_retval; 
    } 

    pollfd fds; 
    fds.fd = s; 
    fds.events = POLLIN; 
    fds.revents = 0; 
    int timeout = (state & user_set_non_blocking) ? 0 : -1; 
    clear_last_error(); 
    int result = error_wrapper(::poll(&fds, 1, timeout), ec); 
    if (result == 0) 
    ec = (state & user_set_non_blocking) 
     ? boost::asio::error::would_block : boost::system::error_code(); 
    else if (result > 0) 
    ec = boost::system::error_code(); 
    return result; 
} 

I snipped się warunkowo skompilowanego kodu dla innych platform. Jak widać, jeśli gniazdo nie jest gniazdem bez blokady, kończy się wywoływanie poll() z nieskończonym czasem oczekiwania, blokując w ten sposób, aż gniazdo ma dane do odczytania (i udaremniając próbę przekroczenia limitu czasu). Dlatego opcja SO_RCVTIMEO nie działa.

Odpowiedz

6

Wsparcie Boost.Asio dla futures może stanowić eleganckie rozwiązanie. Gdy operacja asynchroniczna zostanie podana jako wartość obsługi boost::asio::use_future, funkcja inicjująca zwróci obiekt std::future, który otrzyma wynik operacji. Ponadto, jeśli operacja zakończy się niepowodzeniem, error_code zostanie przekonwertowany na system_error i przekazany dzwoniącemu przez future.

W latach Boost.Asio C++ 11 future datytime client example dedykowany wątek prowadzi io_service i główny wątek inicjuje operacje asynchroniczne synchronicznie następnie czeka na zakończenie operacji, takiej jak poniżej:

std::array<char, 128> recv_buf; 
udp::endpoint sender_endpoint; 
std::future<std::size_t> recv_length = 
    socket.async_receive_from(
     boost::asio::buffer(recv_buf), 
     sender_endpoint, 
     boost::asio::use_future); 

// Do other things here while the receive completes. 

std::cout.write(
    recv_buf.data(), 
    recv_length.get()); // Blocks until receive is complete. 

Podczas korzystania z future s ogólne podejście do implementacji odczytu synchronicznego z limitem czasu jest takie samo jak poprzednio. Zamiast korzystać z odczytu synchronicznego, można użyć odczytu asynchronicznego i asynchronicznie czekać na zegar. Jedyną niewielką zmianą jest to, że zamiast blokować io_service lub okresowo sprawdzać predykat, należy wywołać future::get(), aby zablokować, dopóki operacja nie zakończy się sukcesem lub niepowodzeniem (takim jak przekroczenie limitu czasu).

Jeśli C++ 11 nie jest dostępny, to typ powrotu dla operacji asynchronicznych można dostosować do Boost.Thread's future, jak zademonstrowano w odpowiedzi this.

+0

Dzięki za wgląd; Nie wiedziałem, że Asio może korzystać z kontraktów futures w ten sposób. Bez patrzenia na to, implementacja jest prawdopodobnie podobna do mojego istniejącego podejścia polegającego na użyciu 'condition_variable' do blokowania wywołującego wątku do czasu zakończenia operacji, ale miło jest mieć coś nieco bardziej standardowego. –

+0

Dzięki za odpowiedź. Ten kod nie działa dla mnie [link] (https://stackoverflow.com/questions/46166923/). – ar2015