diff options
author | Luboš Luňák <l.lunak@collabora.com> | 2019-05-25 12:28:27 +0200 |
---|---|---|
committer | Luboš Luňák <l.lunak@collabora.com> | 2019-05-28 12:28:01 +0200 |
commit | aa44e10942937452930255be156c1b29247ee969 (patch) | |
tree | db5258343c82c434d1026870285ae9f21727f915 /package | |
parent | 7cd3f267cfbf3655f6a7a395b80560ecd22e15f7 (diff) |
parallel deflate compression (one stream, multiple threads)
ZipPackageStream::saveChild() already had one threaded compression,
but that still uses only one thread for one stream. Many documents
contain many streams (where this is useful), but large documents
often contain one huge content.xml, which then would be compressed
using just one thread.
But it is in fact possible to do deflate in parallel on the same data,
at the cost of somewhat increased CPU usage (spread over threads).
This is handled separately from the background thread path, as
integrating these two approaches would probably be needlessly complex
(since they both internally use ThreadPool, the tasks should often
intermix and parallelize anyway).
On my 4-core (8 HT threads) machine this reduces the compression time
of tdf#113042 from 3s to 1s.
Change-Id: Ifbc889a27966f97eb1ce2ce01c5fb0b151a1bdf8
Reviewed-on: https://gerrit.libreoffice.org/73032
Tested-by: Jenkins
Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
Diffstat (limited to 'package')
-rw-r--r-- | package/Library_package2.mk | 1 | ||||
-rw-r--r-- | package/inc/ThreadedDeflater.hxx | 62 | ||||
-rw-r--r-- | package/inc/ZipOutputEntry.hxx | 68 | ||||
-rw-r--r-- | package/source/zipapi/ThreadedDeflater.cxx | 181 | ||||
-rw-r--r-- | package/source/zipapi/ZipOutputEntry.cxx | 195 | ||||
-rw-r--r-- | package/source/zippackage/ZipPackageStream.cxx | 30 |
6 files changed, 477 insertions, 60 deletions
diff --git a/package/Library_package2.mk b/package/Library_package2.mk index 75a15f0e0d08..195c87f9ff4b 100644 --- a/package/Library_package2.mk +++ b/package/Library_package2.mk @@ -53,6 +53,7 @@ $(eval $(call gb_Library_add_exception_objects,package2,\ package/source/zipapi/Deflater \ package/source/zipapi/Inflater \ package/source/zipapi/sha1context \ + package/source/zipapi/ThreadedDeflater \ package/source/zipapi/XBufferedThreadedStream \ package/source/zipapi/XUnbufferedStream \ package/source/zipapi/ZipEnumeration \ diff --git a/package/inc/ThreadedDeflater.hxx b/package/inc/ThreadedDeflater.hxx new file mode 100644 index 000000000000..90801700a37e --- /dev/null +++ b/package/inc/ThreadedDeflater.hxx @@ -0,0 +1,62 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#ifndef INCLUDED_PACKAGE_THREADEDDEFLATER_HXX +#define INCLUDED_PACKAGE_THREADEDDEFLATER_HXX + +#include <com/sun/star/uno/Sequence.hxx> +#include <package/packagedllapi.hxx> +#include <comphelper/threadpool.hxx> +#include <atomic> +#include <memory> + +namespace ZipUtils +{ +/// Parallel compression a stream using the libz deflate algorithm. +/// +/// Almost a replacement for the Deflater class. Call startDeflate() with the data, +/// check with finished() or waitForTasks() and retrieve result with getOutput(). +/// The class will internally split into multiple threads. +class ThreadedDeflater final +{ + class Task; + // Note: All this should be lock-less. Each task writes only to its part + // of the data, flags are atomic. + css::uno::Sequence<sal_Int8> inBuffer; + int zlibLevel; + std::shared_ptr<comphelper::ThreadTaskTag> threadTaskTag; + std::atomic<int> pendingTasksCount; + std::vector<std::vector<sal_Int8>> outBuffers; + +public: + // Unlike with Deflater class, bNoWrap is always true. + ThreadedDeflater(sal_Int32 nSetLevel); + ~ThreadedDeflater(); + void startDeflate(const css::uno::Sequence<sal_Int8>& rBuffer); + void waitForTasks(); + bool finished() const; + css::uno::Sequence<sal_Int8> getOutput() const; + void clear(); +}; + +} // namespace + +#endif + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/package/inc/ZipOutputEntry.hxx b/package/inc/ZipOutputEntry.hxx index af6528f04ea1..7234d890f4c2 100644 --- a/package/inc/ZipOutputEntry.hxx +++ b/package/inc/ZipOutputEntry.hxx @@ -35,11 +35,9 @@ struct ZipEntry; class ZipPackageBuffer; class ZipPackageStream; -class ZipOutputEntry +class ZipOutputEntryBase { protected: - css::uno::Sequence< sal_Int8 > m_aDeflateBuffer; - ZipUtils::Deflater m_aDeflater; css::uno::Reference< css::uno::XComponentContext > m_xContext; css::uno::Reference< css::io::XOutputStream > m_xOutStream; @@ -53,10 +51,9 @@ protected: bool const m_bEncryptCurrentEntry; public: - ZipOutputEntry( - const css::uno::Reference< css::io::XOutputStream >& rxOutStream, - const css::uno::Reference< css::uno::XComponentContext >& rxContext, - ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt); + virtual ~ZipOutputEntryBase() = default; + + virtual void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream) = 0; ZipEntry* getZipEntry() { return m_pCurrentEntry; } ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; } @@ -64,7 +61,36 @@ public: void closeEntry(); - void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream); +protected: + ZipOutputEntryBase( + const css::uno::Reference< css::io::XOutputStream >& rxOutStream, + const css::uno::Reference< css::uno::XComponentContext >& rxContext, + ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt, bool checkStream); + + // Inherited classes call this with deflated data buffer. + void processDeflated( const css::uno::Sequence< sal_Int8 >& deflateBuffer, sal_Int32 nLength ); + // Inherited classes call this with the input buffer. + void processInput( const css::uno::Sequence< sal_Int8 >& rBuffer ); + + virtual void finishDeflater() = 0; + virtual sal_Int64 getDeflaterTotalIn() const = 0; + virtual sal_Int64 getDeflaterTotalOut() const = 0; + virtual void deflaterReset() = 0; + virtual bool isDeflaterFinished() const = 0; +}; + +// Normal non-threaded case. +class ZipOutputEntry : public ZipOutputEntryBase +{ + css::uno::Sequence< sal_Int8 > m_aDeflateBuffer; + ZipUtils::Deflater m_aDeflater; + +public: + ZipOutputEntry( + const css::uno::Reference< css::io::XOutputStream >& rxOutStream, + const css::uno::Reference< css::uno::XComponentContext >& rxContext, + ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt); + void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream) override; void write(const css::uno::Sequence< sal_Int8 >& rBuffer); protected: @@ -72,10 +98,15 @@ protected: const css::uno::Reference< css::io::XOutputStream >& rxOutStream, const css::uno::Reference< css::uno::XComponentContext >& rxContext, ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt, bool checkStream); + virtual void finishDeflater() override; + virtual sal_Int64 getDeflaterTotalIn() const override; + virtual sal_Int64 getDeflaterTotalOut() const override; + virtual void deflaterReset() override; + virtual bool isDeflaterFinished() const override; void doDeflate(); }; -// Class that runs the compression in a thread. +// Class that runs the compression in a background thread. class ZipOutputEntryInThread : public ZipOutputEntry { class Task; @@ -103,6 +134,25 @@ private: void setFinished() { m_bFinished = true; } }; +// Class that synchronously runs the compression in multiple threads (using ThreadDeflater). +class ZipOutputEntryParallel : public ZipOutputEntryBase +{ + sal_Int64 totalIn; + sal_Int64 totalOut; +public: + ZipOutputEntryParallel( + const css::uno::Reference< css::io::XOutputStream >& rxOutStream, + const css::uno::Reference< css::uno::XComponentContext >& rxContext, + ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt); + void writeStream(const css::uno::Reference< css::io::XInputStream >& xInStream) override; +protected: + virtual void finishDeflater() override; + virtual sal_Int64 getDeflaterTotalIn() const override; + virtual sal_Int64 getDeflaterTotalOut() const override; + virtual void deflaterReset() override; + virtual bool isDeflaterFinished() const override; +}; + #endif /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/package/source/zipapi/ThreadedDeflater.cxx b/package/source/zipapi/ThreadedDeflater.cxx new file mode 100644 index 000000000000..b136981b3bdb --- /dev/null +++ b/package/source/zipapi/ThreadedDeflater.cxx @@ -0,0 +1,181 @@ +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* + * This file is part of the LibreOffice project. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * This file incorporates work covered by the following license notice: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright + * ownership. The ASF licenses this file to you under the Apache + * License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of + * the License at http://www.apache.org/licenses/LICENSE-2.0 . + */ + +#include <ThreadedDeflater.hxx> +#include <zlib.h> +#include <com/sun/star/packages/zip/ZipConstants.hpp> +#include <sal/log.hxx> + +using namespace com::sun::star::packages::zip::ZipConstants; +using namespace com::sun::star; + +namespace ZipUtils +{ +const sal_Int64 MaxBlockSize = 128 * 1024; + +// Parallel ZLIB compression using threads. The class internally splits the data into +// blocks and spawns ThreadPool tasks to process them independently. This is achieved +// in a similar way how pigz works, see comments from Mark Adler at +// https://stackoverflow.com/questions/30294766/how-to-use-multiple-threads-for-zlib-compression +// and +// https://stackoverflow.com/questions/30794053/how-to-use-multiple-threads-for-zlib-compression-same-input-source + +// Everything here should be either read-only, or writing to distinct data, or atomic. + +class ThreadedDeflater::Task : public comphelper::ThreadTask +{ + z_stream stream; + ThreadedDeflater* deflater; + int sequence; + int blockSize; + +public: + Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_) + : comphelper::ThreadTask(deflater_->threadTaskTag) + , deflater(deflater_) + , sequence(sequence_) + , blockSize(blockSize_) + { + } + +private: + virtual void doWork() override; +}; + +ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel) + : zlibLevel(nSetLevel) + , threadTaskTag(comphelper::ThreadPool::createThreadTaskTag()) + , pendingTasksCount(0) +{ +} + +ThreadedDeflater::~ThreadedDeflater() +{ + waitForTasks(); + clear(); +} + +void ThreadedDeflater::startDeflate(const uno::Sequence<sal_Int8>& rBuffer) +{ + inBuffer = rBuffer; + sal_Int64 size = inBuffer.getLength(); + int tasksCount = (size + MaxBlockSize - 1) / MaxBlockSize; + tasksCount = std::max(tasksCount, 1); + pendingTasksCount = tasksCount; + outBuffers.resize(pendingTasksCount); + for (int sequence = 0; sequence < tasksCount; ++sequence) + { + sal_Int64 thisSize = std::min(MaxBlockSize, size); + size -= thisSize; + comphelper::ThreadPool::getSharedOptimalPool().pushTask( + std::make_unique<Task>(this, sequence, thisSize)); + } + assert(size == 0); +} + +bool ThreadedDeflater::finished() const { return pendingTasksCount == 0; } + +css::uno::Sequence<sal_Int8> ThreadedDeflater::getOutput() const +{ + assert(finished()); + sal_Int64 totalSize = 0; + for (const auto& buffer : outBuffers) + totalSize += buffer.size(); + uno::Sequence<sal_Int8> outBuffer(totalSize); + auto pos = outBuffer.begin(); + for (const auto& buffer : outBuffers) + pos = std::copy(buffer.begin(), buffer.end(), pos); + return outBuffer; +} + +void ThreadedDeflater::waitForTasks() +{ + comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag); +} + +void ThreadedDeflater::clear() +{ + assert(finished()); + inBuffer = uno::Sequence<sal_Int8>(); + outBuffers.clear(); +} + +#if defined Z_PREFIX +#define deflateInit2 z_deflateInit2 +#define deflateBound z_deflateBound +#define deflateSetDictionary z_deflateSetDictionary +#define deflate z_deflate +#define deflateEnd z_deflateEnd +#endif + +void ThreadedDeflater::Task::doWork() +{ + stream.zalloc = nullptr; + stream.zfree = nullptr; + stream.opaque = nullptr; + // -MAX_WBITS means 32k window size and raw stream + if (deflateInit2(&stream, deflater->zlibLevel, Z_DEFLATED, -MAX_WBITS, DEF_MEM_LEVEL, + Z_DEFAULT_STRATEGY) + != Z_OK) + { + SAL_WARN("package.threadeddeflate", "deflateInit2() failed"); + abort(); + } + // Find out size for our output buffer to be large enough for deflate() needing to be called just once. + sal_Int64 outputMaxSize = deflateBound(&stream, blockSize); + // add extra size for Z_SYNC_FLUSH + outputMaxSize += 20; + deflater->outBuffers[sequence].resize(outputMaxSize); + sal_Int64 myInBufferStart = sequence * MaxBlockSize; + // zlib doesn't handle const properly + unsigned char* inBufferPtr = reinterpret_cast<unsigned char*>( + const_cast<signed char*>(deflater->inBuffer.getConstArray())); + if (sequence != 0) + { + // the window size is 32k, so set last 32k of previous data as the dictionary + assert(MAX_WBITS == 15); + assert(MaxBlockSize >= 32768); + deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768); + } + stream.next_in = inBufferPtr + myInBufferStart; + stream.avail_in = blockSize; + stream.next_out = reinterpret_cast<unsigned char*>(deflater->outBuffers[sequence].data()); + stream.avail_out = outputMaxSize; + bool last = sequence == int(deflater->outBuffers.size() - 1); // Last block? + // The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary, + // and since we use a raw stream, the data blocks then can be simply concatenated. + int res = deflate(&stream, last ? Z_FINISH : Z_SYNC_FLUSH); + assert(stream.avail_in == 0); // Check that everything has been deflated. + if (last ? res == Z_STREAM_END : res == Z_OK) + { // ok + sal_Int64 outSize = outputMaxSize - stream.avail_out; + deflater->outBuffers[sequence].resize(outSize); + --deflater->pendingTasksCount; + } + else + { + SAL_WARN("package.threadeddeflate", "deflate() failed"); + abort(); + } + deflateEnd(&stream); +} + +} // namespace + +/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/package/source/zipapi/ZipOutputEntry.cxx b/package/source/zipapi/ZipOutputEntry.cxx index 74281fd063dd..bee9d0aeb70c 100644 --- a/package/source/zipapi/ZipOutputEntry.cxx +++ b/package/source/zipapi/ZipOutputEntry.cxx @@ -27,6 +27,7 @@ #include <osl/diagnose.h> #include <PackageConstants.hxx> +#include <ThreadedDeflater.hxx> #include <ZipEntry.hxx> #include <ZipFile.hxx> #include <ZipPackageBuffer.hxx> @@ -41,16 +42,14 @@ using namespace com::sun::star::packages::zip::ZipConstants; /** This class is used to deflate Zip entries */ -ZipOutputEntry::ZipOutputEntry( +ZipOutputEntryBase::ZipOutputEntryBase( const css::uno::Reference< css::io::XOutputStream >& rxOutput, const uno::Reference< uno::XComponentContext >& rxContext, ZipEntry& rEntry, ZipPackageStream* pStream, bool bEncrypt, bool checkStream) -: m_aDeflateBuffer(n_ConstBufferSize) -, m_aDeflater(DEFAULT_COMPRESSION, true) -, m_xContext(rxContext) +: m_xContext(rxContext) , m_xOutStream(rxOutput) , m_pCurrentEntry(&rEntry) , m_nDigested(0) @@ -67,33 +66,21 @@ ZipOutputEntry::ZipOutputEntry( } } -ZipOutputEntry::ZipOutputEntry( - const css::uno::Reference< css::io::XOutputStream >& rxOutput, - const uno::Reference< uno::XComponentContext >& rxContext, - ZipEntry& rEntry, - ZipPackageStream* pStream, - bool bEncrypt) -: ZipOutputEntry( rxOutput, rxContext, rEntry, pStream, bEncrypt, true) -{ -} - -void ZipOutputEntry::closeEntry() +void ZipOutputEntryBase::closeEntry() { - m_aDeflater.finish(); - while (!m_aDeflater.finished()) - doDeflate(); + finishDeflater(); if ((m_pCurrentEntry->nFlag & 8) == 0) { - if (m_pCurrentEntry->nSize != m_aDeflater.getTotalIn()) + if (m_pCurrentEntry->nSize != getDeflaterTotalIn()) { OSL_FAIL("Invalid entry size"); } - if (m_pCurrentEntry->nCompressedSize != m_aDeflater.getTotalOut()) + if (m_pCurrentEntry->nCompressedSize != getDeflaterTotalOut()) { // Different compression strategies make the merit of this // test somewhat dubious - m_pCurrentEntry->nCompressedSize = m_aDeflater.getTotalOut(); + m_pCurrentEntry->nCompressedSize = getDeflaterTotalOut(); } if (m_pCurrentEntry->nCrc != m_aCRC.getValue()) { @@ -104,12 +91,12 @@ void ZipOutputEntry::closeEntry() { if ( !m_bEncryptCurrentEntry ) { - m_pCurrentEntry->nSize = m_aDeflater.getTotalIn(); - m_pCurrentEntry->nCompressedSize = m_aDeflater.getTotalOut(); + m_pCurrentEntry->nSize = getDeflaterTotalIn(); + m_pCurrentEntry->nCompressedSize = getDeflaterTotalOut(); } m_pCurrentEntry->nCrc = m_aCRC.getValue(); } - m_aDeflater.reset(); + deflaterReset(); m_aCRC.reset(); if (m_bEncryptCurrentEntry) @@ -128,25 +115,11 @@ void ZipOutputEntry::closeEntry() } } -void ZipOutputEntry::write( const Sequence< sal_Int8 >& rBuffer ) +void ZipOutputEntryBase::processDeflated( const uno::Sequence< sal_Int8 >& deflateBuffer, sal_Int32 nLength ) { - if (!m_aDeflater.finished()) - { - m_aDeflater.setInputSegment(rBuffer); - while (!m_aDeflater.needsInput()) - doDeflate(); - if (!m_bEncryptCurrentEntry) - m_aCRC.updateSegment(rBuffer, rBuffer.getLength()); - } -} - -void ZipOutputEntry::doDeflate() -{ - sal_Int32 nLength = m_aDeflater.doDeflateSegment(m_aDeflateBuffer, m_aDeflateBuffer.getLength()); - if ( nLength > 0 ) { - uno::Sequence< sal_Int8 > aTmpBuffer( m_aDeflateBuffer.getConstArray(), nLength ); + uno::Sequence< sal_Int8 > aTmpBuffer( deflateBuffer.getConstArray(), nLength ); if ( m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() ) { // Need to update our digest before encryption... @@ -175,7 +148,7 @@ void ZipOutputEntry::doDeflate() } } - if ( m_aDeflater.finished() && m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() ) + if ( isDeflaterFinished() && m_bEncryptCurrentEntry && m_xDigestContext.is() && m_xCipherContext.is() ) { // FIXME64: sequence not 64bit safe. uno::Sequence< sal_Int8 > aEncryptionBuffer = m_xCipherContext->finalizeCipherContextAndDispose(); @@ -191,6 +164,80 @@ void ZipOutputEntry::doDeflate() } } +void ZipOutputEntryBase::processInput( const uno::Sequence< sal_Int8 >& rBuffer ) +{ + if (!m_bEncryptCurrentEntry) + m_aCRC.updateSegment(rBuffer, rBuffer.getLength()); +} + +ZipOutputEntry::ZipOutputEntry( + const css::uno::Reference< css::io::XOutputStream >& rxOutput, + const uno::Reference< uno::XComponentContext >& rxContext, + ZipEntry& rEntry, + ZipPackageStream* pStream, + bool bEncrypt, + bool checkStream) +: ZipOutputEntryBase(rxOutput, rxContext, rEntry, pStream, bEncrypt, checkStream) +, m_aDeflateBuffer(n_ConstBufferSize) +, m_aDeflater(DEFAULT_COMPRESSION, true) +{ +} + +ZipOutputEntry::ZipOutputEntry( + const css::uno::Reference< css::io::XOutputStream >& rxOutput, + const uno::Reference< uno::XComponentContext >& rxContext, + ZipEntry& rEntry, + ZipPackageStream* pStream, + bool bEncrypt) +: ZipOutputEntry( rxOutput, rxContext, rEntry, pStream, bEncrypt, true) +{ +} + +void ZipOutputEntry::write( const Sequence< sal_Int8 >& rBuffer ) +{ + if (!m_aDeflater.finished()) + { + m_aDeflater.setInputSegment(rBuffer); + while (!m_aDeflater.needsInput()) + doDeflate(); + processInput(rBuffer); + } +} + +void ZipOutputEntry::doDeflate() +{ + sal_Int32 nLength = m_aDeflater.doDeflateSegment(m_aDeflateBuffer, m_aDeflateBuffer.getLength()); + processDeflated( m_aDeflateBuffer, nLength ); +} + +void ZipOutputEntry::finishDeflater() +{ + m_aDeflater.finish(); + while (!m_aDeflater.finished()) + doDeflate(); +} + +sal_Int64 ZipOutputEntry::getDeflaterTotalIn() const +{ + return m_aDeflater.getTotalIn(); +} + +sal_Int64 ZipOutputEntry::getDeflaterTotalOut() const +{ + return m_aDeflater.getTotalOut(); +} + +void ZipOutputEntry::deflaterReset() +{ + m_aDeflater.reset(); +} + +bool ZipOutputEntry::isDeflaterFinished() const +{ + return m_aDeflater.finished(); +} + + ZipOutputEntryInThread::ZipOutputEntryInThread( const uno::Reference< uno::XComponentContext >& rxContext, ZipEntry& rEntry, @@ -301,4 +348,70 @@ void ZipOutputEntry::writeStream(const uno::Reference< io::XInputStream >& xInSt closeEntry(); } + +ZipOutputEntryParallel::ZipOutputEntryParallel( + const css::uno::Reference< css::io::XOutputStream >& rxOutput, + const uno::Reference< uno::XComponentContext >& rxContext, + ZipEntry& rEntry, + ZipPackageStream* pStream, + bool bEncrypt) +: ZipOutputEntryBase(rxOutput, rxContext, rEntry, pStream, bEncrypt, true) +, totalIn(0) +, totalOut(0) +{ +} + +void ZipOutputEntryParallel::writeStream(const uno::Reference< io::XInputStream >& xInStream) +{ + sal_Int64 toRead = xInStream->available(); + uno::Sequence< sal_Int8 > inBuffer( toRead ); + sal_Int64 read = xInStream->readBytes(inBuffer, toRead); + if (read < toRead) + inBuffer.realloc( read ); + while( xInStream->available() > 0 ) + { // We didn't get the full size from available(). + uno::Sequence< sal_Int8 > buf( xInStream->available()); + read = xInStream->readBytes( buf, xInStream->available()); + sal_Int64 oldSize = inBuffer.getLength(); + inBuffer.realloc( oldSize + read ); + std::copy( buf.begin(), buf.end(), inBuffer.begin() + oldSize ); + } + ZipUtils::ThreadedDeflater deflater( DEFAULT_COMPRESSION ); + totalIn = inBuffer.getLength(); + deflater.startDeflate( inBuffer ); + processInput( inBuffer ); + deflater.waitForTasks(); + uno::Sequence< sal_Int8 > outBuffer = deflater.getOutput(); + deflater.clear(); // release memory + totalOut = outBuffer.getLength(); + processDeflated(outBuffer, outBuffer.getLength()); + closeEntry(); +} + +void ZipOutputEntryParallel::finishDeflater() +{ + // ThreadedDeflater is called synchronously in one call, so nothing to do here. +} + +sal_Int64 ZipOutputEntryParallel::getDeflaterTotalIn() const +{ + return totalIn; +} + +sal_Int64 ZipOutputEntryParallel::getDeflaterTotalOut() const +{ + return totalOut; +} + +void ZipOutputEntryParallel::deflaterReset() +{ + totalIn = 0; + totalOut = 0; +} + +bool ZipOutputEntryParallel::isDeflaterFinished() const +{ + return true; +} + /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ diff --git a/package/source/zippackage/ZipPackageStream.cxx b/package/source/zippackage/ZipPackageStream.cxx index e795776ab065..4692372c8fe5 100644 --- a/package/source/zippackage/ZipPackageStream.cxx +++ b/package/source/zippackage/ZipPackageStream.cxx @@ -35,6 +35,7 @@ #include <string.h> #include <CRC32.hxx> +#include <ThreadedDeflater.hxx> #include <ZipOutputEntry.hxx> #include <ZipOutputStream.hxx> #include <ZipPackage.hxx> @@ -496,7 +497,7 @@ bool ZipPackageStream::saveChild( else if ( m_nStreamMode == PACKAGE_STREAM_RAW ) m_bRawStream = true; - bool bParallelDeflate = false; + bool bBackgroundThreadDeflate = false; bool bTransportOwnEncrStreamAsRaw = false; // During the storing the original size of the stream can be changed // TODO/LATER: get rid of this hack @@ -758,17 +759,25 @@ bool ZipPackageStream::saveChild( } else { - // tdf#89236 Encrypting in parallel does not work - bParallelDeflate = !bToBeEncrypted; - // Do not deflate small streams in a thread. XSeekable's getLength() + // tdf#89236 Encrypting in a background thread does not work + bBackgroundThreadDeflate = !bToBeEncrypted; + // Do not deflate small streams using threads. XSeekable's getLength() // gives the full size, XInputStream's available() may not be // the full size, but it appears that at this point it usually is. - if (xSeek.is() && xSeek->getLength() < 100000) - bParallelDeflate = false; - else if (xStream->available() < 100000) - bParallelDeflate = false; + sal_Int64 estimatedSize = xSeek.is() ? xSeek->getLength() : xStream->available(); - if (bParallelDeflate) + if (estimatedSize > 1000000) + { + // Use ThreadDeflater which will split the stream into blocks and compress + // them in threads, but not in background (i.e. writeStream() will block). + // This is suitable for large data. + bBackgroundThreadDeflate = false; + rZipOut.writeLOC(pTempEntry, bToBeEncrypted); + ZipOutputEntryParallel aZipEntry(rZipOut.getStream(), m_xContext, *pTempEntry, this, bToBeEncrypted); + aZipEntry.writeStream(xStream); + rZipOut.rawCloseEntry(bToBeEncrypted); + } + else if (bBackgroundThreadDeflate && estimatedSize > 100000) { // tdf#93553 limit to a useful amount of pending tasks. Having way too many // tasks pending may use a lot of memory. Take number of available @@ -786,6 +795,7 @@ bool ZipPackageStream::saveChild( } else { + bBackgroundThreadDeflate = false; rZipOut.writeLOC(pTempEntry, bToBeEncrypted); ZipOutputEntry aZipEntry(rZipOut.getStream(), m_xContext, *pTempEntry, this, bToBeEncrypted); aZipEntry.writeStream(xStream); @@ -823,7 +833,7 @@ bool ZipPackageStream::saveChild( } } - if (bSuccess && !bParallelDeflate) + if (bSuccess && !bBackgroundThreadDeflate) successfullyWritten(pTempEntry); if ( aPropSet.hasElements() |