summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorTor Lillqvist <tml@iki.fi>2018-09-11 09:30:55 +0300
committerTor Lillqvist <tml@iki.fi>2018-09-12 18:32:05 +0300
commitb59d160a085796c567e114a62483438ed761c31a (patch)
tree347d13b6943c92864880620ae4cc58c6a06cd6a1 /net
parentc7a62fb304134d57008e3cc2c29b9178aa4c86fe (diff)
Intermediate commit of work in progress on an iOS app
The app is unimaginatively called "Mobile" for now. Runs but crashes pretty quickly after loading the document by the LO core. Will need some heavy changes to get a ClientSession object created in there, too, to handle the (emulated) WebSocket messages from the JavaScript. It would then handle some of these messages itself, and forwards some to the ChildSession, which in this case is in the same process. Now the messsages from the JavaScript go to a ChildSession, which is wrong. As the assertion says, "Tile traffic should go through the DocumentBroker-LoKit WS"
Diffstat (limited to 'net')
-rw-r--r--net/Socket.cpp19
-rw-r--r--net/Socket.hpp36
-rw-r--r--net/WebSocketHandler.hpp17
3 files changed, 71 insertions, 1 deletions
diff --git a/net/Socket.cpp b/net/Socket.cpp
index 38bacdde8..74a0dff16 100644
--- a/net/Socket.cpp
+++ b/net/Socket.cpp
@@ -74,6 +74,8 @@ SocketPoll::SocketPoll(const std::string& threadName)
std::lock_guard<std::mutex> lock(getPollWakeupsMutex());
getWakeupsArray().push_back(_wakeup[1]);
+#else
+ _bufferEmpty = true;
#endif
}
@@ -136,6 +138,21 @@ void SocketPoll::joinThread()
#endif
+#ifdef MOBILEAPP
+
+void SocketPoll::feed(const std::string& payload)
+{
+ std::unique_lock<std::mutex> lock(_bufferMutex);
+ if (!_bufferEmpty)
+ _bufferCV.wait(lock, [&]{return _bufferEmpty;});
+ _buffer = payload;
+ _bufferEmpty = false;
+ lock.unlock();
+ _bufferCV.notify_one();
+}
+
+#endif
+
void SocketPoll::wakeupWorld()
{
#ifndef MOBILEAPP
@@ -231,6 +248,8 @@ void SocketPoll::insertNewWebSocketSync(const Poco::URI &uri, const std::shared_
}
else
LOG_ERR("Failed to lookup client websocket host '" << uri.getHost() << "' skipping");
+#else
+ _socketHandler = websocketHandler;
#endif
}
diff --git a/net/Socket.hpp b/net/Socket.hpp
index 2a336f8fc..074734ca9 100644
--- a/net/Socket.hpp
+++ b/net/Socket.hpp
@@ -31,7 +31,6 @@
#include <mutex>
#include <sstream>
#include <thread>
-#include <atomic>
#include "Common.hpp"
#include "Log.hpp"
@@ -341,6 +340,10 @@ public:
/// Append pretty printed internal state to a line
virtual void dumpState(std::ostream& os) { os << "\n"; }
+
+ virtual void handleMessage(const std::string& /*message*/)
+ {
+ }
};
/// Handles non-blocking socket event polling.
@@ -528,9 +531,18 @@ public:
disposition.execute();
}
+#else
+ std::unique_lock<std::mutex> lock(_bufferMutex);
+ if (_bufferEmpty)
+ _bufferCV.wait(lock, [&]{return !_bufferEmpty;});
+ _socketHandler->handleMessage(_buffer);
+ _bufferEmpty = true;
+ lock.unlock();
+ _bufferCV.notify_one();
#endif
}
+#ifndef MOBILEAPP
/// Write to a wakeup descriptor
static void wakeup (int fd)
{
@@ -543,15 +555,18 @@ public:
if (rc == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
LOG_SYS("wakeup socket #" << fd << " is closed at wakeup?");
}
+#endif
/// Wakeup the main polling loop in another thread
void wakeup()
{
+#ifndef MOBILEAPP
if (!isAlive())
LOG_WRN("Waking up dead poll thread [" << _name << "], started: " <<
_threadStarted << ", finished: " << _threadFinished);
wakeup(_wakeup[1]);
+#endif
}
/// Global wakeup - signal safe: wakeup all socket polls.
@@ -618,7 +633,15 @@ public:
/// Stop and join the polling thread before returning (if active)
void joinThread();
+#ifdef MOBILEAPP
+ // In the mobile app(s), a SocketPoll doesn't actually "poll" any "sockets" but simply acts as a
+ // conduit for sending messages that correspond to what we would receive as WebSocket messages
+ // in the server online from the app code into the shared online code.
+ void feed(const std::string& payload);
+#endif
+
private:
+#ifndef MOBILEAPP
/// Initialize the poll fds array with the right events
void setupPollFds(std::chrono::steady_clock::time_point now,
int &timeoutMaxMs)
@@ -648,6 +671,7 @@ private:
_pollFds[size].events = POLLIN;
_pollFds[size].revents = 0;
}
+#endif
/// The polling thread entry.
/// Used to set the thread name and mark the thread as stopped when done.
@@ -681,8 +705,10 @@ private:
/// Debug name used for logging.
const std::string _name;
+#ifndef MOBILEAPP
/// main-loop wakeup pipe
int _wakeup[2];
+#endif
/// The sockets we're controlling
std::vector<std::shared_ptr<Socket>> _pollSockets;
/// Protects _newSockets
@@ -700,6 +726,14 @@ protected:
std::atomic<bool> _threadStarted;
std::atomic<bool> _threadFinished;
std::thread::id _owner;
+
+#ifdef MOBILEAPP
+ std::mutex _bufferMutex;
+ std::condition_variable _bufferCV;
+ bool _bufferEmpty;
+ std::string _buffer;
+ std::shared_ptr<SocketHandlerInterface> _socketHandler;
+#endif
};
/// A plain, non-blocking, data streaming socket.
diff --git a/net/WebSocketHandler.hpp b/net/WebSocketHandler.hpp
index 3e1faabb4..0bb0ddc1f 100644
--- a/net/WebSocketHandler.hpp
+++ b/net/WebSocketHandler.hpp
@@ -133,6 +133,7 @@ public:
if (len == 0)
return false; // avoid logging.
+#ifndef MOBILEAPP
if (len < 2) // partial read
{
LOG_TRC("#" << socket->getFD() << ": Still incomplete WebSocket message, have " << len << " bytes");
@@ -200,11 +201,20 @@ public:
*wsData++ = data[i] ^ mask[i % 4];
} else
_wsPayload.insert(_wsPayload.end(), data, data + payloadLen);
+#else
+ unsigned char * const p = reinterpret_cast<unsigned char*>(&socket->_inBuffer[0]);
+ _wsPayload.insert(_wsPayload.end(), p, p + len);
+ const size_t headerLen = 0;
+ const size_t payloadLen = len;
+ const bool hasMask = false;
+#endif
assert(_wsPayload.size() >= payloadLen);
socket->_inBuffer.erase(socket->_inBuffer.begin(), socket->_inBuffer.begin() + headerLen + payloadLen);
+#ifndef MOBILEAPP
+
// FIXME: fin, aggregating payloads into _wsPayload etc.
LOG_TRC("#" << socket->getFD() << ": Incoming WebSocket message code " << static_cast<unsigned>(code) <<
", fin? " << fin << ", mask? " << hasMask << ", payload length: " << _wsPayload.size() <<
@@ -253,6 +263,12 @@ public:
break;
}
+#else
+ handleMessage(true, WSOpCode::Binary, _wsPayload);
+
+#endif
+
+#ifndef MOBILEAPP
if (doClose)
{
if (!_shuttingDown)
@@ -280,6 +296,7 @@ public:
// TCP Close.
socket->closeConnection();
}
+#endif
_wsPayload.clear();