diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c index 1f77ffd3..1fd7dd85 100644 --- a/src/liblzma/common/stream_decoder_mt.c +++ b/src/liblzma/common/stream_decoder_mt.c @@ -39,6 +39,26 @@ typedef enum { } worker_state; +typedef enum { + /// Partial updates (storing of worker thread progress + /// to lzma_outbuf) are disabled. + PARTIAL_DISABLED, + + /// Main thread requests partial updates to be enabled but + /// no partial update has been done by the worker thread yet. + /// + /// Changing from PARTIAL_DISABLED to PARTIAL_START requires + /// use of the worker-thread mutex. Other transitions don't + /// need a mutex. + PARTIAL_START, + + /// Partial updates are enabled and the worker thread has done + /// at least one partial update. + PARTIAL_ENABLED, + +} partial_update_mode; + + struct worker_thread { /// Worker state is protected with our mutex. worker_state state; @@ -90,9 +110,10 @@ struct worker_thread { /// happen if all worker threads were frequently locking the main /// mutex to update their outbuf->pos. /// - /// Only when partial_update is true, this worker thread will update - /// outbuf->pos after each call to the Block decoder. - bool partial_update; + /// Only when partial_update is something else than PARTIAL_DISABLED, + /// this worker thread will update outbuf->pos after each call to + /// the Block decoder. + partial_update_mode partial_update; /// Block decoder lzma_next_coder block_decoder; @@ -303,7 +324,8 @@ worker_enable_partial_update(void *thr_ptr) struct worker_thread *thr = thr_ptr; mythread_sync(thr->mutex) { - thr->partial_update = true; + thr->partial_update = PARTIAL_START; + mythread_cond_signal(&thr->cond); } } @@ -334,6 +356,7 @@ worker_decoder(void *thr_ptr) { struct worker_thread *thr = thr_ptr; size_t in_filled; + partial_update_mode partial_update; lzma_ret ret; next_loop_lock: @@ -371,9 +394,19 @@ next_loop_unlocked: assert(thr->state == THR_RUN); - in_filled = thr->in_filled; + // Update progress info for get_progress(). + thr->progress_in = thr->in_pos; + thr->progress_out = thr->out_pos; - if (in_filled == thr->in_pos) { + // If we don't have any new input, wait for a signal from the main + // thread except if partial output has just been enabled. In that + // case we will do one normal run so that the partial output info + // gets passed to the main thread. The call to block_decoder.code() + // is useless but harmless as it can occur only once per Block. + in_filled = thr->in_filled; + partial_update = thr->partial_update; + + if (in_filled == thr->in_pos && partial_update != PARTIAL_START) { mythread_cond_wait(&thr->cond, &thr->mutex); goto next_loop_unlocked; } @@ -382,7 +415,7 @@ next_loop_unlocked: // Pass the input in small chunks to the Block decoder. // This way we react reasonably fast if we are told to stop/exit, - // and (when partial_update is true) we tell about our progress + // and (when partial update is enabled) we tell about our progress // to the main thread frequently enough. const size_t chunk_size = 16384; if ((in_filled - thr->in_pos) > chunk_size) @@ -395,24 +428,23 @@ next_loop_unlocked: thr->outbuf->allocated, LZMA_RUN); if (ret == LZMA_OK) { - bool partial_update; + if (partial_update != PARTIAL_DISABLED) { + // The main thread uses thr->mutex to change from + // PARTIAL_DISABLED to PARTIAL_START. The main thread + // doesn't care about this variable after that so we + // can safely change it here to PARTIAL_ENABLED + // without a mutex. + thr->partial_update = PARTIAL_ENABLED; - mythread_sync(thr->mutex) { - // Update progress info for get_progress(). - thr->progress_in = thr->in_pos; - thr->progress_out = thr->out_pos; - - partial_update = thr->partial_update; - } - - if (partial_update) { // The main thread is reading decompressed data // from thr->outbuf. Tell the main thread about // our progress. // // NOTE: It's possible that we consumed input without // producing any new output so it's possible that - // only in_pos has changed. + // only in_pos has changed. In case of PARTIAL_START + // it is possible that neither in_pos nor out_pos has + // changed. mythread_sync(thr->coder->mutex) { thr->outbuf->pos = thr->out_pos; thr->outbuf->decoder_in_pos = thr->in_pos; @@ -603,7 +635,7 @@ get_thread(struct lzma_stream_coder *coder, const lzma_allocator *allocator) coder->thr->progress_in = 0; coder->thr->progress_out = 0; - coder->thr->partial_update = false; + coder->thr->partial_update = PARTIAL_DISABLED; return LZMA_OK; } @@ -757,7 +789,8 @@ read_output_and_wait(struct lzma_stream_coder *coder, // without thr->mutex as only the main thread // modifies these variables. decoder_in_pos requires // coder->mutex which we are already holding. - if (coder->thr != NULL && coder->thr->partial_update) { + if (coder->thr != NULL && coder->thr->partial_update + != PARTIAL_DISABLED) { // There is exactly one outbuf in the queue. assert(coder->thr->outbuf == coder->outq.head); assert(coder->thr->outbuf == coder->outq.tail);