diff options
author | Michael Meeks <michael.meeks@collabora.com> | 2020-04-09 14:43:51 +0100 |
---|---|---|
committer | Michael Meeks <michael.meeks@collabora.com> | 2020-04-10 10:06:23 +0200 |
commit | 5710c8632383e92372e1d81b6e26acc975e25ec4 (patch) | |
tree | cd9e16cde8584bead5685ed067920ef2045666ba /net | |
parent | 050820f45945bcceaffaa3098236b011fbd98772 (diff) |
Poll - switch to ppoll for closer to microsecond accuracy.
Change-Id: Ib8a2bb6f60302df8631edadbbb8db626894c457c
Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92000
Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice@gmail.com>
Reviewed-by: Michael Meeks <michael.meeks@collabora.com>
Diffstat (limited to 'net')
-rw-r--r-- | net/DelaySocket.cpp | 14 | ||||
-rw-r--r-- | net/ServerSocket.hpp | 4 | ||||
-rw-r--r-- | net/Socket.cpp | 140 | ||||
-rw-r--r-- | net/Socket.hpp | 142 | ||||
-rw-r--r-- | net/SslSocket.hpp | 6 | ||||
-rw-r--r-- | net/WebSocketHandler.hpp | 24 |
6 files changed, 172 insertions, 158 deletions
diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp index cfd8b7e54..b07b46cc4 100644 --- a/net/DelaySocket.cpp +++ b/net/DelaySocket.cpp @@ -77,17 +77,17 @@ public: // FIXME - really need to propagate 'noDelay' etc. // have a debug only lookup of delayed sockets for this case ? - int getPollEvents(std::chrono::steady_clock::time_point now, - int &timeoutMaxMs) override + int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t &timeoutMaxMicroS) override { if (_chunks.size() > 0) { - int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>( + int64_t remainingMicroS = std::chrono::duration_cast<std::chrono::microseconds>( (*_chunks.begin())->getSendTime() - now).count(); - if (remainingMs < timeoutMaxMs) - DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMs - << "ms from " << timeoutMaxMs << "ms\n"); - timeoutMaxMs = std::min(timeoutMaxMs, remainingMs); + if (remainingMicroS < timeoutMaxMicroS) + DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMicroS + << "us from " << timeoutMaxMicroS << "us\n"); + timeoutMaxMicroS = std::min(timeoutMaxMicroS, remainingMicroS); } if (_chunks.size() > 0 && diff --git a/net/ServerSocket.hpp b/net/ServerSocket.hpp index 1017aeb5e..65f826913 100644 --- a/net/ServerSocket.hpp +++ b/net/ServerSocket.hpp @@ -65,8 +65,8 @@ public: /// Returns a valid Socket shared_ptr on success only. virtual std::shared_ptr<Socket> accept(); - int getPollEvents(std::chrono::steady_clock::time_point /* now */, - int & /* timeoutMaxMs */) override + int pgetPollEvents(std::chrono::steady_clock::time_point /* now */, + int64_t & /* timeoutMaxMicroS */) override { return POLLIN; } diff --git a/net/Socket.cpp b/net/Socket.cpp index 5bb1fa250..47ae1ad41 100644 --- a/net/Socket.cpp +++ b/net/Socket.cpp @@ -35,7 +35,7 @@ #endif #include "WebSocketHandler.hpp" -int SocketPoll::DefaultPollTimeoutMs = 5000; +int SocketPoll::DefaultPollTimeoutMicroS = 5000 * 1000; std::atomic<bool> SocketPoll::InhibitThreadChecks(false); std::atomic<bool> Socket::InhibitThreadChecks(false); @@ -194,6 +194,136 @@ void SocketPoll::pollingThreadEntry() LOG_INF("Finished polling thread [" << _name << "]."); } +int SocketPoll::ppoll(int64_t timeoutMaxMicroS) +{ + if (_runOnClientThread) + checkAndReThread(); + else + assertCorrectThread(); + + std::chrono::steady_clock::time_point now = + std::chrono::steady_clock::now(); + + // The events to poll on change each spin of the loop. + psetupPollFds(now, timeoutMaxMicroS); + const size_t size = _pollSockets.size(); + + int rc; + do + { +#if !MOBILEAPP +# if HAVE_PPOLL + LOG_TRC("ppoll start, timeoutMicroS: " << timeoutMaxMicroS << " size " << size); + timeoutMaxMicroS = std::max(timeoutMaxMicroS, (int64_t)0); + struct timespec timeout; + timeout.tv_sec = timeoutMaxMicroS / (1000 * 1000); + timeout.tv_nsec = (timeoutMaxMicroS % (1000 * 1000)) * 1000; + rc = ::ppoll(&_pollFds[0], size + 1, &timeout, nullptr); + LOG_TRC("ppoll result " << rc << " errno " << strerror(errno)); +# else + int timeoutMaxMs = (timeoutMaxMicroS + 9999) / 1000; + LOG_TRC("Legacy Poll start, timeoutMs: " << timeoutMaxMs); + rc = ::poll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0)); +# endif +#else + LOG_TRC("SocketPoll Poll"); + int timeoutMaxMs = (timeoutMaxMicroS + 9999) / 1000; + rc = fakeSocketPoll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0)); +#endif + } + while (rc < 0 && errno == EINTR); + LOG_TRC("Poll completed with " << rc << " live polls max (" << + timeoutMaxMicroS << "us)" << ((rc==0) ? "(timedout)" : "")); + + // First process the wakeup pipe (always the last entry). + if (_pollFds[size].revents) + { + std::vector<CallbackFn> invoke; + { + std::lock_guard<std::mutex> lock(_mutex); + + // Clear the data. +#if !MOBILEAPP + int dump = ::read(_wakeup[0], &dump, sizeof(dump)); +#else + LOG_TRC("Wakeup pipe read"); + int dump = fakeSocketRead(_wakeup[0], &dump, sizeof(dump)); +#endif + // Copy the new sockets over and clear. + _pollSockets.insert(_pollSockets.end(), + _newSockets.begin(), _newSockets.end()); + + // Update thread ownership. + for (auto &i : _newSockets) + i->setThreadOwner(std::this_thread::get_id()); + + _newSockets.clear(); + + // Extract list of callbacks to process + std::swap(_newCallbacks, invoke); + } + + for (const auto& callback : invoke) + { + try + { + callback(); + } + catch (const std::exception& exc) + { + LOG_ERR("Exception while invoking poll [" << _name << + "] callback: " << exc.what()); + } + } + + try + { + wakeupHook(); + } + catch (const std::exception& exc) + { + LOG_ERR("Exception while invoking poll [" << _name << + "] wakeup hook: " << exc.what()); + } + } + + // This should only happen when we're stopping. + if (_pollSockets.size() != size) + return rc; + + // Fire the poll callbacks and remove dead fds. + std::chrono::steady_clock::time_point newNow = + std::chrono::steady_clock::now(); + + for (int i = static_cast<int>(size) - 1; i >= 0; --i) + { + SocketDisposition disposition(_pollSockets[i]); + try + { + _pollSockets[i]->handlePoll(disposition, newNow, + _pollFds[i].revents); + } + catch (const std::exception& exc) + { + LOG_ERR("Error while handling poll for socket #" << + _pollFds[i].fd << " in " << _name << ": " << exc.what()); + disposition.setClosed(); + rc = -1; + } + + if (disposition.isMove() || disposition.isClosed()) + { + LOG_DBG("Removing socket #" << _pollFds[i].fd << " (of " << + _pollSockets.size() << ") from " << _name); + _pollSockets.erase(_pollSockets.begin() + i); + } + + disposition.execute(); + } + + return rc; +} + void SocketPoll::wakeupWorld() { for (const auto& fd : getWakeupsArray()) @@ -384,8 +514,8 @@ void SocketDisposition::execute() _socketMove = nullptr; } -const int WebSocketHandler::InitialPingDelayMs = 25; -const int WebSocketHandler::PingFrequencyMs = 18 * 1000; +const int WebSocketHandler::InitialPingDelayMicroS = 25 * 1000; +const int WebSocketHandler::PingFrequencyMicroS = 18 * 1000 * 1000; void WebSocketHandler::dumpState(std::ostream& os) { @@ -398,8 +528,8 @@ void WebSocketHandler::dumpState(std::ostream& os) void StreamSocket::dumpState(std::ostream& os) { - int timeoutMaxMs = SocketPoll::DefaultPollTimeoutMs; - int events = getPollEvents(std::chrono::steady_clock::now(), timeoutMaxMs); + int64_t timeoutMaxMicroS = SocketPoll::DefaultPollTimeoutMicroS; + int events = pgetPollEvents(std::chrono::steady_clock::now(), timeoutMaxMicroS); os << "\t" << getFD() << "\t" << events << "\t" << _inBuffer.size() << "\t" << _outBuffer.size() << "\t" << " r: " << _bytesRecvd << "\t w: " << _bytesSent << "\t" diff --git a/net/Socket.hpp b/net/Socket.hpp index ab56e5d10..852012424 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -154,8 +154,8 @@ public: /// Prepare our poll record; adjust @timeoutMaxMs downwards /// for timeouts, based on current time @now. /// @returns POLLIN and POLLOUT if output is expected. - virtual int getPollEvents(std::chrono::steady_clock::time_point now, - int &timeoutMaxMs) = 0; + virtual int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t &timeoutMaxMicroS) = 0; /// Handle results of events returned from poll virtual void handlePoll(SocketDisposition &disposition, @@ -370,8 +370,8 @@ public: /// Prepare our poll record; adjust @timeoutMaxMs downwards /// for timeouts, based on current time @now. /// @returns POLLIN and POLLOUT if output is expected. - virtual int getPollEvents(std::chrono::steady_clock::time_point now, - int &timeoutMaxMs) = 0; + virtual int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t &timeoutMaxMicroS) = 0; /// Do we need to handle a timeout ? virtual void checkTimeout(std::chrono::steady_clock::time_point /* now */) {} @@ -476,7 +476,7 @@ public: ~SocketPoll(); /// Default poll time - useful to increase for debugging. - static int DefaultPollTimeoutMs; + static int DefaultPollTimeoutMicroS; static std::atomic<bool> InhibitThreadChecks; /// Stop the polling thread. @@ -533,7 +533,7 @@ public: { while (continuePolling()) { - poll(DefaultPollTimeoutMs); + ppoll(DefaultPollTimeoutMicroS); } } @@ -576,123 +576,7 @@ public: /// Poll the sockets for available data to read or buffer to write. /// Returns the return-value of poll(2): 0 on timeout, /// -1 for error, and otherwise the number of events signalled. - int poll(int timeoutMaxMs) - { - if (_runOnClientThread) - checkAndReThread(); - else - assertCorrectThread(); - - std::chrono::steady_clock::time_point now = - std::chrono::steady_clock::now(); - - // The events to poll on change each spin of the loop. - setupPollFds(now, timeoutMaxMs); - const size_t size = _pollSockets.size(); - - int rc; - do - { -#if !MOBILEAPP - LOG_TRC("Poll start, timeoutMs: " << timeoutMaxMs); - rc = ::poll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0)); -#else - LOG_TRC("SocketPoll Poll"); - rc = fakeSocketPoll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0)); -#endif - } - while (rc < 0 && errno == EINTR); - LOG_TRC("Poll completed with " << rc << " live polls max (" << - timeoutMaxMs << "ms)" << ((rc==0) ? "(timedout)" : "")); - - // First process the wakeup pipe (always the last entry). - if (_pollFds[size].revents) - { - std::vector<CallbackFn> invoke; - { - std::lock_guard<std::mutex> lock(_mutex); - - // Clear the data. -#if !MOBILEAPP - int dump = ::read(_wakeup[0], &dump, sizeof(dump)); -#else - LOG_TRC("Wakeup pipe read"); - int dump = fakeSocketRead(_wakeup[0], &dump, sizeof(dump)); -#endif - // Copy the new sockets over and clear. - _pollSockets.insert(_pollSockets.end(), - _newSockets.begin(), _newSockets.end()); - - // Update thread ownership. - for (auto &i : _newSockets) - i->setThreadOwner(std::this_thread::get_id()); - - _newSockets.clear(); - - // Extract list of callbacks to process - std::swap(_newCallbacks, invoke); - } - - for (const auto& callback : invoke) - { - try - { - callback(); - } - catch (const std::exception& exc) - { - LOG_ERR("Exception while invoking poll [" << _name << - "] callback: " << exc.what()); - } - } - - try - { - wakeupHook(); - } - catch (const std::exception& exc) - { - LOG_ERR("Exception while invoking poll [" << _name << - "] wakeup hook: " << exc.what()); - } - } - - // This should only happen when we're stopping. - if (_pollSockets.size() != size) - return rc; - - // Fire the poll callbacks and remove dead fds. - std::chrono::steady_clock::time_point newNow = - std::chrono::steady_clock::now(); - - for (int i = static_cast<int>(size) - 1; i >= 0; --i) - { - SocketDisposition disposition(_pollSockets[i]); - try - { - _pollSockets[i]->handlePoll(disposition, newNow, - _pollFds[i].revents); - } - catch (const std::exception& exc) - { - LOG_ERR("Error while handling poll for socket #" << - _pollFds[i].fd << " in " << _name << ": " << exc.what()); - disposition.setClosed(); - rc = -1; - } - - if (disposition.isMove() || disposition.isClosed()) - { - LOG_DBG("Removing socket #" << _pollFds[i].fd << " (of " << - _pollSockets.size() << ") from " << _name); - _pollSockets.erase(_pollSockets.begin() + i); - } - - disposition.execute(); - } - - return rc; - } + int ppoll(int64_t timeoutMaxMicroS); /// Write to a wakeup descriptor static void wakeup (int fd) @@ -811,8 +695,8 @@ private: const std::string &pathAndQuery); /// Initialize the poll fds array with the right events - void setupPollFds(std::chrono::steady_clock::time_point now, - int &timeoutMaxMs) + void psetupPollFds(std::chrono::steady_clock::time_point now, + int64_t &timeoutMaxMicroS) { const size_t size = _pollSockets.size(); @@ -820,7 +704,7 @@ private: for (size_t i = 0; i < size; ++i) { - int events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs); + int events = _pollSockets[i]->pgetPollEvents(now, timeoutMaxMicroS); assert(events >= 0); // Or > 0 even? _pollFds[i].fd = _pollSockets[i]->getFD(); _pollFds[i].events = events; @@ -920,12 +804,12 @@ public: Socket::shutdown(); } - int getPollEvents(std::chrono::steady_clock::time_point now, - int &timeoutMaxMs) override + int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t &timeoutMaxMicroS) override { // cf. SslSocket::getPollEvents assertCorrectThread(); - int events = _socketHandler->getPollEvents(now, timeoutMaxMs); + int events = _socketHandler->pgetPollEvents(now, timeoutMaxMicroS); if (!_outBuffer.empty() || _shutdownSignalled) events |= POLLOUT; return events; diff --git a/net/SslSocket.hpp b/net/SslSocket.hpp index 27e075328..e6b7f908f 100644 --- a/net/SslSocket.hpp +++ b/net/SslSocket.hpp @@ -126,11 +126,11 @@ public: return handleSslState(SSL_write(_ssl, buf, len)); } - int getPollEvents(std::chrono::steady_clock::time_point now, - int & timeoutMaxMs) override + int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t & timeoutMaxMicroS) override { assertCorrectThread(); - int events = getSocketHandler()->getPollEvents(now, timeoutMaxMs); + int events = getSocketHandler()->pgetPollEvents(now, timeoutMaxMicroS); if (_sslWantsTo == SslWantsTo::Read) { diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp index b23c3951f..24c3a839a 100644 --- a/net/WebSocketHandler.hpp +++ b/net/WebSocketHandler.hpp @@ -48,8 +48,8 @@ protected: static const unsigned char Mask = 0x80; }; - static const int InitialPingDelayMs; - static const int PingFrequencyMs; + static const int InitialPingDelayMicroS; + static const int PingFrequencyMicroS; public: /// Perform upgrade ourselves, or select a client web socket. @@ -81,8 +81,8 @@ public: const Poco::Net::HTTPRequest& request) : _socket(socket) , _lastPingSentTime(std::chrono::steady_clock::now() - - std::chrono::milliseconds(PingFrequencyMs) - - std::chrono::milliseconds(InitialPingDelayMs)) + std::chrono::microseconds(PingFrequencyMicroS) - + std::chrono::microseconds(InitialPingDelayMicroS)) , _pingTimeUs(0) , _shuttingDown(false) , _isClient(false) @@ -430,14 +430,14 @@ public: } } - int getPollEvents(std::chrono::steady_clock::time_point now, - int & timeoutMaxMs) override + int pgetPollEvents(std::chrono::steady_clock::time_point now, + int64_t & timeoutMaxMicroS) override { if (!_isClient) { - const int timeSincePingMs = - std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count(); - timeoutMaxMs = std::min(timeoutMaxMs, PingFrequencyMs - timeSincePingMs); + const int64_t timeSincePingMicroS = + std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime).count(); + timeoutMaxMicroS = std::min(timeoutMaxMicroS, PingFrequencyMicroS - timeSincePingMicroS); } int events = POLLIN; if (_msgHandler && _msgHandler->hasQueuedMessages()) @@ -493,9 +493,9 @@ private: if (_isClient) return; - const int timeSincePingMs = - std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count(); - if (timeSincePingMs >= PingFrequencyMs) + const int64_t timeSincePingMicroS = + std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime).count(); + if (timeSincePingMicroS >= PingFrequencyMicroS) { const std::shared_ptr<StreamSocket> socket = _socket.lock(); if (socket) |