2015-07-26 43 views
5

W this pytanie opisałem boost :: asio i boost :: współprogram wykorzystania wzoru, który powoduje losowe awarie mojego wniosku i wydałem wyciąg z mojego kodu i valgrind i GDB wyjściowego.Co jest nie tak z tym schematem zwiększania :: asio i boost :: coroutine?

W celu dokładniejszego zbadania problemu utworzyłem mniejszy wniosek o zatwierdzenie koncepcji, który stosuje ten sam wzorzec. Widziałem, że ten sam problem pojawia się w mniejszym programie, którego źródło publikuję tutaj.

Kod rozpoczyna kilka wątków i tworzy pulę połączeń z kilkoma fałszywymi połączeniami (numery dostarczone przez użytkownika). Dodatkowe argumenty są liczbami całkowitymi bez znaku, który pełni rolę fałszywych żądań. Fałszywa implementacja funkcji sendRequest po prostu uruchamia asynchroniczny licznik czasu oczekiwania na liczbę sekund równą liczbie wejściowej i yileds z funkcji.

Czy ktoś może zobaczyć problem z tym kodem i czy może zaproponować poprawkę?

#include "asiocoroutineutils.h" 
#include "concurrentqueue.h" 

#include <iostream> 
#include <thread> 

#include <boost/lexical_cast.hpp> 

using namespace std; 
using namespace boost; 
using namespace utils; 

#define id this_thread::get_id() << ": " 

// --------------------------------------------------------------------------- 

/*! 
* \brief This is a fake Connection class 
*/ 
class Connection 
{ 
public: 
    Connection(unsigned connectionId) 
     : _id(connectionId) 
    { 
    } 

    unsigned getId() const 
    { 
     return _id; 
    } 

    void sendRequest(asio::io_service& ioService, 
        unsigned seconds, 
        AsioCoroutineJoinerProxy, 
        asio::yield_context yield) 
    { 
     cout << id << "Connection " << getId() 
      << " Start sending: " << seconds << endl; 

     // waiting on this timer is palceholder for any asynchronous operation 
     asio::steady_timer timer(ioService); 
     timer.expires_from_now(chrono::seconds(seconds)); 
     coroutineAsyncWait(timer, yield); 

     cout << id << "Connection " << getId() 
      << " Received response: " << seconds << endl; 
    } 

private: 
    unsigned _id; 
}; 

typedef std::unique_ptr<Connection> ConnectionPtr; 
typedef std::shared_ptr<asio::steady_timer> TimerPtr; 

// --------------------------------------------------------------------------- 

class ConnectionPool 
{ 
public: 
    ConnectionPool(size_t connectionsCount) 
    { 
     for(size_t i = 0; i < connectionsCount; ++i) 
     { 
      cout << "Creating connection: " << i << endl; 
      _connections.emplace_back(new Connection(i)); 
     } 
    } 

    ConnectionPtr getConnection(TimerPtr timer, 
           asio::yield_context& yield) 
    { 
     lock_guard<mutex> lock(_mutex); 

     while(_connections.empty()) 
     { 
      cout << id << "There is no free connection." << endl; 

      _timers.emplace_back(timer); 
      timer->expires_from_now(
       asio::steady_timer::clock_type::duration::max()); 

      _mutex.unlock(); 
      coroutineAsyncWait(*timer, yield); 
      _mutex.lock(); 

      cout << id << "Connection was freed." << endl; 
     } 

     cout << id << "Getting connection: " 
      << _connections.front()->getId() << endl; 

     ConnectionPtr connection = std::move(_connections.front()); 
     _connections.pop_front(); 
     return connection; 
    } 

    void addConnection(ConnectionPtr connection) 
    { 
     lock_guard<mutex> lock(_mutex); 

     cout << id << "Returning connection " << connection->getId() 
      << " to the pool." << endl; 

     _connections.emplace_back(std::move(connection)); 

     if(_timers.empty()) 
      return; 

     auto timer = _timers.back(); 
     _timers.pop_back(); 
     auto& ioService = timer->get_io_service(); 

     ioService.post([timer]() 
     { 
      cout << id << "Wake up waiting getConnection." << endl; 
      timer->cancel(); 
     }); 
    } 

private: 
    mutex _mutex; 
    deque<ConnectionPtr> _connections; 
    deque<TimerPtr> _timers; 
}; 

typedef unique_ptr<ConnectionPool> ConnectionPoolPtr; 

// --------------------------------------------------------------------------- 

class ScopedConnection 
{ 
public: 
    ScopedConnection(ConnectionPool& pool, 
        asio::io_service& ioService, 
        asio::yield_context& yield) 
     : _pool(pool) 
    { 
     auto timer = make_shared<asio::steady_timer>(ioService); 
     _connection = _pool.getConnection(timer, yield); 
    } 

    Connection& get() 
    { 
     return *_connection; 
    } 

    ~ScopedConnection() 
    { 
     _pool.addConnection(std::move(_connection)); 
    } 

private: 
    ConnectionPool& _pool; 
    ConnectionPtr _connection; 
}; 

// --------------------------------------------------------------------------- 

void sendRequest(asio::io_service& ioService, 
       ConnectionPool& pool, 
       unsigned seconds, 
       asio::yield_context yield) 
{ 
    cout << id << "Constructing request ..." << endl; 

    AsioCoroutineJoiner joiner(ioService); 

    ScopedConnection connection(pool, ioService, yield); 

    asio::spawn(ioService, bind(&Connection::sendRequest, 
           connection.get(), 
           std::ref(ioService), 
           seconds, 
           AsioCoroutineJoinerProxy(joiner), 
           placeholders::_1)); 

    joiner.join(yield); 

    cout << id << "Processing response ..." << endl; 
} 

// --------------------------------------------------------------------------- 

void threadFunc(ConnectionPool& pool, 
       ConcurrentQueue<unsigned>& requests) 
{ 
    try 
    { 
     asio::io_service ioService; 

     while(true) 
     { 
      unsigned request; 
      if(!requests.tryPop(request)) 
       break; 

      cout << id << "Scheduling request: " << request << endl; 

      asio::spawn(ioService, bind(sendRequest, 
             std::ref(ioService), 
             std::ref(pool), 
             request, 
             placeholders::_1)); 
     } 

     ioService.run(); 
    } 
    catch(const std::exception& e) 
    { 
     cerr << id << "Error: " << e.what() << endl; 
    } 
} 

// --------------------------------------------------------------------------- 

int main(int argc, char* argv[]) 
{ 
    if(argc < 3) 
    { 
     cout << "Usage: ./async_request poolSize threadsCount r0 r1 ..." 
      << endl; 
     return -1; 
    } 

    try 
    { 
     auto poolSize = lexical_cast<size_t>(argv[1]); 
     auto threadsCount = lexical_cast<size_t>(argv[2]); 

     ConcurrentQueue<unsigned> requests; 
     for(int i = 3; i < argc; ++i) 
     { 
      auto request = lexical_cast<unsigned>(argv[i]); 
      requests.tryPush(request); 
     } 

     ConnectionPoolPtr pool(new ConnectionPool(poolSize)); 

     vector<unique_ptr<thread>> threads; 
     for(size_t i = 0; i < threadsCount; ++i) 
     { 
      threads.emplace_back(
       new thread(threadFunc, std::ref(*pool), std::ref(requests))); 
     } 

     for_each(threads.begin(), threads.end(), mem_fn(&thread::join)); 
    } 
    catch(const std::exception& e) 
    { 
     cerr << "Error: " << e.what() << endl; 
    } 

    return 0; 
} 

Oto niektóre narzędzia pomocnicze użyte w powyższym kodzie:

#pragma once 

#include <boost/asio/steady_timer.hpp> 
#include <boost/asio/spawn.hpp> 

namespace utils 
{ 

inline void coroutineAsyncWait(boost::asio::steady_timer& timer, 
           boost::asio::yield_context& yield) 
{ 
    boost::system::error_code ec; 
    timer.async_wait(yield[ec]); 
    if(ec && ec != boost::asio::error::operation_aborted) 
     throw std::runtime_error(ec.message()); 
} 

class AsioCoroutineJoiner 
{ 
public: 
    explicit AsioCoroutineJoiner(boost::asio::io_service& io) 
     : _timer(io), _count(0) {} 

    void join(boost::asio::yield_context yield) 
    { 
     assert(_count > 0); 
     _timer.expires_from_now(
      boost::asio::steady_timer::clock_type::duration::max()); 
     coroutineAsyncWait(_timer, yield); 
    } 

    void inc() 
    { 
     ++_count; 
    } 

    void dec() 
    { 
     assert(_count > 0); 
     --_count; 
     if(0 == _count) 
      _timer.cancel(); 
    } 

private: 
    boost::asio::steady_timer _timer; 
    std::size_t _count; 

}; // AsioCoroutineJoiner class 

class AsioCoroutineJoinerProxy 
{ 
public: 
    AsioCoroutineJoinerProxy(AsioCoroutineJoiner& joiner) 
     : _joiner(joiner) 
    { 
     _joiner.inc(); 
    } 

    AsioCoroutineJoinerProxy(const AsioCoroutineJoinerProxy& joinerProxy) 
     : _joiner(joinerProxy._joiner) 
    { 
     _joiner.inc(); 
    } 

    ~AsioCoroutineJoinerProxy() 
    { 
     _joiner.dec(); 
    } 

private: 
    AsioCoroutineJoiner& _joiner; 

}; // AsioCoroutineJoinerProxy class 

} // utils namespace 

Dla kompletności kodu ostatnim brakującym elementem jest ConcurrentQueue klasa. Jest zbyt długi, aby go tutaj wkleić, ale jeśli chcesz, możesz go znaleźć here.

przykład korzystanie z aplikacji jest:

./connectionpooltest 3 3 5 7 8 1 0 9 2 4 3 6

gdzie pierwsza liczba 3 są fałszywe połączenia policzyć i druga liczba 3 to liczba używanych wątków. Liczby po nich to fałszywe prośby.

Wyjście valgrind i GDB jest taka sama jak we wspomnianej powyżej question.

Używana wersja wzmocnienia to 1.57. Kompilator to GCC 4.8.3. System operacyjny Linux CentOS zwolnić 7.1.1503

+5

'#define id this_thread :: get_id() <<": "' Czy mówisz poważnie? – erenon

+0

możliwy duplikat [Co powoduje losową awarię w boost :: coroutine?] (Http: // stackoverflow.com/questions/31610415/what-causes-a-random-crash-in-boostcoroutine) – PSIAlt

+0

Ale kod jest teraz kompletny (wygląda na to) @PSIAlt Dałbym mu taką szansę – sehe

Odpowiedz

1

Wydaje się, że wszystkie Valgrind błędy są spowodowane powodu BOOST_USE_VALGRIND makro nie jest zdefiniowany jako Tanner Sansbury punktów w komentarzu związanych this pytanie. Wygląda na to, że program jest poprawny.