Skip to content

Commit

Permalink
implementation of single-threaded brotli decompression (support .br f…
Browse files Browse the repository at this point in the history
…ormat compressed with standard brotli API)
  • Loading branch information
sebres committed Sep 6, 2023
1 parent def5289 commit 80e6bc7
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 15 deletions.
120 changes: 107 additions & 13 deletions C/zstdmt/brotli-mt_decompress.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct writelist {

struct BROTLIMT_DCtx_s {

/* threads: 1..BROTLIMT_THREAD_MAX */
/* threads: 0, 1..BROTLIMT_THREAD_MAX */
int threads;

/* should be used for read from input */
Expand Down Expand Up @@ -96,7 +96,7 @@ BROTLIMT_DCtx *BROTLIMT_createDCtx(int threads, int inputsize)
return 0;

/* check threads value */
if (threads < 1 || threads > BROTLIMT_THREAD_MAX)
if (threads < 0 || threads > BROTLIMT_THREAD_MAX)
return 0;

/* setup ctx */
Expand All @@ -112,12 +112,16 @@ BROTLIMT_DCtx *BROTLIMT_createDCtx(int threads, int inputsize)
else
ctx->inputsize = 1024 * 64; /* 64K buffer */

pthread_mutex_init(&ctx->read_mutex, NULL);
pthread_mutex_init(&ctx->write_mutex, NULL);
if (threads) {
pthread_mutex_init(&ctx->read_mutex, NULL);
pthread_mutex_init(&ctx->write_mutex, NULL);

INIT_LIST_HEAD(&ctx->writelist_free);
INIT_LIST_HEAD(&ctx->writelist_busy);
INIT_LIST_HEAD(&ctx->writelist_done);
INIT_LIST_HEAD(&ctx->writelist_free);
INIT_LIST_HEAD(&ctx->writelist_busy);
INIT_LIST_HEAD(&ctx->writelist_done);
} else {
threads = 1;
}

ctx->cwork = (cwork_t *) malloc(sizeof(cwork_t) * threads);
if (!ctx->cwork)
Expand Down Expand Up @@ -376,6 +380,90 @@ static void *pt_decompress(void *arg)
return (void *)result;
}

/* single threaded (standard brotli stream, without header/mt-frames) */
static size_t st_decompress(void *arg)
{
BROTLIMT_DCtx *ctx = (BROTLIMT_DCtx *) arg;
BrotliDecoderResult bres = BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT;
cwork_t *w = &ctx->cwork[0];
BROTLIMT_Buffer Out;
BROTLIMT_Buffer *out = &Out;
BROTLIMT_Buffer *in = &w->in;
BrotliDecoderState *state;
const uint8_t* next_in;
uint8_t* next_out;
int rv;
size_t retval = 0;

/* allocate space for input buffer */
in->allocated = in->size = ctx->inputsize;
in->buf = malloc(in->size);
if (!in->buf)
return MT_ERROR(memory_allocation);
next_in = in->buf;

/* allocate space for output buffer */
out->allocated = out->size = ctx->inputsize * 4;
out->buf = malloc(out->size);
if (!out->buf) {
free(in->buf);
return MT_ERROR(memory_allocation);
}
next_out = out->buf;

state = BrotliDecoderCreateInstance(NULL, NULL, NULL);
if (!state) {
free(in->buf);
free(out->buf);
return MT_ERROR(memory_allocation);
}
BrotliDecoderSetParameter(state, BROTLI_DECODER_PARAM_LARGE_WINDOW, 1);

while (1) {
if (bres == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT) {
in->size = in->allocated;
rv = ctx->fn_read(ctx->arg_read, in);
if (in->size == 0) break;
if (rv != 0) {
retval = mt_error(rv);
goto done;
}
next_in = in->buf;
} else if (bres == BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT) {
out->size = next_out - (uint8_t*)out->buf;
rv = ctx->fn_write(ctx->arg_write, out);
if (rv != 0) {
retval = mt_error(rv);
goto done;
}
next_out = out->buf;
} else {
break;
}

bres = BrotliDecoderDecompressStream(state, &in->size, &next_in, &out->size, &next_out, 0);
}

if (bres != BROTLI_DECODER_RESULT_SUCCESS) {
retval = MT_ERROR(data_error); // corrupt input
goto done;
}
out->size = next_out - (uint8_t*)out->buf;
if (out->size != 0) {
rv = ctx->fn_write(ctx->arg_write, out);
if (rv != 0) {
retval = mt_error(rv);
goto done;
}
}

done:
free(in->buf);
free(out->buf);
BrotliDecoderDestroyInstance(state);
return retval;
}

size_t BROTLIMT_decompressDCtx(BROTLIMT_DCtx * ctx, BROTLIMT_RdWr_t * rdwr)
{
unsigned char buf[4];
Expand All @@ -393,6 +481,14 @@ size_t BROTLIMT_decompressDCtx(BROTLIMT_DCtx * ctx, BROTLIMT_RdWr_t * rdwr)
ctx->arg_read = rdwr->arg_read;
ctx->arg_write = rdwr->arg_write;

/* For single threaded brotli (no header, no mt-frames), don't check for
BROTLIMT_MAGIC_SKIPPABLE here, because stdandard brotli stream may also
start with it, so will be mistakenly considered as brotli-mt stream. */
if (ctx->threads == 0) {
/* decompress single threaded */
return st_decompress(ctx);
}

/* check for BROTLIMT_MAGIC_SKIPPABLE */
in->buf = buf;
in->size = 4;
Expand All @@ -402,10 +498,6 @@ size_t BROTLIMT_decompressDCtx(BROTLIMT_DCtx * ctx, BROTLIMT_RdWr_t * rdwr)
if (in->size != 4)
return MT_ERROR(data_error);

/* single threaded with unknown sizes */
if (MEM_readLE32(buf) != BROTLIMT_MAGIC_SKIPPABLE)
return MT_ERROR(data_error);

/* mark unused */
in->buf = 0;
in->size = 0;
Expand Down Expand Up @@ -485,8 +577,10 @@ void BROTLIMT_freeDCtx(BROTLIMT_DCtx * ctx)
if (!ctx)
return;

pthread_mutex_destroy(&ctx->read_mutex);
pthread_mutex_destroy(&ctx->write_mutex);
if (ctx->threads) {
pthread_mutex_destroy(&ctx->read_mutex);
pthread_mutex_destroy(&ctx->write_mutex);
}
free(ctx->cwork);
free(ctx);
ctx = 0;
Expand Down
2 changes: 2 additions & 0 deletions CPP/7zip/Archive/BrotliHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ STDMETHODIMP CHandler::Extract(const UInt32 *indices, UInt32 numItems,
{

NCompress::NBROTLI::CDecoder *decoderSpec = new NCompress::NBROTLI::CDecoder;
decoderSpec->SetNumberOfThreads(0); /* .br - single threaded processing (without header/mt-frames) */
CMyComPtr<ICompressCoder> decoder = decoderSpec;
decoderSpec->SetInStream(_seqStream);

Expand Down Expand Up @@ -256,6 +257,7 @@ static HRESULT UpdateArchive(
CMyComPtr<ICompressProgressInfo> localProgress = localProgressSpec;
localProgressSpec->Init(updateCallback, true);
NCompress::NBROTLI::CEncoder *encoderSpec = new NCompress::NBROTLI::CEncoder;
encoderSpec->SetNumberOfThreads(0); /* .br - single threaded processing (without header/mt-frames) */
CMyComPtr<ICompressCoder> encoder = encoderSpec;
RINOK(props.SetCoderProps(encoderSpec, NULL));
RINOK(encoder->Code(fileInStream, outStream, NULL, NULL, localProgress));
Expand Down
2 changes: 1 addition & 1 deletion CPP/7zip/Compress/BrotliDecoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ STDMETHODIMP CDecoder::SetDecoderProperties2(const Byte * prop, UInt32 size)
STDMETHODIMP CDecoder::SetNumberOfThreads(UInt32 numThreads)
{
const UInt32 kNumThreadsMax = BROTLIMT_THREAD_MAX;
if (numThreads < 1) numThreads = 1;
if (numThreads < 0) numThreads = 0;
if (numThreads > kNumThreadsMax) numThreads = kNumThreadsMax;
_numThreads = numThreads;
return S_OK;
Expand Down
2 changes: 1 addition & 1 deletion CPP/7zip/Compress/BrotliEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ STDMETHODIMP CEncoder::Code(ISequentialInStream *inStream,
STDMETHODIMP CEncoder::SetNumberOfThreads(UInt32 numThreads)
{
const UInt32 kNumThreadsMax = BROTLIMT_THREAD_MAX;
if (numThreads < 1) numThreads = 1;
if (numThreads < 0) numThreads = 0;
if (numThreads > kNumThreadsMax) numThreads = kNumThreadsMax;
_numThreads = numThreads;
return S_OK;
Expand Down

0 comments on commit 80e6bc7

Please sign in to comment.