From 5be68241800bd6f7ab60fdc05856f1c2844e06f1 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 14 Jan 2014 14:44:00 +0100 Subject: PBAP: transfer data via pipe (part of FDO #72112) The main advantage is that processed data can be discarded immediately. When using a plain file, the entire address book must be stored in it. It also enables suspending a transfer by stopping to read from the pipe, either via some internal API or simply freezing the syncevo-local-sync process with SIGSTOP. The drawback is that obexd does not react well to a full pipe. It simply gets stuck in a blocking write(); in other words, all obexd operations get frozen and obexd stops responding on D-Bus. --- src/backends/pbap/PbapSyncSource.cpp | 192 +++++++++++++++++++++++++++-------- src/syncevo/TmpFile.cpp | 37 ++++++- src/syncevo/TmpFile.h | 41 ++++++-- 3 files changed, 216 insertions(+), 54 deletions(-) diff --git a/src/backends/pbap/PbapSyncSource.cpp b/src/backends/pbap/PbapSyncSource.cpp index 3fa6280b..3c52af55 100644 --- a/src/backends/pbap/PbapSyncSource.cpp +++ b/src/backends/pbap/PbapSyncSource.cpp @@ -64,12 +64,28 @@ SE_BEGIN_CXX #define OBC_TRANSFER_INTERFACE_NEW5 "org.bluez.obex.Transfer1" typedef std::map Content; +typedef std::list ContactQueue; class PullAll { std::string m_buffer; // vCards kept in memory when using old obexd. TmpFile m_tmpFile; // Stored in temporary file and mmapped with more recent obexd. - Content m_content; // Refers to chunks of m_buffer or m_tmpFile without copying them. + + // When using memory-mapped files: + // refers to chunks of m_buffer or m_tmpFile without copying them via the contact number. + Content m_content; + + // When using pipe: + // - split into queue of std::strings, read from start to finish + // - discard contact strings that are no longer needed + int m_firstContactInQueue; + ContactQueue m_queue; + // - buffer for reading from pipe + char *m_pipeBuffer; + size_t m_pipeBufferSize; + size_t m_pipeBufferUsed; + size_t m_pipeBufferTotal; + int m_numContacts; // Number of existing contacts, according to GetSize() or after downloading. int m_currentContact; // Numbered starting with zero according to discovery in addVCards. boost::shared_ptr m_session; // Only set when there is a transfer ongoing. @@ -77,11 +93,27 @@ class PullAll friend class PbapSession; public: + PullAll(); + ~PullAll(); + std::string getNextID(); bool getContact(int contactNumber, pcrecpp::StringPiece &vcard); const char *addVCards(int startIndex, const pcrecpp::StringPiece &content); }; +PullAll::PullAll() : + m_firstContactInQueue(0), + m_pipeBuffer(NULL), + m_pipeBufferSize(0), + m_pipeBufferUsed(0), + m_pipeBufferTotal(0) +{} + +PullAll::~PullAll() +{ + free(m_pipeBuffer); +} + enum PullData { PULL_AS_CONFIGURED, @@ -541,7 +573,8 @@ boost::shared_ptr PbapSession::startPullAll(PullData pullData) state->m_numContacts = GDBusCXX::DBusClientCall1(*m_session, "GetSize")(); SE_LOG_DEBUG(NULL, "Expecting %d contacts.", state->m_numContacts); - state->m_tmpFile.create(); + TmpFile::Type type = getenv("SYNCEVOLUTION_PBAP_PIPE") ? TmpFile::PIPE : TmpFile::FILE; + state->m_tmpFile.create(type); SE_LOG_DEBUG(NULL, "Created temporary file for PullAll %s", state->m_tmpFile.filename().c_str()); GDBusCXX::DBusClientCall1 > pullall(*m_session, "PullAll"); std::pair tuple = @@ -590,7 +623,14 @@ const char *PullAll::addVCards(int startIndex, const pcrecpp::StringPiece &vcard pcrecpp::RE re("[\\r\\n]*(^BEGIN:VCARD.*?^END:VCARD)", pcrecpp::RE_Options().set_dotall(true).set_multiline(true)); while (re.Consume(&tmp, &vcarddata)) { - m_content[count] = vcarddata; + if (m_tmpFile.getType() == TmpFile::PIPE) { + // Must copy into queue. + m_queue.push_back(std::string()); + m_queue.back().assign(vcarddata.data(), vcarddata.size()); + } else { + // Can continue using the memory-mapped file. + m_content[count] = vcarddata; + } ++count; } @@ -644,58 +684,122 @@ bool PullAll::getContact(int contactNumber, pcrecpp::StringPiece &vcard) return false; } - Content::iterator it; - while ((it = m_content.find(contactNumber)) == m_content.end() && - m_session && - (!m_session->transferComplete() || - m_tmpFile.moreData())) { - // Wait? We rely on regular propgress signals to wake us up. - // obex 0.47 sends them every 64KB, at least in combination - // with a Samsung Galaxy SIII. This may depend on both obexd - // and the phone, so better check ourselves and perhaps do it - // less often - unmap/map can be expensive and invalidates - // some of the unread data (at least how it is implemented - // now). - while (!m_session->transferComplete() && m_tmpFile.moreData() < 128 * 1024) { - g_main_context_iteration(NULL, true); + if (m_tmpFile.getType() == TmpFile::PIPE) { + // Delete old contacts. + ContactQueue::iterator it = m_queue.begin(); + while (m_firstContactInQueue < contactNumber) { + ++it; + ++m_firstContactInQueue; } - m_session->checkForError(); - if (m_tmpFile.moreData()) { - // Remap. This shifts all addresses already stored in - // m_content, so beware and update those. - pcrecpp::StringPiece oldMem = m_tmpFile.stringPiece(); - m_tmpFile.unmap(); - m_tmpFile.map(); - pcrecpp::StringPiece newMem = m_tmpFile.stringPiece(); - ssize_t delta = newMem.data() - oldMem.data(); - BOOST_FOREACH (Content::value_type &entry, m_content) { - pcrecpp::StringPiece &vcard = entry.second; - vcard.set(vcard.data() + delta, vcard.size()); + m_queue.erase(m_queue.begin(), it); + + bool eof = false; + while (m_queue.empty() && + (!m_session->transferComplete() || !eof)) { + // Read at least 64KB, increase buffer if too + // small. Happens at least once (initial read) and may + // happen again when a contact is larger than the current + // buffer size. + static const size_t chunkSize = 64 * 1024; + if (m_pipeBufferSize - m_pipeBufferUsed < chunkSize) { + size_t newSize = m_pipeBufferSize + chunkSize; + char *newBuffer = (char *)realloc(m_pipeBuffer, newSize); + if (!newBuffer) { + // Nothing changed, but we can't proceed. + SE_THROW("getContact(): out of memory"); + } + m_pipeBuffer = newBuffer; + m_pipeBufferSize = newSize; + } + + // Try reading. Blocks until at least one byte becomes available. + ssize_t newData = read(m_tmpFile.getFD(), m_pipeBuffer + m_pipeBufferUsed, m_pipeBufferSize - m_pipeBufferUsed); + SE_LOG_DEBUG(NULL, "PBAP content: next chunk %ld, total %ld, %s", + (long)newData, (long)m_pipeBufferTotal, + newData < 0 ? strerror(errno) : "<>"); + if (newData == 0) { + eof = true; + } else if (newData < 0) { + SE_THROW(StringPrintf("reading PBAP data from pipe: %s", strerror(errno))); + } else { + m_pipeBufferUsed += newData; + m_pipeBufferTotal += newData; } // File exists and obexd has written into it, so now we // can unlink it to avoid leaking it if we crash. m_tmpFile.remove(); - // Continue parsing where we stopped before. - pcrecpp::StringPiece next(newMem.data() + m_tmpFileOffset, - newMem.size() - m_tmpFileOffset); + // Parse next chunk, shift remaining data that couldn't + // be parsed yet to beginning of buffer and continue; + pcrecpp::StringPiece next(m_pipeBuffer, m_pipeBufferUsed); const char *end = addVCards(m_content.size(), next); - int newTmpFileOffset = end - newMem.data(); - SE_LOG_DEBUG(NULL, "PBAP content parsed: %d out of %d (total), %d out of %d (last update)", - newTmpFileOffset, - newMem.size(), - (int)(end - next.data()), - next.size()); - m_tmpFileOffset = newTmpFileOffset; + size_t remaining = m_pipeBuffer + m_pipeBufferUsed - end; + memmove(m_pipeBuffer, end, remaining); + m_pipeBufferUsed = remaining; } - } - if (it == m_content.end()) { - SE_LOG_DEBUG(NULL, "did not get the expected contact #%d, perhaps some contacts were deleted?", contactNumber); - return false; + if (m_queue.empty()) { + SE_LOG_DEBUG(NULL, "did not get the expected contact #%d, perhaps some contacts were deleted?", contactNumber); + return false; + } + const std::string &next = m_queue.front(); + vcard.set(next.c_str(), next.size()); + } else { + Content::iterator it; + while ((it = m_content.find(contactNumber)) == m_content.end() && + m_session && + (!m_session->transferComplete() || + m_tmpFile.moreData())) { + // Wait? We rely on regular propgress signals to wake us up. + // obex 0.47 sends them every 64KB, at least in combination + // with a Samsung Galaxy SIII. This may depend on both obexd + // and the phone, so better check ourselves and perhaps do it + // less often - unmap/map can be expensive and invalidates + // some of the unread data (at least how it is implemented + // now). + while (!m_session->transferComplete() && m_tmpFile.moreData() < 128 * 1024) { + g_main_context_iteration(NULL, true); + } + m_session->checkForError(); + if (m_tmpFile.moreData()) { + // Remap. This shifts all addresses already stored in + // m_content, so beware and update those. + pcrecpp::StringPiece oldMem = m_tmpFile.stringPiece(); + m_tmpFile.unmap(); + m_tmpFile.map(); + pcrecpp::StringPiece newMem = m_tmpFile.stringPiece(); + ssize_t delta = newMem.data() - oldMem.data(); + BOOST_FOREACH (Content::value_type &entry, m_content) { + pcrecpp::StringPiece &vcard = entry.second; + vcard.set(vcard.data() + delta, vcard.size()); + } + + // File exists and obexd has written into it, so now we + // can unlink it to avoid leaking it if we crash. + m_tmpFile.remove(); + + // Continue parsing where we stopped before. + pcrecpp::StringPiece next(newMem.data() + m_tmpFileOffset, + newMem.size() - m_tmpFileOffset); + const char *end = addVCards(m_content.size(), next); + int newTmpFileOffset = end - newMem.data(); + SE_LOG_DEBUG(NULL, "PBAP content parsed: %d out of %d (total), %d out of %d (last update)", + newTmpFileOffset, + newMem.size(), + (int)(end - next.data()), + next.size()); + m_tmpFileOffset = newTmpFileOffset; + } + } + + if (it == m_content.end()) { + SE_LOG_DEBUG(NULL, "did not get the expected contact #%d, perhaps some contacts were deleted?", contactNumber); + return false; + } + vcard = it->second; } - vcard = it->second; + return true; } diff --git a/src/syncevo/TmpFile.cpp b/src/syncevo/TmpFile.cpp index 3e988968..d1a59811 100644 --- a/src/syncevo/TmpFile.cpp +++ b/src/syncevo/TmpFile.cpp @@ -19,7 +19,9 @@ #include +#include #include +#include #include #include @@ -27,9 +29,11 @@ #include #include "TmpFile.h" +#include "util.h" TmpFile::TmpFile() : + m_type(FILE), m_fd(-1), m_mapptr(0), m_mapsize(0) @@ -42,6 +46,10 @@ TmpFile::~TmpFile() try { unmap(); close(); + if (m_type == PIPE && + !m_filename.empty()) { + unlink(m_filename.c_str()); + } } catch (std::exception &x) { fprintf(stderr, "TmpFile::~TmpFile(): %s\n", x.what()); } catch (...) { @@ -50,7 +58,7 @@ TmpFile::~TmpFile() } -void TmpFile::create() +void TmpFile::create(Type type) { gchar *filename = NULL; GError *error = NULL; @@ -66,6 +74,33 @@ void TmpFile::create() } m_filename = filename; g_free(filename); + m_type = type; + if (type == PIPE) { + // We merely use the normal file to get a temporary file name which + // is guaranteed to be unique. There's a slight chance for a denial-of-service + // attack when someone creates a link or normal file directly after we remove + // the file, but because mknod neither overwrites an existing entry nor follows + // symlinks, the effect is smaller compared to opening a file. + unlink(m_filename.c_str()); + if (mknod(m_filename.c_str(), S_IFIFO|S_IRWXU, 0)) { + m_filename = ""; + throw TmpFileException(SyncEvo::StringPrintf("mknod(%s): %s", + m_filename.c_str(), + strerror(errno))); + } + // Open without blocking. Necessary because otherwise we end up + // waiting here. Opening later also does not work, because then + // obexd gets stuck in its open() call while we wait for it to + // acknowledge the start of the transfer. + m_fd = open(m_filename.c_str(), O_RDONLY|O_NONBLOCK, 0); + if (m_fd < 0) { + throw TmpFileException(SyncEvo::StringPrintf("open(%s): %s", + m_filename.c_str(), + strerror(errno))); + } + // From now on, block on the pipe. + fcntl(m_fd, F_SETFL, fcntl(m_fd, F_GETFL) & ~O_NONBLOCK); + } } diff --git a/src/syncevo/TmpFile.h b/src/syncevo/TmpFile.h index 9b4f676b..45261e99 100644 --- a/src/syncevo/TmpFile.h +++ b/src/syncevo/TmpFile.h @@ -40,27 +40,30 @@ class TmpFileException : public std::runtime_error /** * Class for handling temporary files, either read/write access - * or memory mapped. + * or memory mapped. Optionally creates a pipe instead of a plain file. + * + * Reading is done mapping the plain file into memory (file) or simply + * reading from the file descriptor (file or pipe). * * Closing and removing a mapped file is supported by calling close() * after map(). */ class TmpFile { - protected: - int m_fd; - void *m_mapptr; - size_t m_mapsize; - std::string m_filename; - public: + enum Type { + FILE, + PIPE + }; + TmpFile(); virtual ~TmpFile(); /** - * Create a temporary file. + * Create a temporary file or pipe. */ - void create(); + void create(Type type = FILE); + /** * Map a view of file and optionally return pointer and/or size. * @@ -70,11 +73,23 @@ class TmpFile * @param mapsize Pointer to variable for mapped size. (can be NULL) */ void map(void **mapptr = 0, size_t *mapsize = 0); + /** * Unmap a view of file. */ void unmap(); + /** + * File descriptor, ready for reading from start of file or pipe + * after create(). + */ + int getFD() const { return m_fd; } + + /** + * FILE by default, otherwise the value given to create(). + */ + Type getType() const { return m_type; } + /** * Returns amount of bytes not mapped into memory yet, zero if none. */ @@ -134,6 +149,14 @@ class TmpFile * @return pcrecpp::StringPiece of the mapped view */ pcrecpp::StringPiece stringPiece(); + + protected: + Type m_type; + int m_fd; + void *m_mapptr; + size_t m_mapsize; + std::string m_filename; + }; #endif // INCL_SYNCEVOLUTION_TMPFILE -- cgit v1.2.3