diff options
author | Tor Lillqvist <tml@iki.fi> | 2018-09-11 09:30:55 +0300 |
---|---|---|
committer | Tor Lillqvist <tml@iki.fi> | 2018-09-12 18:32:05 +0300 |
commit | b59d160a085796c567e114a62483438ed761c31a (patch) | |
tree | 347d13b6943c92864880620ae4cc58c6a06cd6a1 /net | |
parent | c7a62fb304134d57008e3cc2c29b9178aa4c86fe (diff) |
Intermediate commit of work in progress on an iOS app
The app is unimaginatively called "Mobile" for now.
Runs but crashes pretty quickly after loading the document by the LO
core. Will need some heavy changes to get a ClientSession object
created in there, too, to handle the (emulated) WebSocket messages
from the JavaScript. It would then handle some of these messages
itself, and forwards some to the ChildSession, which in this case is
in the same process. Now the messsages from the JavaScript go to a
ChildSession, which is wrong. As the assertion says, "Tile traffic
should go through the DocumentBroker-LoKit WS"
Diffstat (limited to 'net')
-rw-r--r-- | net/Socket.cpp | 19 | ||||
-rw-r--r-- | net/Socket.hpp | 36 | ||||
-rw-r--r-- | net/WebSocketHandler.hpp | 17 |
3 files changed, 71 insertions, 1 deletions
diff --git a/net/Socket.cpp b/net/Socket.cpp index 38bacdde8..74a0dff16 100644 --- a/net/Socket.cpp +++ b/net/Socket.cpp @@ -74,6 +74,8 @@ SocketPoll::SocketPoll(const std::string& threadName) std::lock_guard<std::mutex> lock(getPollWakeupsMutex()); getWakeupsArray().push_back(_wakeup[1]); +#else + _bufferEmpty = true; #endif } @@ -136,6 +138,21 @@ void SocketPoll::joinThread() #endif +#ifdef MOBILEAPP + +void SocketPoll::feed(const std::string& payload) +{ + std::unique_lock<std::mutex> lock(_bufferMutex); + if (!_bufferEmpty) + _bufferCV.wait(lock, [&]{return _bufferEmpty;}); + _buffer = payload; + _bufferEmpty = false; + lock.unlock(); + _bufferCV.notify_one(); +} + +#endif + void SocketPoll::wakeupWorld() { #ifndef MOBILEAPP @@ -231,6 +248,8 @@ void SocketPoll::insertNewWebSocketSync(const Poco::URI &uri, const std::shared_ } else LOG_ERR("Failed to lookup client websocket host '" << uri.getHost() << "' skipping"); +#else + _socketHandler = websocketHandler; #endif } diff --git a/net/Socket.hpp b/net/Socket.hpp index 2a336f8fc..074734ca9 100644 --- a/net/Socket.hpp +++ b/net/Socket.hpp @@ -31,7 +31,6 @@ #include <mutex> #include <sstream> #include <thread> -#include <atomic> #include "Common.hpp" #include "Log.hpp" @@ -341,6 +340,10 @@ public: /// Append pretty printed internal state to a line virtual void dumpState(std::ostream& os) { os << "\n"; } + + virtual void handleMessage(const std::string& /*message*/) + { + } }; /// Handles non-blocking socket event polling. @@ -528,9 +531,18 @@ public: disposition.execute(); } +#else + std::unique_lock<std::mutex> lock(_bufferMutex); + if (_bufferEmpty) + _bufferCV.wait(lock, [&]{return !_bufferEmpty;}); + _socketHandler->handleMessage(_buffer); + _bufferEmpty = true; + lock.unlock(); + _bufferCV.notify_one(); #endif } +#ifndef MOBILEAPP /// Write to a wakeup descriptor static void wakeup (int fd) { @@ -543,15 +555,18 @@ public: if (rc == -1 && errno != EAGAIN && errno != EWOULDBLOCK) LOG_SYS("wakeup socket #" << fd << " is closed at wakeup?"); } +#endif /// Wakeup the main polling loop in another thread void wakeup() { +#ifndef MOBILEAPP if (!isAlive()) LOG_WRN("Waking up dead poll thread [" << _name << "], started: " << _threadStarted << ", finished: " << _threadFinished); wakeup(_wakeup[1]); +#endif } /// Global wakeup - signal safe: wakeup all socket polls. @@ -618,7 +633,15 @@ public: /// Stop and join the polling thread before returning (if active) void joinThread(); +#ifdef MOBILEAPP + // In the mobile app(s), a SocketPoll doesn't actually "poll" any "sockets" but simply acts as a + // conduit for sending messages that correspond to what we would receive as WebSocket messages + // in the server online from the app code into the shared online code. + void feed(const std::string& payload); +#endif + private: +#ifndef MOBILEAPP /// Initialize the poll fds array with the right events void setupPollFds(std::chrono::steady_clock::time_point now, int &timeoutMaxMs) @@ -648,6 +671,7 @@ private: _pollFds[size].events = POLLIN; _pollFds[size].revents = 0; } +#endif /// The polling thread entry. /// Used to set the thread name and mark the thread as stopped when done. @@ -681,8 +705,10 @@ private: /// Debug name used for logging. const std::string _name; +#ifndef MOBILEAPP /// main-loop wakeup pipe int _wakeup[2]; +#endif /// The sockets we're controlling std::vector<std::shared_ptr<Socket>> _pollSockets; /// Protects _newSockets @@ -700,6 +726,14 @@ protected: std::atomic<bool> _threadStarted; std::atomic<bool> _threadFinished; std::thread::id _owner; + +#ifdef MOBILEAPP + std::mutex _bufferMutex; + std::condition_variable _bufferCV; + bool _bufferEmpty; + std::string _buffer; + std::shared_ptr<SocketHandlerInterface> _socketHandler; +#endif }; /// A plain, non-blocking, data streaming socket. diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp index 3e1faabb4..0bb0ddc1f 100644 --- a/net/WebSocketHandler.hpp +++ b/net/WebSocketHandler.hpp @@ -133,6 +133,7 @@ public: if (len == 0) return false; // avoid logging. +#ifndef MOBILEAPP if (len < 2) // partial read { LOG_TRC("#" << socket->getFD() << ": Still incomplete WebSocket message, have " << len << " bytes"); @@ -200,11 +201,20 @@ public: *wsData++ = data[i] ^ mask[i % 4]; } else _wsPayload.insert(_wsPayload.end(), data, data + payloadLen); +#else + unsigned char * const p = reinterpret_cast<unsigned char*>(&socket->_inBuffer[0]); + _wsPayload.insert(_wsPayload.end(), p, p + len); + const size_t headerLen = 0; + const size_t payloadLen = len; + const bool hasMask = false; +#endif assert(_wsPayload.size() >= payloadLen); socket->_inBuffer.erase(socket->_inBuffer.begin(), socket->_inBuffer.begin() + headerLen + payloadLen); +#ifndef MOBILEAPP + // FIXME: fin, aggregating payloads into _wsPayload etc. LOG_TRC("#" << socket->getFD() << ": Incoming WebSocket message code " << static_cast<unsigned>(code) << ", fin? " << fin << ", mask? " << hasMask << ", payload length: " << _wsPayload.size() << @@ -253,6 +263,12 @@ public: break; } +#else + handleMessage(true, WSOpCode::Binary, _wsPayload); + +#endif + +#ifndef MOBILEAPP if (doClose) { if (!_shuttingDown) @@ -280,6 +296,7 @@ public: // TCP Close. socket->closeConnection(); } +#endif _wsPayload.clear(); |