summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorMichael Meeks <michael.meeks@collabora.com>2020-04-09 14:43:51 +0100
committerMichael Meeks <michael.meeks@collabora.com>2020-04-10 10:06:23 +0200
commit5710c8632383e92372e1d81b6e26acc975e25ec4 (patch)
treecd9e16cde8584bead5685ed067920ef2045666ba /net
parent050820f45945bcceaffaa3098236b011fbd98772 (diff)
Poll - switch to ppoll for closer to microsecond accuracy.
Change-Id: Ib8a2bb6f60302df8631edadbbb8db626894c457c Reviewed-on: https://gerrit.libreoffice.org/c/online/+/92000 Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice@gmail.com> Reviewed-by: Michael Meeks <michael.meeks@collabora.com>
Diffstat (limited to 'net')
-rw-r--r--net/DelaySocket.cpp14
-rw-r--r--net/ServerSocket.hpp4
-rw-r--r--net/Socket.cpp140
-rw-r--r--net/Socket.hpp142
-rw-r--r--net/SslSocket.hpp6
-rw-r--r--net/WebSocketHandler.hpp24
6 files changed, 172 insertions, 158 deletions
diff --git a/net/DelaySocket.cpp b/net/DelaySocket.cpp
index cfd8b7e54..b07b46cc4 100644
--- a/net/DelaySocket.cpp
+++ b/net/DelaySocket.cpp
@@ -77,17 +77,17 @@ public:
// FIXME - really need to propagate 'noDelay' etc.
// have a debug only lookup of delayed sockets for this case ?
- int getPollEvents(std::chrono::steady_clock::time_point now,
- int &timeoutMaxMs) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t &timeoutMaxMicroS) override
{
if (_chunks.size() > 0)
{
- int remainingMs = std::chrono::duration_cast<std::chrono::milliseconds>(
+ int64_t remainingMicroS = std::chrono::duration_cast<std::chrono::microseconds>(
(*_chunks.begin())->getSendTime() - now).count();
- if (remainingMs < timeoutMaxMs)
- DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMs
- << "ms from " << timeoutMaxMs << "ms\n");
- timeoutMaxMs = std::min(timeoutMaxMs, remainingMs);
+ if (remainingMicroS < timeoutMaxMicroS)
+ DELAY_LOG("#" << getFD() << " reset timeout max to " << remainingMicroS
+ << "us from " << timeoutMaxMicroS << "us\n");
+ timeoutMaxMicroS = std::min(timeoutMaxMicroS, remainingMicroS);
}
if (_chunks.size() > 0 &&
diff --git a/net/ServerSocket.hpp b/net/ServerSocket.hpp
index 1017aeb5e..65f826913 100644
--- a/net/ServerSocket.hpp
+++ b/net/ServerSocket.hpp
@@ -65,8 +65,8 @@ public:
/// Returns a valid Socket shared_ptr on success only.
virtual std::shared_ptr<Socket> accept();
- int getPollEvents(std::chrono::steady_clock::time_point /* now */,
- int & /* timeoutMaxMs */) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point /* now */,
+ int64_t & /* timeoutMaxMicroS */) override
{
return POLLIN;
}
diff --git a/net/Socket.cpp b/net/Socket.cpp
index 5bb1fa250..47ae1ad41 100644
--- a/net/Socket.cpp
+++ b/net/Socket.cpp
@@ -35,7 +35,7 @@
#endif
#include "WebSocketHandler.hpp"
-int SocketPoll::DefaultPollTimeoutMs = 5000;
+int SocketPoll::DefaultPollTimeoutMicroS = 5000 * 1000;
std::atomic<bool> SocketPoll::InhibitThreadChecks(false);
std::atomic<bool> Socket::InhibitThreadChecks(false);
@@ -194,6 +194,136 @@ void SocketPoll::pollingThreadEntry()
LOG_INF("Finished polling thread [" << _name << "].");
}
+int SocketPoll::ppoll(int64_t timeoutMaxMicroS)
+{
+ if (_runOnClientThread)
+ checkAndReThread();
+ else
+ assertCorrectThread();
+
+ std::chrono::steady_clock::time_point now =
+ std::chrono::steady_clock::now();
+
+ // The events to poll on change each spin of the loop.
+ psetupPollFds(now, timeoutMaxMicroS);
+ const size_t size = _pollSockets.size();
+
+ int rc;
+ do
+ {
+#if !MOBILEAPP
+# if HAVE_PPOLL
+ LOG_TRC("ppoll start, timeoutMicroS: " << timeoutMaxMicroS << " size " << size);
+ timeoutMaxMicroS = std::max(timeoutMaxMicroS, (int64_t)0);
+ struct timespec timeout;
+ timeout.tv_sec = timeoutMaxMicroS / (1000 * 1000);
+ timeout.tv_nsec = (timeoutMaxMicroS % (1000 * 1000)) * 1000;
+ rc = ::ppoll(&_pollFds[0], size + 1, &timeout, nullptr);
+ LOG_TRC("ppoll result " << rc << " errno " << strerror(errno));
+# else
+ int timeoutMaxMs = (timeoutMaxMicroS + 9999) / 1000;
+ LOG_TRC("Legacy Poll start, timeoutMs: " << timeoutMaxMs);
+ rc = ::poll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0));
+# endif
+#else
+ LOG_TRC("SocketPoll Poll");
+ int timeoutMaxMs = (timeoutMaxMicroS + 9999) / 1000;
+ rc = fakeSocketPoll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0));
+#endif
+ }
+ while (rc < 0 && errno == EINTR);
+ LOG_TRC("Poll completed with " << rc << " live polls max (" <<
+ timeoutMaxMicroS << "us)" << ((rc==0) ? "(timedout)" : ""));
+
+ // First process the wakeup pipe (always the last entry).
+ if (_pollFds[size].revents)
+ {
+ std::vector<CallbackFn> invoke;
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ // Clear the data.
+#if !MOBILEAPP
+ int dump = ::read(_wakeup[0], &dump, sizeof(dump));
+#else
+ LOG_TRC("Wakeup pipe read");
+ int dump = fakeSocketRead(_wakeup[0], &dump, sizeof(dump));
+#endif
+ // Copy the new sockets over and clear.
+ _pollSockets.insert(_pollSockets.end(),
+ _newSockets.begin(), _newSockets.end());
+
+ // Update thread ownership.
+ for (auto &i : _newSockets)
+ i->setThreadOwner(std::this_thread::get_id());
+
+ _newSockets.clear();
+
+ // Extract list of callbacks to process
+ std::swap(_newCallbacks, invoke);
+ }
+
+ for (const auto& callback : invoke)
+ {
+ try
+ {
+ callback();
+ }
+ catch (const std::exception& exc)
+ {
+ LOG_ERR("Exception while invoking poll [" << _name <<
+ "] callback: " << exc.what());
+ }
+ }
+
+ try
+ {
+ wakeupHook();
+ }
+ catch (const std::exception& exc)
+ {
+ LOG_ERR("Exception while invoking poll [" << _name <<
+ "] wakeup hook: " << exc.what());
+ }
+ }
+
+ // This should only happen when we're stopping.
+ if (_pollSockets.size() != size)
+ return rc;
+
+ // Fire the poll callbacks and remove dead fds.
+ std::chrono::steady_clock::time_point newNow =
+ std::chrono::steady_clock::now();
+
+ for (int i = static_cast<int>(size) - 1; i >= 0; --i)
+ {
+ SocketDisposition disposition(_pollSockets[i]);
+ try
+ {
+ _pollSockets[i]->handlePoll(disposition, newNow,
+ _pollFds[i].revents);
+ }
+ catch (const std::exception& exc)
+ {
+ LOG_ERR("Error while handling poll for socket #" <<
+ _pollFds[i].fd << " in " << _name << ": " << exc.what());
+ disposition.setClosed();
+ rc = -1;
+ }
+
+ if (disposition.isMove() || disposition.isClosed())
+ {
+ LOG_DBG("Removing socket #" << _pollFds[i].fd << " (of " <<
+ _pollSockets.size() << ") from " << _name);
+ _pollSockets.erase(_pollSockets.begin() + i);
+ }
+
+ disposition.execute();
+ }
+
+ return rc;
+}
+
void SocketPoll::wakeupWorld()
{
for (const auto& fd : getWakeupsArray())
@@ -384,8 +514,8 @@ void SocketDisposition::execute()
_socketMove = nullptr;
}
-const int WebSocketHandler::InitialPingDelayMs = 25;
-const int WebSocketHandler::PingFrequencyMs = 18 * 1000;
+const int WebSocketHandler::InitialPingDelayMicroS = 25 * 1000;
+const int WebSocketHandler::PingFrequencyMicroS = 18 * 1000 * 1000;
void WebSocketHandler::dumpState(std::ostream& os)
{
@@ -398,8 +528,8 @@ void WebSocketHandler::dumpState(std::ostream& os)
void StreamSocket::dumpState(std::ostream& os)
{
- int timeoutMaxMs = SocketPoll::DefaultPollTimeoutMs;
- int events = getPollEvents(std::chrono::steady_clock::now(), timeoutMaxMs);
+ int64_t timeoutMaxMicroS = SocketPoll::DefaultPollTimeoutMicroS;
+ int events = pgetPollEvents(std::chrono::steady_clock::now(), timeoutMaxMicroS);
os << "\t" << getFD() << "\t" << events << "\t"
<< _inBuffer.size() << "\t" << _outBuffer.size() << "\t"
<< " r: " << _bytesRecvd << "\t w: " << _bytesSent << "\t"
diff --git a/net/Socket.hpp b/net/Socket.hpp
index ab56e5d10..852012424 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -154,8 +154,8 @@ public:
/// Prepare our poll record; adjust @timeoutMaxMs downwards
/// for timeouts, based on current time @now.
/// @returns POLLIN and POLLOUT if output is expected.
- virtual int getPollEvents(std::chrono::steady_clock::time_point now,
- int &timeoutMaxMs) = 0;
+ virtual int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t &timeoutMaxMicroS) = 0;
/// Handle results of events returned from poll
virtual void handlePoll(SocketDisposition &disposition,
@@ -370,8 +370,8 @@ public:
/// Prepare our poll record; adjust @timeoutMaxMs downwards
/// for timeouts, based on current time @now.
/// @returns POLLIN and POLLOUT if output is expected.
- virtual int getPollEvents(std::chrono::steady_clock::time_point now,
- int &timeoutMaxMs) = 0;
+ virtual int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t &timeoutMaxMicroS) = 0;
/// Do we need to handle a timeout ?
virtual void checkTimeout(std::chrono::steady_clock::time_point /* now */) {}
@@ -476,7 +476,7 @@ public:
~SocketPoll();
/// Default poll time - useful to increase for debugging.
- static int DefaultPollTimeoutMs;
+ static int DefaultPollTimeoutMicroS;
static std::atomic<bool> InhibitThreadChecks;
/// Stop the polling thread.
@@ -533,7 +533,7 @@ public:
{
while (continuePolling())
{
- poll(DefaultPollTimeoutMs);
+ ppoll(DefaultPollTimeoutMicroS);
}
}
@@ -576,123 +576,7 @@ public:
/// Poll the sockets for available data to read or buffer to write.
/// Returns the return-value of poll(2): 0 on timeout,
/// -1 for error, and otherwise the number of events signalled.
- int poll(int timeoutMaxMs)
- {
- if (_runOnClientThread)
- checkAndReThread();
- else
- assertCorrectThread();
-
- std::chrono::steady_clock::time_point now =
- std::chrono::steady_clock::now();
-
- // The events to poll on change each spin of the loop.
- setupPollFds(now, timeoutMaxMs);
- const size_t size = _pollSockets.size();
-
- int rc;
- do
- {
-#if !MOBILEAPP
- LOG_TRC("Poll start, timeoutMs: " << timeoutMaxMs);
- rc = ::poll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0));
-#else
- LOG_TRC("SocketPoll Poll");
- rc = fakeSocketPoll(&_pollFds[0], size + 1, std::max(timeoutMaxMs,0));
-#endif
- }
- while (rc < 0 && errno == EINTR);
- LOG_TRC("Poll completed with " << rc << " live polls max (" <<
- timeoutMaxMs << "ms)" << ((rc==0) ? "(timedout)" : ""));
-
- // First process the wakeup pipe (always the last entry).
- if (_pollFds[size].revents)
- {
- std::vector<CallbackFn> invoke;
- {
- std::lock_guard<std::mutex> lock(_mutex);
-
- // Clear the data.
-#if !MOBILEAPP
- int dump = ::read(_wakeup[0], &dump, sizeof(dump));
-#else
- LOG_TRC("Wakeup pipe read");
- int dump = fakeSocketRead(_wakeup[0], &dump, sizeof(dump));
-#endif
- // Copy the new sockets over and clear.
- _pollSockets.insert(_pollSockets.end(),
- _newSockets.begin(), _newSockets.end());
-
- // Update thread ownership.
- for (auto &i : _newSockets)
- i->setThreadOwner(std::this_thread::get_id());
-
- _newSockets.clear();
-
- // Extract list of callbacks to process
- std::swap(_newCallbacks, invoke);
- }
-
- for (const auto& callback : invoke)
- {
- try
- {
- callback();
- }
- catch (const std::exception& exc)
- {
- LOG_ERR("Exception while invoking poll [" << _name <<
- "] callback: " << exc.what());
- }
- }
-
- try
- {
- wakeupHook();
- }
- catch (const std::exception& exc)
- {
- LOG_ERR("Exception while invoking poll [" << _name <<
- "] wakeup hook: " << exc.what());
- }
- }
-
- // This should only happen when we're stopping.
- if (_pollSockets.size() != size)
- return rc;
-
- // Fire the poll callbacks and remove dead fds.
- std::chrono::steady_clock::time_point newNow =
- std::chrono::steady_clock::now();
-
- for (int i = static_cast<int>(size) - 1; i >= 0; --i)
- {
- SocketDisposition disposition(_pollSockets[i]);
- try
- {
- _pollSockets[i]->handlePoll(disposition, newNow,
- _pollFds[i].revents);
- }
- catch (const std::exception& exc)
- {
- LOG_ERR("Error while handling poll for socket #" <<
- _pollFds[i].fd << " in " << _name << ": " << exc.what());
- disposition.setClosed();
- rc = -1;
- }
-
- if (disposition.isMove() || disposition.isClosed())
- {
- LOG_DBG("Removing socket #" << _pollFds[i].fd << " (of " <<
- _pollSockets.size() << ") from " << _name);
- _pollSockets.erase(_pollSockets.begin() + i);
- }
-
- disposition.execute();
- }
-
- return rc;
- }
+ int ppoll(int64_t timeoutMaxMicroS);
/// Write to a wakeup descriptor
static void wakeup (int fd)
@@ -811,8 +695,8 @@ private:
const std::string &pathAndQuery);
/// Initialize the poll fds array with the right events
- void setupPollFds(std::chrono::steady_clock::time_point now,
- int &timeoutMaxMs)
+ void psetupPollFds(std::chrono::steady_clock::time_point now,
+ int64_t &timeoutMaxMicroS)
{
const size_t size = _pollSockets.size();
@@ -820,7 +704,7 @@ private:
for (size_t i = 0; i < size; ++i)
{
- int events = _pollSockets[i]->getPollEvents(now, timeoutMaxMs);
+ int events = _pollSockets[i]->pgetPollEvents(now, timeoutMaxMicroS);
assert(events >= 0); // Or > 0 even?
_pollFds[i].fd = _pollSockets[i]->getFD();
_pollFds[i].events = events;
@@ -920,12 +804,12 @@ public:
Socket::shutdown();
}
- int getPollEvents(std::chrono::steady_clock::time_point now,
- int &timeoutMaxMs) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t &timeoutMaxMicroS) override
{
// cf. SslSocket::getPollEvents
assertCorrectThread();
- int events = _socketHandler->getPollEvents(now, timeoutMaxMs);
+ int events = _socketHandler->pgetPollEvents(now, timeoutMaxMicroS);
if (!_outBuffer.empty() || _shutdownSignalled)
events |= POLLOUT;
return events;
diff --git a/net/SslSocket.hpp b/net/SslSocket.hpp
index 27e075328..e6b7f908f 100644
--- a/net/SslSocket.hpp
+++ b/net/SslSocket.hpp
@@ -126,11 +126,11 @@ public:
return handleSslState(SSL_write(_ssl, buf, len));
}
- int getPollEvents(std::chrono::steady_clock::time_point now,
- int & timeoutMaxMs) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t & timeoutMaxMicroS) override
{
assertCorrectThread();
- int events = getSocketHandler()->getPollEvents(now, timeoutMaxMs);
+ int events = getSocketHandler()->pgetPollEvents(now, timeoutMaxMicroS);
if (_sslWantsTo == SslWantsTo::Read)
{
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index b23c3951f..24c3a839a 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -48,8 +48,8 @@ protected:
static const unsigned char Mask = 0x80;
};
- static const int InitialPingDelayMs;
- static const int PingFrequencyMs;
+ static const int InitialPingDelayMicroS;
+ static const int PingFrequencyMicroS;
public:
/// Perform upgrade ourselves, or select a client web socket.
@@ -81,8 +81,8 @@ public:
const Poco::Net::HTTPRequest& request)
: _socket(socket)
, _lastPingSentTime(std::chrono::steady_clock::now() -
- std::chrono::milliseconds(PingFrequencyMs) -
- std::chrono::milliseconds(InitialPingDelayMs))
+ std::chrono::microseconds(PingFrequencyMicroS) -
+ std::chrono::microseconds(InitialPingDelayMicroS))
, _pingTimeUs(0)
, _shuttingDown(false)
, _isClient(false)
@@ -430,14 +430,14 @@ public:
}
}
- int getPollEvents(std::chrono::steady_clock::time_point now,
- int & timeoutMaxMs) override
+ int pgetPollEvents(std::chrono::steady_clock::time_point now,
+ int64_t & timeoutMaxMicroS) override
{
if (!_isClient)
{
- const int timeSincePingMs =
- std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count();
- timeoutMaxMs = std::min(timeoutMaxMs, PingFrequencyMs - timeSincePingMs);
+ const int64_t timeSincePingMicroS =
+ std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime).count();
+ timeoutMaxMicroS = std::min(timeoutMaxMicroS, PingFrequencyMicroS - timeSincePingMicroS);
}
int events = POLLIN;
if (_msgHandler && _msgHandler->hasQueuedMessages())
@@ -493,9 +493,9 @@ private:
if (_isClient)
return;
- const int timeSincePingMs =
- std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count();
- if (timeSincePingMs >= PingFrequencyMs)
+ const int64_t timeSincePingMicroS =
+ std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime).count();
+ if (timeSincePingMicroS >= PingFrequencyMicroS)
{
const std::shared_ptr<StreamSocket> socket = _socket.lock();
if (socket)