diff options
author | Michael Meeks <michael.meeks@collabora.com> | 2020-03-06 17:43:46 +0000 |
---|---|---|
committer | Michael Meeks <michael.meeks@collabora.com> | 2020-03-11 16:48:03 +0100 |
commit | e924625cc1af8736505f363fc525d20a6373bb95 (patch) | |
tree | 6775062347a832542949ce4a586a150f8ece1c14 /net | |
parent | fb27d9a79679f6aac5f2199163d5ba3e6421f2e5 (diff) |
re-factor: Socket / WebSocketHandler.
Essentially we want to be able to separate low-level socket code
for eg. TCP vs. UDS, from Protocol handling: eg. WebSocketHandler
and client sessions themselves which handle and send messages
which now implement the simple MessageHandlerInterface.
Some helpful renaming too:
s/SocketHandlerInterface/ProtocolHandlerInterface/
Change-Id: I58092b5e0b5792fda47498fb2c875851eada461d
Reviewed-on: https://gerrit.libreoffice.org/c/online/+/90138
Tested-by: Jenkins CollaboraOffice <jenkinscollaboraoffice@gmail.com>
Reviewed-by: Michael Meeks <michael.meeks@collabora.com>
Diffstat (limited to 'net')
-rw-r--r-- | net/Socket.cpp | 8 | ||||
-rw-r--r-- | net/Socket.hpp | 109 | ||||
-rw-r--r-- | net/SslSocket.hpp | 2 | ||||
-rw-r--r-- | net/WebSocketHandler.hpp | 63 |
4 files changed, 153 insertions, 29 deletions
diff --git a/net/Socket.cpp b/net/Socket.cpp index cbe5a0a52..5bb1fa250 100644 --- a/net/Socket.cpp +++ b/net/Socket.cpp @@ -204,7 +204,7 @@ void SocketPoll::wakeupWorld() void SocketPoll::insertNewWebSocketSync( const Poco::URI &uri, - const std::shared_ptr<SocketHandlerInterface>& websocketHandler) + const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler) { LOG_INF("Connecting to " << uri.getHost() << " : " << uri.getPort() << " : " << uri.getPath()); @@ -277,7 +277,7 @@ void SocketPoll::insertNewWebSocketSync( // should this be a static method in the WebsocketHandler(?) void SocketPoll::clientRequestWebsocketUpgrade(const std::shared_ptr<StreamSocket>& socket, - const std::shared_ptr<SocketHandlerInterface>& websocketHandler, + const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler, const std::string &pathAndQuery) { // cf. WebSocketHandler::upgradeToWebSocket (?) @@ -304,7 +304,7 @@ void SocketPoll::clientRequestWebsocketUpgrade(const std::shared_ptr<StreamSocke void SocketPoll::insertNewUnixSocket( const std::string &location, const std::string &pathAndQuery, - const std::shared_ptr<SocketHandlerInterface>& websocketHandler) + const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler) { int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0); @@ -337,7 +337,7 @@ void SocketPoll::insertNewUnixSocket( void SocketPoll::insertNewFakeSocket( int peerSocket, - const std::shared_ptr<SocketHandlerInterface>& websocketHandler) + const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler) { LOG_INF("Connecting to " << peerSocket); int fd = fakeSocketSocket(); diff --git a/net/Socket.hpp b/net/Socket.hpp index c95b93dd7..99fdf259a 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -344,12 +344,21 @@ private: }; class StreamSocket; +class MessageHandlerInterface; -/// Interface that handles the actual incoming message. -class SocketHandlerInterface +/// Interface that decodes the actual incoming message. +class ProtocolHandlerInterface : + public std::enable_shared_from_this<ProtocolHandlerInterface> { +protected: + /// We own a message handler, after decoding the socket data we pass it on as messages. + std::shared_ptr<MessageHandlerInterface> _msgHandler; public: - virtual ~SocketHandlerInterface() {} + // ------------------------------------------------------------------ + // Interface for implementing low level socket goodness from streams. + // ------------------------------------------------------------------ + virtual ~ProtocolHandlerInterface() { } + /// Called when the socket is newly created to /// set the socket associated with this ResponseClient. /// Will be called exactly once. @@ -374,10 +383,81 @@ public: /// Will be called exactly once. virtual void onDisconnect() {} + // ----------------------------------------------------------------- + // Interface for external MessageHandlers + // ----------------------------------------------------------------- +public: + void setMessageHandler(const std::shared_ptr<MessageHandlerInterface> &msgHandler) + { + _msgHandler = msgHandler; + } + + /// Clear all external references + virtual void dispose() { _msgHandler.reset(); } + + virtual int sendTextMessage(const std::string &msg, const size_t len, bool flush = false) const = 0; + virtual int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const = 0; + virtual void shutdown(bool goingAway = false, const std::string &statusMessage = "") = 0; + + virtual void getIOStats(uint64_t &sent, uint64_t &recv) = 0; + /// Append pretty printed internal state to a line virtual void dumpState(std::ostream& os) { os << "\n"; } }; +/// A ProtocolHandlerInterface with dummy sending API. +class SimpleSocketHandler : public ProtocolHandlerInterface +{ +public: + SimpleSocketHandler() {} + int sendTextMessage(const std::string &, const size_t, bool) const override { return 0; } + int sendBinaryMessage(const char *, const size_t , bool ) const override { return 0; } + void shutdown(bool, const std::string &) override {} + void getIOStats(uint64_t &, uint64_t &) override {} +}; + +/// Interface that receives and sends incoming messages. +class MessageHandlerInterface : + public std::enable_shared_from_this<MessageHandlerInterface> +{ +protected: + std::shared_ptr<ProtocolHandlerInterface> _protocol; + MessageHandlerInterface(const std::shared_ptr<ProtocolHandlerInterface> &protocol) : + _protocol(protocol) + { + } + virtual ~MessageHandlerInterface() {} + +public: + /// Setup, after construction for shared_from_this + void initialize() + { + if (_protocol) + _protocol->setMessageHandler(shared_from_this()); + } + + /// Clear all external references + virtual void dispose() + { + if (_protocol) + { + _protocol->dispose(); + _protocol.reset(); + } + } + + /// Do we have something to send ? + virtual bool hasQueuedMessages() const = 0; + /// Please send them to me then. + virtual void writeQueuedMessages() = 0; + /// We just got a message - here it is + virtual void handleMessage(const std::vector<char> &data) = 0; + /// Get notified that the underlying transports disconnected + virtual void onDisconnect() = 0; + /// Append pretty printed internal state to a line + virtual void dumpState(std::ostream& os) = 0; +}; + /// Handles non-blocking socket event polling. /// Only polls on N-Sockets and invokes callback and /// doesn't manage buffers or client data. @@ -672,16 +752,16 @@ public: /// Inserts a new remote websocket to be polled. /// NOTE: The DNS lookup is synchronous. void insertNewWebSocketSync(const Poco::URI &uri, - const std::shared_ptr<SocketHandlerInterface>& websocketHandler); + const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler); void insertNewUnixSocket( const std::string &location, const std::string &pathAndQuery, - const std::shared_ptr<SocketHandlerInterface>& websocketHandler); + const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler); #else void insertNewFakeSocket( int peerSocket, - const std::shared_ptr<SocketHandlerInterface>& websocketHandler); + const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler); #endif typedef std::function<void()> CallbackFn; @@ -736,7 +816,7 @@ protected: private: /// Generate the request to connect & upgrade this socket to a given path void clientRequestWebsocketUpgrade(const std::shared_ptr<StreamSocket>& socket, - const std::shared_ptr<SocketHandlerInterface>& websocketHandler, + const std::shared_ptr<ProtocolHandlerInterface>& websocketHandler, const std::string &pathAndQuery); /// Initialize the poll fds array with the right events @@ -791,12 +871,13 @@ private: }; /// A plain, non-blocking, data streaming socket. -class StreamSocket : public Socket, public std::enable_shared_from_this<StreamSocket> +class StreamSocket : public Socket, + public std::enable_shared_from_this<StreamSocket> { public: /// Create a StreamSocket from native FD. StreamSocket(const int fd, bool /* isClient */, - std::shared_ptr<SocketHandlerInterface> socketHandler) : + std::shared_ptr<ProtocolHandlerInterface> socketHandler) : Socket(fd), _socketHandler(std::move(socketHandler)), _bytesSent(0), @@ -933,7 +1014,7 @@ public: } /// Replace the existing SocketHandler with a new one. - void setHandler(std::shared_ptr<SocketHandlerInterface> handler) + void setHandler(std::shared_ptr<ProtocolHandlerInterface> handler) { _socketHandler = std::move(handler); _socketHandler->onConnect(shared_from_this()); @@ -944,9 +1025,9 @@ public: /// but we can't have a shared_ptr in the ctor. template <typename TSocket> static - std::shared_ptr<TSocket> create(const int fd, bool isClient, std::shared_ptr<SocketHandlerInterface> handler) + std::shared_ptr<TSocket> create(const int fd, bool isClient, std::shared_ptr<ProtocolHandlerInterface> handler) { - SocketHandlerInterface* pHandler = handler.get(); + ProtocolHandlerInterface* pHandler = handler.get(); auto socket = std::make_shared<TSocket>(fd, isClient, std::move(handler)); pHandler->onConnect(socket); return socket; @@ -1157,14 +1238,14 @@ protected: return _shutdownSignalled; } - const std::shared_ptr<SocketHandlerInterface>& getSocketHandler() const + const std::shared_ptr<ProtocolHandlerInterface>& getSocketHandler() const { return _socketHandler; } private: /// Client handling the actual data. - std::shared_ptr<SocketHandlerInterface> _socketHandler; + std::shared_ptr<ProtocolHandlerInterface> _socketHandler; std::vector<char> _inBuffer; std::vector<char> _outBuffer; diff --git a/net/SslSocket.hpp b/net/SslSocket.hpp index ba9954f56..27e075328 100644 --- a/net/SslSocket.hpp +++ b/net/SslSocket.hpp @@ -20,7 +20,7 @@ class SslStreamSocket final : public StreamSocket { public: SslStreamSocket(const int fd, bool isClient, - std::shared_ptr<SocketHandlerInterface> responseClient) : + std::shared_ptr<ProtocolHandlerInterface> responseClient) : StreamSocket(fd, isClient, std::move(responseClient)), _bio(nullptr), _ssl(nullptr), diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp index 130f81b69..1c2977602 100644 --- a/net/WebSocketHandler.hpp +++ b/net/WebSocketHandler.hpp @@ -24,7 +24,7 @@ #include <Poco/Net/HTTPResponse.h> #include <Poco/Net/WebSocket.h> -class WebSocketHandler : public SocketHandlerInterface +class WebSocketHandler : public ProtocolHandlerInterface { private: /// The socket that owns us (we can't own it). @@ -94,7 +94,7 @@ public: upgradeToWebSocket(request); } - /// Implementation of the SocketHandlerInterface. + /// Implementation of the ProtocolHandlerInterface. void onConnect(const std::shared_ptr<StreamSocket>& socket) override { _socket = socket; @@ -146,6 +146,24 @@ public: #endif } + void shutdown(bool goingAway, const std::string &statusMessage) override + { + shutdown(goingAway ? WebSocketHandler::StatusCodes::ENDPOINT_GOING_AWAY : + WebSocketHandler::StatusCodes::NORMAL_CLOSE, statusMessage); + } + + void getIOStats(uint64_t &sent, uint64_t &recv) override + { + std::shared_ptr<StreamSocket> socket = getSocket().lock(); + if (socket) + socket->getIOStats(sent, recv); + else + { + sent = 0; + recv = 0; + } + } + void shutdown(const StatusCodes statusCode = StatusCodes::NORMAL_CLOSE, const std::string& statusMessage = "") { if (!_shuttingDown) @@ -384,7 +402,7 @@ public: return true; } - /// Implementation of the SocketHandlerInterface. + /// Implementation of the ProtocolHandlerInterface. virtual void handleIncomingMessage(SocketDisposition&) override { // LOG_TRC("***** WebSocketHandler::handleIncomingMessage()"); @@ -421,7 +439,10 @@ public: std::chrono::duration_cast<std::chrono::milliseconds>(now - _lastPingSentTime).count(); timeoutMaxMs = std::min(timeoutMaxMs, PingFrequencyMs - timeSincePingMs); } - return POLLIN; + int events = POLLIN; + if (_msgHandler && _msgHandler->hasQueuedMessages()) + events |= POLLOUT; + return events; } #if !MOBILEAPP @@ -483,13 +504,34 @@ private: #endif } public: - /// By default rely on the socket buffer. - void performWrites() override {} + void performWrites() override + { + if (_msgHandler) + _msgHandler->writeQueuedMessages(); + } + + void onDisconnect() override + { + if (_msgHandler) + _msgHandler->onDisconnect(); + } /// Sends a WebSocket Text message. int sendMessage(const std::string& msg) const { - return sendMessage(msg.data(), msg.size(), WSOpCode::Text); + return sendTextMessage(msg, msg.size()); + } + + /// Implementation of the ProtocolHandlerInterface. + int sendTextMessage(const std::string &msg, const size_t len, bool flush = false) const override + { + return sendMessage(msg.data(), len, WSOpCode::Text, flush); + } + + /// Implementation of the ProtocolHandlerInterface. + int sendBinaryMessage(const char *data, const size_t len, bool flush = false) const override + { + return sendMessage(data, len, WSOpCode::Binary, flush); } /// Sends a WebSocket message of WPOpCode type. @@ -506,9 +548,7 @@ public: std::shared_ptr<StreamSocket> socket = _socket.lock(); return sendFrame(socket, data, len, WSFrameMask::Fin | static_cast<unsigned char>(code), flush); } - private: - /// Sends a WebSocket frame given the data, length, and flags. /// Returns the number of bytes written (including frame overhead) on success, /// 0 for closed/invalid socket, and -1 for other errors. @@ -615,8 +655,10 @@ protected: } /// To be overriden to handle the websocket messages the way you need. - virtual void handleMessage(const std::vector<char> &/*data*/) + virtual void handleMessage(const std::vector<char> &data) { + if (_msgHandler) + _msgHandler->handleMessage(data); } std::weak_ptr<StreamSocket>& getSocket() @@ -629,6 +671,7 @@ protected: _socket = socket; } + /// Implementation of the ProtocolHandlerInterface. void dumpState(std::ostream& os) override; private: |