summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorMichael Meeks <michael.meeks@collabora.com>2020-03-06 17:43:46 +0000
committerMichael Meeks <michael.meeks@collabora.com>2020-03-11 16:48:03 +0100
commite924625cc1af8736505f363fc525d20a6373bb95 (patch)
tree6775062347a832542949ce4a586a150f8ece1c14 /net
parentfb27d9a79679f6aac5f2199163d5ba3e6421f2e5 (diff)
re-factor: Socket / WebSocketHandler.
Essentially we want to be able to separate low-level socket code for eg. TCP vs. UDS, from Protocol handling: eg. WebSocketHandler and client sessions themselves which handle and send messages which now implement the simple MessageHandlerInterface. Some helpful renaming too: s/SocketHandlerInterface/ProtocolHandlerInterface/ Change-Id: I58092b5e0b5792fda47498fb2c875851eada461d Reviewed-on: https://gerrit.libreoffice.org/c/online/+/90138 Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice@gmail.com> Reviewed-by: Michael Meeks <michael.meeks@collabora.com>
Diffstat (limited to 'net')
-rw-r--r--net/Socket.cpp8
-rw-r--r--net/Socket.hpp109
-rw-r--r--net/SslSocket.hpp2
-rw-r--r--net/WebSocketHandler.hpp63
4 files changed, 153 insertions, 29 deletions
diff --git a/net/Socket.cpp b/net/Socket.cpp
index cbe5a0a52..5bb1fa250 100644
--- a/net/Socket.cpp
+++ b/net/Socket.cpp
@@ -204,7 +204,7 @@ void SocketPoll::wakeupWorld()
void SocketPoll::insertNewWebSocketSync(
const Poco::URI &uri,
- const std::shared_ptr<SocketHandlerInterface>& websocketHandler)
+ const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler)
{
LOG_INF("Connecting to " << uri.getHost() << " : " << uri.getPort() << " : " << uri.getPath());
@@ -277,7 +277,7 @@ void SocketPoll::insertNewWebSocketSync(
// should this be a static method in the WebsocketHandler(?)
void SocketPoll::clientRequestWebsocketUpgrade(const std::shared_ptr<StreamSocket>& socket,
- const std::shared_ptr<SocketHandlerInterface>& websocketHandler,
+ const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler,
const std::string &pathAndQuery)
{
// cf. WebSocketHandler::upgradeToWebSocket (?)
@@ -304,7 +304,7 @@ void SocketPoll::clientRequestWebsocketUpgrade(const std::shared_ptr<StreamSocke
void SocketPoll::insertNewUnixSocket(
const std::string &location,
const std::string &pathAndQuery,
- const std::shared_ptr<SocketHandlerInterface>& websocketHandler)
+ const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler)
{
int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
@@ -337,7 +337,7 @@ void SocketPoll::insertNewUnixSocket(
void SocketPoll::insertNewFakeSocket(
int peerSocket,
- const std::shared_ptr<SocketHandlerInterface>& websocketHandler)
+ const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler)
{
LOG_INF("Connecting to " << peerSocket);
int fd = fakeSocketSocket();
diff --git a/net/Socket.hpp b/net/Socket.hpp
index c95b93dd7..99fdf259a 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -344,12 +344,21 @@ private:
};
class StreamSocket;
+class MessageHandlerInterface;
-/// Interface that handles the actual incoming message.
-class SocketHandlerInterface
+/// Interface that decodes the actual incoming message.
+class ProtocolHandlerInterface :
+ public std::enable_shared_from_this<ProtocolHandlerInterface>
{
+protected:
+ /// We own a message handler, after decoding the socket data we pass it on as messages.
+ std::shared_ptr<MessageHandlerInterface> _msgHandler;
public:
- virtual ~SocketHandlerInterface() {}
+ // ------------------------------------------------------------------
+ // Interface for implementing low level socket goodness from streams.
+ // ------------------------------------------------------------------
+ virtual ~ProtocolHandlerInterface() { }
+
/// Called when the socket is newly created to
/// set the socket associated with this ResponseClient.
/// Will be called exactly once.
@@ -374,10 +383,81 @@ public:
/// Will be called exactly once.
virtual void onDisconnect() {}
+ // -----------------------------------------------------------------
+ // Interface for external MessageHandlers
+ // -----------------------------------------------------------------
+public:
+ void setMessageHandler(const std::shared_ptr<MessageHandlerInterface> &msgHandler)
+ {
+ _msgHandler = msgHandler;
+ }
+
+ /// Clear all external references
+ virtual void dispose() { _msgHandler.reset(); }
+
+ virtual int sendTextMessage(const std::string &msg, const size_t len, bool flush = false) const = 0;
+ virtual int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const = 0;
+ virtual void shutdown(bool goingAway = false, const std::string &statusMessage = "") = 0;
+
+ virtual void getIOStats(uint64_t &sent, uint64_t &recv) = 0;
+
/// Append pretty printed internal state to a line
virtual void dumpState(std::ostream& os) { os << "\n"; }
};
+/// A ProtocolHandlerInterface with dummy sending API.
+class SimpleSocketHandler : public ProtocolHandlerInterface
+{
+public:
+ SimpleSocketHandler() {}
+ int sendTextMessage(const std::string &, const size_t, bool) const override { return 0; }
+ int sendBinaryMessage(const char *, const size_t , bool ) const override { return 0; }
+ void shutdown(bool, const std::string &) override {}
+ void getIOStats(uint64_t &, uint64_t &) override {}
+};
+
+/// Interface that receives and sends incoming messages.
+class MessageHandlerInterface :
+ public std::enable_shared_from_this<MessageHandlerInterface>
+{
+protected:
+ std::shared_ptr<ProtocolHandlerInterface> _protocol;
+ MessageHandlerInterface(const std::shared_ptr<ProtocolHandlerInterface> &protocol) :
+ _protocol(protocol)
+ {
+ }
+ virtual ~MessageHandlerInterface() {}
+
+public:
+ /// Setup, after construction for shared_from_this
+ void initialize()
+ {
+ if (_protocol)
+ _protocol->setMessageHandler(shared_from_this());
+ }
+
+ /// Clear all external references
+ virtual void dispose()
+ {
+ if (_protocol)
+ {
+ _protocol->dispose();
+ _protocol.reset();
+ }
+ }
+
+ /// Do we have something to send ?
+ virtual bool hasQueuedMessages() const = 0;
+ /// Please send them to me then.
+ virtual void writeQueuedMessages() = 0;
+ /// We just got a message - here it is
+ virtual void handleMessage(const std::vector<char> &data) = 0;
+ /// Get notified that the underlying transports disconnected
+ virtual void onDisconnect() = 0;
+ /// Append pretty printed internal state to a line
+ virtual void dumpState(std::ostream& os) = 0;
+};
+
/// Handles non-blocking socket event polling.
/// Only polls on N-Sockets and invokes callback and
/// doesn't manage buffers or client data.
@@ -672,16 +752,16 @@ public:
/// Inserts a new remote websocket to be polled.
/// NOTE: The DNS lookup is synchronous.
void insertNewWebSocketSync(const Poco::URI &uri,
- const std::shared_ptr<SocketHandlerInterface>& websocketHandler);
+ const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler);
void insertNewUnixSocket(
const std::string &location,
const std::string &pathAndQuery,
- const std::shared_ptr<SocketHandlerInterface>& websocketHandler);
+ const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler);
#else
void insertNewFakeSocket(
int peerSocket,
- const std::shared_ptr<SocketHandlerInterface>& websocketHandler);
+ const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler);
#endif
typedef std::function<void()> CallbackFn;
@@ -736,7 +816,7 @@ protected:
private:
/// Generate the request to connect & upgrade this socket to a given path
void clientRequestWebsocketUpgrade(const std::shared_ptr<StreamSocket>& socket,
- const std::shared_ptr<SocketHandlerInterface>& websocketHandler,
+ const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler,
const std::string &pathAndQuery);
/// Initialize the poll fds array with the right events
@@ -791,12 +871,13 @@ private:
};
/// A plain, non-blocking, data streaming socket.
-class StreamSocket : public Socket, public std::enable_shared_from_this<StreamSocket>
+class StreamSocket : public Socket,
+ public std::enable_shared_from_this<StreamSocket>
{
public:
/// Create a StreamSocket from native FD.
StreamSocket(const int fd, bool /* isClient */,
- std::shared_ptr<SocketHandlerInterface> socketHandler) :
+ std::shared_ptr<ProtocolHandlerInterface> socketHandler) :
Socket(fd),
_socketHandler(std::move(socketHandler)),
_bytesSent(0),
@@ -933,7 +1014,7 @@ public:
}
/// Replace the existing SocketHandler with a new one.
- void setHandler(std::shared_ptr<SocketHandlerInterface> handler)
+ void setHandler(std::shared_ptr<ProtocolHandlerInterface> handler)
{
_socketHandler = std::move(handler);
_socketHandler->onConnect(shared_from_this());
@@ -944,9 +1025,9 @@ public:
/// but we can't have a shared_ptr in the ctor.
template <typename TSocket>
static
- std::shared_ptr<TSocket> create(const int fd, bool isClient, std::shared_ptr<SocketHandlerInterface> handler)
+ std::shared_ptr<TSocket> create(const int fd, bool isClient, std::shared_ptr<ProtocolHandlerInterface> handler)
{
- SocketHandlerInterface* pHandler = handler.get();
+ ProtocolHandlerInterface* pHandler = handler.get();
auto socket = std::make_shared<TSocket>(fd, isClient, std::move(handler));
pHandler->onConnect(socket);
return socket;
@@ -1157,14 +1238,14 @@ protected:
return _shutdownSignalled;
}
- const std::shared_ptr<SocketHandlerInterface>& getSocketHandler() const
+ const std::shared_ptr<ProtocolHandlerInterface>& getSocketHandler() const
{
return _socketHandler;
}
private:
/// Client handling the actual data.
- std::shared_ptr<SocketHandlerInterface> _socketHandler;
+ std::shared_ptr<ProtocolHandlerInterface> _socketHandler;
std::vector<char> _inBuffer;
std::vector<char> _outBuffer;
diff --git a/net/SslSocket.hpp b/net/SslSocket.hpp
index ba9954f56..27e075328 100644
--- a/net/SslSocket.hpp
+++ b/net/SslSocket.hpp
@@ -20,7 +20,7 @@ class SslStreamSocket final : public StreamSocket
{
public:
SslStreamSocket(const int fd, bool isClient,
- std::shared_ptr<SocketHandlerInterface> responseClient) :
+ std::shared_ptr<ProtocolHandlerInterface> responseClient) :
StreamSocket(fd, isClient, std::move(responseClient)),
_bio(nullptr),
_ssl(nullptr),
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index 130f81b69..1c2977602 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -24,7 +24,7 @@
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/WebSocket.h>
-class WebSocketHandler : public SocketHandlerInterface
+class WebSocketHandler : public ProtocolHandlerInterface
{
private:
/// The socket that owns us (we can't own it).
@@ -94,7 +94,7 @@ public:
upgradeToWebSocket(request);
}
- /// Implementation of the SocketHandlerInterface.
+ /// Implementation of the ProtocolHandlerInterface.
void onConnect(const std::shared_ptr<StreamSocket>& socket) override
{
_socket = socket;
@@ -146,6 +146,24 @@ public:
#endif
}
+ void shutdown(bool goingAway, const std::string &statusMessage) override
+ {
+ shutdown(goingAway ? WebSocketHandler::StatusCodes::ENDPOINT_GOING_AWAY :
+ WebSocketHandler::StatusCodes::NORMAL_CLOSE, statusMessage);
+ }
+
+ void getIOStats(uint64_t &sent, uint64_t &recv) override
+ {
+ std::shared_ptr<StreamSocket> socket = getSocket().lock();
+ if (socket)
+ socket->getIOStats(sent, recv);
+ else
+ {
+ sent = 0;
+ recv = 0;
+ }
+ }
+
void shutdown(const StatusCodes statusCode = StatusCodes::NORMAL_CLOSE, const std::string& statusMessage = "")
{
if (!_shuttingDown)
@@ -384,7 +402,7 @@ public:
return true;
}
- /// Implementation of the SocketHandlerInterface.
+ /// Implementation of the ProtocolHandlerInterface.
virtual void handleIncomingMessage(SocketDisposition&) override
{
// LOG_TRC("***** WebSocketHandler::handleIncomingMessage()");
@@ -421,7 +439,10 @@ public:
std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count();
timeoutMaxMs = std::min(timeoutMaxMs, PingFrequencyMs - timeSincePingMs);
}
- return POLLIN;
+ int events = POLLIN;
+ if (_msgHandler && _msgHandler->hasQueuedMessages())
+ events |= POLLOUT;
+ return events;
}
#if !MOBILEAPP
@@ -483,13 +504,34 @@ private:
#endif
}
public:
- /// By default rely on the socket buffer.
- void performWrites() override {}
+ void performWrites() override
+ {
+ if (_msgHandler)
+ _msgHandler->writeQueuedMessages();
+ }
+
+ void onDisconnect() override
+ {
+ if (_msgHandler)
+ _msgHandler->onDisconnect();
+ }
/// Sends a WebSocket Text message.
int sendMessage(const std::string& msg) const
{
- return sendMessage(msg.data(), msg.size(), WSOpCode::Text);
+ return sendTextMessage(msg, msg.size());
+ }
+
+ /// Implementation of the ProtocolHandlerInterface.
+ int sendTextMessage(const std::string &msg, const size_t len, bool flush = false) const override
+ {
+ return sendMessage(msg.data(), len, WSOpCode::Text, flush);
+ }
+
+ /// Implementation of the ProtocolHandlerInterface.
+ int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override
+ {
+ return sendMessage(data, len, WSOpCode::Binary, flush);
}
/// Sends a WebSocket message of WPOpCode type.
@@ -506,9 +548,7 @@ public:
std::shared_ptr<StreamSocket> socket = _socket.lock();
return sendFrame(socket, data, len, WSFrameMask::Fin | static_cast<unsigned char>(code), flush);
}
-
private:
-
/// Sends a WebSocket frame given the data, length, and flags.
/// Returns the number of bytes written (including frame overhead) on success,
/// 0 for closed/invalid socket, and -1 for other errors.
@@ -615,8 +655,10 @@ protected:
}
/// To be overriden to handle the websocket messages the way you need.
- virtual void handleMessage(const std::vector<char> &/*data*/)
+ virtual void handleMessage(const std::vector<char> &data)
{
+ if (_msgHandler)
+ _msgHandler->handleMessage(data);
}
std::weak_ptr<StreamSocket>& getSocket()
@@ -629,6 +671,7 @@ protected:
_socket = socket;
}
+ /// Implementation of the ProtocolHandlerInterface.
void dumpState(std::ostream& os) override;
private: