mirror of
https://git.tukaani.org/xz.git
synced 2024-04-04 12:36:23 +02:00
xz: Add preliminary support for --flush-timeout=TIMEOUT.
When --flush-timeout=TIMEOUT is used, xz will use LZMA_SYNC_FLUSH if read() would block and at least TIMEOUT milliseconds has elapsed since the previous flush. This can be useful in realtime-like use cases where the data is simultanously decompressed by another process (possibly on a different computer). If new uncompressed input data is produced slowly, without this option xz could buffer the data for a long time until it would become decompressible from the output. If TIMEOUT is 0, the feature is disabled. This is the default. This commit affects the compression side. Using xz for the decompression side for the above purpose doesn't work yet so well because there is quite a bit of input and output buffering when decompressing. The --long-help or man page were not updated yet. The details of this feature may change.
This commit is contained in:
parent
fa381acaf9
commit
dee6ad3d59
3 changed files with 78 additions and 21 deletions
|
@ -140,6 +140,7 @@ parse_real(args_info *args, int argc, char **argv)
|
||||||
OPT_NO_ADJUST,
|
OPT_NO_ADJUST,
|
||||||
OPT_INFO_MEMORY,
|
OPT_INFO_MEMORY,
|
||||||
OPT_ROBOT,
|
OPT_ROBOT,
|
||||||
|
OPT_FLUSH_TIMEOUT,
|
||||||
};
|
};
|
||||||
|
|
||||||
static const char short_opts[]
|
static const char short_opts[]
|
||||||
|
@ -176,6 +177,7 @@ parse_real(args_info *args, int argc, char **argv)
|
||||||
{ "memory", required_argument, NULL, 'M' }, // Old alias
|
{ "memory", required_argument, NULL, 'M' }, // Old alias
|
||||||
{ "no-adjust", no_argument, NULL, OPT_NO_ADJUST },
|
{ "no-adjust", no_argument, NULL, OPT_NO_ADJUST },
|
||||||
{ "threads", required_argument, NULL, 'T' },
|
{ "threads", required_argument, NULL, 'T' },
|
||||||
|
{ "flush-timeout", required_argument, NULL, OPT_FLUSH_TIMEOUT },
|
||||||
|
|
||||||
{ "extreme", no_argument, NULL, 'e' },
|
{ "extreme", no_argument, NULL, 'e' },
|
||||||
{ "fast", no_argument, NULL, '0' },
|
{ "fast", no_argument, NULL, '0' },
|
||||||
|
@ -483,6 +485,11 @@ parse_real(args_info *args, int argc, char **argv)
|
||||||
opt_auto_adjust = false;
|
opt_auto_adjust = false;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case OPT_FLUSH_TIMEOUT:
|
||||||
|
opt_flush_timeout = str_to_uint64("flush-timeout",
|
||||||
|
optarg, 0, UINT64_MAX);
|
||||||
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
message_try_help();
|
message_try_help();
|
||||||
tuklib_exit(E_ERROR, E_ERROR, false);
|
tuklib_exit(E_ERROR, E_ERROR, false);
|
||||||
|
|
|
@ -586,6 +586,9 @@ coder_normal(file_pair *pair)
|
||||||
if (block_remaining == 0)
|
if (block_remaining == 0)
|
||||||
action = LZMA_FULL_FLUSH;
|
action = LZMA_FULL_FLUSH;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (action == LZMA_RUN && flush_needed)
|
||||||
|
action = LZMA_SYNC_FLUSH;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let liblzma do the actual work.
|
// Let liblzma do the actual work.
|
||||||
|
@ -601,21 +604,42 @@ coder_normal(file_pair *pair)
|
||||||
strm.avail_out = IO_BUFFER_SIZE;
|
strm.avail_out = IO_BUFFER_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ret == LZMA_STREAM_END && action == LZMA_FULL_FLUSH) {
|
if (ret == LZMA_STREAM_END && (action == LZMA_SYNC_FLUSH
|
||||||
// Start a new Block.
|
|| action == LZMA_FULL_FLUSH)) {
|
||||||
action = LZMA_RUN;
|
// Flushing completed. Write the pending data out
|
||||||
|
// immediatelly so that the reading side can
|
||||||
|
// decompress everything compressed so far. Do this
|
||||||
|
// also with LZMA_FULL_FLUSH because if it is combined
|
||||||
|
// with timed LZMA_SYNC_FLUSH the same flushing
|
||||||
|
// timer can be used.
|
||||||
|
if (io_write(pair, &out_buf, IO_BUFFER_SIZE
|
||||||
|
- strm.avail_out))
|
||||||
|
break;
|
||||||
|
|
||||||
if (opt_block_list == NULL) {
|
strm.next_out = out_buf.u8;
|
||||||
block_remaining = opt_block_size;
|
strm.avail_out = IO_BUFFER_SIZE;
|
||||||
} else {
|
|
||||||
// FIXME: Make it work together with
|
|
||||||
// --block-size.
|
|
||||||
if (opt_block_list[list_pos + 1] != 0)
|
|
||||||
++list_pos;
|
|
||||||
|
|
||||||
block_remaining = opt_block_list[list_pos];
|
if (action == LZMA_FULL_FLUSH) {
|
||||||
|
if (opt_block_list == NULL) {
|
||||||
|
block_remaining = opt_block_size;
|
||||||
|
} else {
|
||||||
|
// FIXME: Make it work together with
|
||||||
|
// --block-size.
|
||||||
|
if (opt_block_list[list_pos + 1] != 0)
|
||||||
|
++list_pos;
|
||||||
|
|
||||||
|
block_remaining
|
||||||
|
= opt_block_list[list_pos];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set the time of the most recent flushing.
|
||||||
|
mytime_set_flush_time();
|
||||||
|
|
||||||
|
// Start a new Block after LZMA_FULL_FLUSH or continue
|
||||||
|
// the same block after LZMA_SYNC_FLUSH.
|
||||||
|
action = LZMA_RUN;
|
||||||
|
|
||||||
} else if (ret != LZMA_OK) {
|
} else if (ret != LZMA_OK) {
|
||||||
// Determine if the return value indicates that we
|
// Determine if the return value indicates that we
|
||||||
// won't continue coding.
|
// won't continue coding.
|
||||||
|
|
|
@ -38,6 +38,13 @@ static bool warn_fchown;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
IO_WAIT_MORE, // Reading or writing is possible.
|
||||||
|
IO_WAIT_ERROR, // Error or user_abort
|
||||||
|
IO_WAIT_TIMEOUT, // poll() timed out
|
||||||
|
} io_wait_ret;
|
||||||
|
|
||||||
|
|
||||||
/// If true, try to create sparse files when decompressing.
|
/// If true, try to create sparse files when decompressing.
|
||||||
static bool try_sparse = true;
|
static bool try_sparse = true;
|
||||||
|
|
||||||
|
@ -130,8 +137,8 @@ io_no_sparse(void)
|
||||||
/// pops up again. There are pselect() (POSIX-1.2001) and ppoll() (not in
|
/// pops up again. There are pselect() (POSIX-1.2001) and ppoll() (not in
|
||||||
/// POSIX) but neither is portable enough in 2013. The self-pipe trick is
|
/// POSIX) but neither is portable enough in 2013. The self-pipe trick is
|
||||||
/// old and very portable.
|
/// old and very portable.
|
||||||
static bool
|
static io_wait_ret
|
||||||
io_wait(file_pair *pair, bool is_reading)
|
io_wait(file_pair *pair, int timeout, bool is_reading)
|
||||||
{
|
{
|
||||||
struct pollfd pfd[2];
|
struct pollfd pfd[2];
|
||||||
|
|
||||||
|
@ -147,10 +154,10 @@ io_wait(file_pair *pair, bool is_reading)
|
||||||
pfd[1].events = POLLIN;
|
pfd[1].events = POLLIN;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
const int ret = poll(pfd, 2, -1);
|
const int ret = poll(pfd, 2, timeout);
|
||||||
|
|
||||||
if (user_abort)
|
if (user_abort)
|
||||||
return true;
|
return IO_WAIT_ERROR;
|
||||||
|
|
||||||
if (ret == -1) {
|
if (ret == -1) {
|
||||||
if (errno == EINTR || errno == EAGAIN)
|
if (errno == EINTR || errno == EAGAIN)
|
||||||
|
@ -160,10 +167,17 @@ io_wait(file_pair *pair, bool is_reading)
|
||||||
is_reading ? pair->src_name
|
is_reading ? pair->src_name
|
||||||
: pair->dest_name,
|
: pair->dest_name,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
|
return IO_WAIT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ret == 0) {
|
||||||
|
assert(opt_flush_timeout != 0);
|
||||||
|
flush_needed = true;
|
||||||
|
return IO_WAIT_TIMEOUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pfd[0].revents != 0)
|
if (pfd[0].revents != 0)
|
||||||
return false;
|
return IO_WAIT_MORE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -583,10 +597,10 @@ io_open_src_real(file_pair *pair)
|
||||||
// will work when open() is used with O_NONBLOCK.
|
// will work when open() is used with O_NONBLOCK.
|
||||||
if (!S_ISREG(pair->src_st.st_mode)) {
|
if (!S_ISREG(pair->src_st.st_mode)) {
|
||||||
signals_unblock();
|
signals_unblock();
|
||||||
const bool ret = io_wait(pair, true);
|
const io_wait_ret ret = io_wait(pair, -1, true);
|
||||||
signals_block();
|
signals_block();
|
||||||
|
|
||||||
if (ret)
|
if (ret != IO_WAIT_MORE)
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -1001,10 +1015,22 @@ io_read(file_pair *pair, io_buf *buf_union, size_t size)
|
||||||
|
|
||||||
#ifndef TUKLIB_DOSLIKE
|
#ifndef TUKLIB_DOSLIKE
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
if (!io_wait(pair, true))
|
const io_wait_ret ret = io_wait(pair,
|
||||||
|
mytime_get_flush_timeout(),
|
||||||
|
true);
|
||||||
|
switch (ret) {
|
||||||
|
case IO_WAIT_MORE:
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
return SIZE_MAX;
|
case IO_WAIT_ERROR:
|
||||||
|
return SIZE_MAX;
|
||||||
|
|
||||||
|
case IO_WAIT_TIMEOUT:
|
||||||
|
return size - left;
|
||||||
|
|
||||||
|
default:
|
||||||
|
message_bug();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -1077,7 +1103,7 @@ io_write_buf(file_pair *pair, const uint8_t *buf, size_t size)
|
||||||
|
|
||||||
#ifndef TUKLIB_DOSLIKE
|
#ifndef TUKLIB_DOSLIKE
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
if (!io_wait(pair, false))
|
if (io_wait(pair, -1, false) == IO_WAIT_MORE)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
Loading…
Reference in a new issue