diff options
author | Luboš Luňák <l.lunak@collabora.com> | 2019-05-21 12:41:44 +0200 |
---|---|---|
committer | Luboš Luňák <l.lunak@collabora.com> | 2019-05-21 17:25:32 +0200 |
commit | aeb2014ba401707dece5d0cf3cb213ce307a5330 (patch) | |
tree | 7d542cbd43ad0a0bbe8b16f2f91808e2abb63d88 /package | |
parent | 86618248683fa4048192d15356c7e6b430e8dbb9 (diff) |
remove code confusion about threads vs thread tasks
A threadpool controls a number of threads that execute a number
of thread *tasks* from a queue. The API even says they are tasks.
So it's damn confusing when ZipPackageStream::saveChild()
claims to limit the number of threads to 4x the maximum number
of threads. It limits the number of queued thread tasks.
Change-Id: I317497f27a82d92a7c8566b14aaeae73a4ffef1f
Reviewed-on: https://gerrit.libreoffice.org/72677
Tested-by: Jenkins
Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
Diffstat (limited to 'package')
-rw-r--r-- | package/inc/ZipOutputEntry.hxx | 4 | ||||
-rw-r--r-- | package/inc/ZipOutputStream.hxx | 10 | ||||
-rw-r--r-- | package/source/zipapi/ZipOutputStream.cxx | 26 | ||||
-rw-r--r-- | package/source/zippackage/ZipPackageStream.cxx | 18 |
4 files changed, 30 insertions, 28 deletions
diff --git a/package/inc/ZipOutputEntry.hxx b/package/inc/ZipOutputEntry.hxx index 15c94aecc14b..c35da5818062 100644 --- a/package/inc/ZipOutputEntry.hxx +++ b/package/inc/ZipOutputEntry.hxx @@ -36,8 +36,8 @@ class ZipPackageStream; class ZipOutputEntry { - // allow only DeflateThread to change m_bFinished using setFinished() - friend class DeflateThread; + // allow only DeflateThreadTask to change m_bFinished using setFinished() + friend class DeflateThreadTask; css::uno::Sequence< sal_Int8 > m_aDeflateBuffer; ZipUtils::Deflater m_aDeflater; diff --git a/package/inc/ZipOutputStream.hxx b/package/inc/ZipOutputStream.hxx index 814413da041e..ff7b66d64507 100644 --- a/package/inc/ZipOutputStream.hxx +++ b/package/inc/ZipOutputStream.hxx @@ -47,7 +47,7 @@ public: const css::uno::Reference< css::io::XOutputStream > &xOStream ); ~ZipOutputStream(); - void addDeflatingThread( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pThreadTask ); + void addDeflatingThreadTask( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pThreadTask ); /// @throws css::io::IOException /// @throws css::uno::RuntimeException @@ -79,12 +79,12 @@ private: void writeEXT( const ZipEntry &rEntry ); // ScheduledThread handling helpers - void consumeScheduledThreadEntry(std::unique_ptr<ZipOutputEntry> pCandidate); - void consumeFinishedScheduledThreadEntries(); + void consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry> pCandidate); + void consumeFinishedScheduledThreadTaskEntries(); public: - void reduceScheduledThreadsToGivenNumberOrLess( - sal_Int32 nThreads); + void reduceScheduledThreadTasksToGivenNumberOrLess( + sal_Int32 nThreadTasks); const std::shared_ptr<comphelper::ThreadTaskTag>& getThreadTaskTag() { return mpThreadTaskTag; } }; diff --git a/package/source/zipapi/ZipOutputStream.cxx b/package/source/zipapi/ZipOutputStream.cxx index 8fc1f581ac49..5d90224981b0 100644 --- a/package/source/zipapi/ZipOutputStream.cxx +++ b/package/source/zipapi/ZipOutputStream.cxx @@ -68,9 +68,9 @@ void ZipOutputStream::setEntry( ZipEntry *pEntry ) } } -void ZipOutputStream::addDeflatingThread( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pThread ) +void ZipOutputStream::addDeflatingThreadTask( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pTask ) { - comphelper::ThreadPool::getSharedOptimalPool().pushTask(std::move(pThread)); + comphelper::ThreadPool::getSharedOptimalPool().pushTask(std::move(pTask)); m_aEntries.push_back(pEntry); } @@ -91,14 +91,14 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt ) m_pCurrentEntry = nullptr; } -void ZipOutputStream::consumeScheduledThreadEntry(std::unique_ptr<ZipOutputEntry> pCandidate) +void ZipOutputStream::consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry> pCandidate) { //Any exceptions thrown in the threads were caught and stored for now const std::exception_ptr& rCaughtException(pCandidate->getParallelDeflateException()); if (rCaughtException) { m_aDeflateException = rCaughtException; // store it for later throwing - // the exception handler in DeflateThread should have cleaned temp file + // the exception handler in DeflateThreadTask should have cleaned temp file return; } @@ -124,7 +124,7 @@ void ZipOutputStream::consumeScheduledThreadEntry(std::unique_ptr<ZipOutputEntry pCandidate->deleteBufferFile(); } -void ZipOutputStream::consumeFinishedScheduledThreadEntries() +void ZipOutputStream::consumeFinishedScheduledThreadTaskEntries() { std::vector< ZipOutputEntry* > aNonFinishedEntries; @@ -132,7 +132,7 @@ void ZipOutputStream::consumeFinishedScheduledThreadEntries() { if(pEntry->isFinished()) { - consumeScheduledThreadEntry(std::unique_ptr<ZipOutputEntry>(pEntry)); + consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry>(pEntry)); } else { @@ -144,13 +144,13 @@ void ZipOutputStream::consumeFinishedScheduledThreadEntries() m_aEntries = aNonFinishedEntries; } -void ZipOutputStream::reduceScheduledThreadsToGivenNumberOrLess(sal_Int32 nThreads) +void ZipOutputStream::reduceScheduledThreadTasksToGivenNumberOrLess(sal_Int32 nThreadTasks) { - while(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads) + while(static_cast< sal_Int32 >(m_aEntries.size()) > nThreadTasks) { - consumeFinishedScheduledThreadEntries(); + consumeFinishedScheduledThreadTaskEntries(); - if(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads) + if(static_cast< sal_Int32 >(m_aEntries.size()) > nThreadTasks) { osl::Thread::wait(std::chrono::microseconds(100)); } @@ -161,7 +161,7 @@ void ZipOutputStream::finish() { assert(!m_aZipList.empty() && "Zip file must have at least one entry!"); - // Wait for all threads to finish & write + // Wait for all thread tasks to finish & write comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(mpThreadTaskTag); // consume all processed entries @@ -169,7 +169,7 @@ void ZipOutputStream::finish() { ZipOutputEntry* pCandidate = m_aEntries.back(); m_aEntries.pop_back(); - consumeScheduledThreadEntry(std::unique_ptr<ZipOutputEntry>(pCandidate)); + consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry>(pCandidate)); } sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition()); @@ -183,7 +183,7 @@ void ZipOutputStream::finish() m_aZipList.clear(); if (m_aDeflateException) - { // throw once all threads are finished and m_aEntries can be released + { // throw once all thread tasks are finished and m_aEntries can be released std::rethrow_exception(m_aDeflateException); } } diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx index db58a2892c33..1d6cd237c8e1 100644 --- a/package/source/zippackage/ZipPackageStream.cxx +++ b/package/source/zippackage/ZipPackageStream.cxx @@ -448,14 +448,14 @@ static void deflateZipEntry(ZipOutputEntry *pZipEntry, pZipEntry->closeEntry(); } -class DeflateThread: public comphelper::ThreadTask +class DeflateThreadTask: public comphelper::ThreadTask { ZipOutputEntry *mpEntry; uno::Reference< io::XInputStream > mxInStream; public: - DeflateThread( const std::shared_ptr<comphelper::ThreadTaskTag>& pTag, ZipOutputEntry *pEntry, - const uno::Reference< io::XInputStream >& xInStream ) + DeflateThreadTask( const std::shared_ptr<comphelper::ThreadTaskTag>& pTag, ZipOutputEntry *pEntry, + const uno::Reference< io::XInputStream >& xInStream ) : comphelper::ThreadTask(pTag) , mpEntry(pEntry) , mxInStream(xInStream) @@ -826,17 +826,19 @@ bool ZipPackageStream::saveChild( if (bParallelDeflate) { - // tdf#93553 limit to a useful amount of threads. Taking number of available + // tdf#93553 limit to a useful amount of pending tasks. Having way too many + // tasks pending may use a lot of memory. Take number of available // cores and allow 4-times the amount for having the queue well filled. The // 2nd parameter is the time to wait between cleanups in 10th of a second. // Both values may be added to the configuration settings if needed. - static sal_Int32 nAllowedThreads(comphelper::ThreadPool::getPreferredConcurrency() * 4); - rZipOut.reduceScheduledThreadsToGivenNumberOrLess(nAllowedThreads); + static sal_Int32 nAllowedTasks(comphelper::ThreadPool::getPreferredConcurrency() * 4); + rZipOut.reduceScheduledThreadTasksToGivenNumberOrLess(nAllowedTasks); - // Start a new thread deflating this zip entry + // Start a new thread task deflating this zip entry ZipOutputEntry *pZipEntry = new ZipOutputEntry( m_xContext, *pTempEntry, this, bToBeEncrypted); - rZipOut.addDeflatingThread( pZipEntry, std::make_unique<DeflateThread>(rZipOut.getThreadTaskTag(), pZipEntry, xStream) ); + rZipOut.addDeflatingThreadTask( pZipEntry, + std::make_unique<DeflateThreadTask>(rZipOut.getThreadTaskTag(), pZipEntry, xStream) ); } else { |