diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f59dea5..d2a9fa4 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -9,6 +9,8 @@ Changelog version 0.5.1-dev ----------------- ++ Fix a bug where flushing in threaded mode did not write the data to the + output file. + Threaded reading and writing do no longer block exiting when an exception occurs in the main thread. diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index a1cd918..251c0df 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -60,7 +60,7 @@ def open(filename, mode="rb", compresslevel=gzip_ng._COMPRESS_LEVEL_TRADEOFF, gzip_file = io.BufferedReader( _ThreadedGzipReader(filename, block_size=block_size)) else: - gzip_file = io.BufferedWriter( + gzip_file = FlushableBufferedWriter( _ThreadedGzipWriter( filename, mode.replace("t", "b"), @@ -167,6 +167,12 @@ def closed(self) -> bool: return self._closed +class FlushableBufferedWriter(io.BufferedWriter): + def flush(self): + super().flush() + self.raw.flush() + + class _ThreadedGzipWriter(io.RawIOBase): """ Write a gzip file using multiple threads. @@ -315,7 +321,7 @@ def write(self, b) -> int: self.input_queues[worker_index].put((data, zdict)) return len(data) - def flush(self): + def _end_gzip_stream(self): self._check_closed() # Wait for all data to be compressed for in_q in self.input_queues: @@ -323,22 +329,27 @@ def flush(self): # Wait for all data to be written for out_q in self.output_queues: out_q.join() + # Write an empty deflate block with a lost block marker. + self.raw.write(zlib_ng.compress(b"", wbits=-15)) + trailer = struct.pack(" None: if self._closed: return - self.flush() + self._end_gzip_stream() self.stop() if self.exception: self.raw.close() self._closed = True raise self.exception - # Write an empty deflate block with a lost block marker. - self.raw.write(zlib_ng.compress(b"", wbits=-15)) - trailer = struct.pack("