#  pragma warning(disable : 4204)   /* disable: C4204: non-constant aggregate initializer */
#include "../common/allocations.h"  /* ZSTD_customMalloc, ZSTD_customCalloc, ZSTD_customFree */
#include "../common/zstd_deps.h"    /* ZSTD_memcpy, ZSTD_memset, INT_MAX, UINT_MAX */
#include "../common/mem.h"          /* MEM_STATIC */
#include "../common/pool.h"         /* threadpool */
#include "../common/threading.h"    /* mutex */
#define ZSTD_RESIZE_SEQPOOL 0
#if defined(DEBUGLEVEL) && (DEBUGLEVEL>=2) \
    && !defined(_MSC_VER) \
    && !defined(__MINGW32__)

#  include <sys/times.h>

#  define DEBUG_PRINTHEX(l,p,n)                                       \
    do {                                                              \
        unsigned debug_u;                                             \
        for (debug_u=0; debug_u<(n); debug_u++)                       \
            RAWLOG(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
        RAWLOG(l, " \n");                                             \
    } while (0)
static unsigned long long GetCurrentClockTimeMicroseconds(void)
{
    static clock_t _ticksPerSecond = 0;
    if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK);

    {   struct tms junk;   clock_t newTicks = (clock_t) times(&junk);
        return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond);
    }
}
#define MUTEX_WAIT_TIME_DLEVEL 6
#define ZSTD_PTHREAD_MUTEX_LOCK(mutex)                                                  \
    do {                                                                                \
        if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) {                                     \
            unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds();    \
            ZSTD_pthread_mutex_lock(mutex);                                             \
            {   unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
                unsigned long long const elapsedTime = (afterTime-beforeTime);          \
                if (elapsedTime > 1000) {                                               \
                    /* log only when waiting exceeded 1 ms */                           \
                    DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL,                                    \
                        "Thread took %llu microseconds to acquire mutex %s \n",         \
                        elapsedTime, #mutex);                                           \
        }   }   }                                                                       \
        else {                                                                          \
            ZSTD_pthread_mutex_lock(mutex);                                             \
        }                                                                               \
    } while (0)
#else
#  define ZSTD_PTHREAD_MUTEX_LOCK(m) ZSTD_pthread_mutex_lock(m)
#  define DEBUG_PRINTHEX(l,p,n) do { } while (0)
#endif
static const buffer_t g_nullBuffer = { NULL, 0 };
static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
{
    DEBUGLOG(3, "ZSTDMT_freeBufferPool (address:%08X)", (U32)(size_t)bufPool);
    if (!bufPool) return;   /* compatible with free on NULL */
static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_customMem cMem)
{
    ZSTDMT_bufferPool* const bufPool =
        (ZSTDMT_bufferPool*)ZSTD_customCalloc(sizeof(ZSTDMT_bufferPool), cMem);
    if (bufPool==NULL) return NULL;
    /* (elided: allocate the internal buffer table; on failure, unwind via) */
        ZSTDMT_freeBufferPool(bufPool);
    /* (elided: initialize remaining fields) */
    bufPool->cMem = cMem;
    size_t const poolSize = sizeof(*bufPool);
    size_t totalBufferSize = 0;
    /* (elided: sum the capacity of every cached buffer, under poolMutex) */
    return poolSize + arraySize + totalBufferSize;
static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize)
{
    DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize);
static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, unsigned maxNbBuffers)
{
    if (srcBufPool==NULL) return NULL;
    /* need a larger buffer pool : recreate it */
    {   ZSTD_customMem const cMem = srcBufPool->cMem;
        size_t const bSize = srcBufPool->bufferSize;   /* forward parameters */
        ZSTDMT_bufferPool* newBufPool;
        ZSTDMT_freeBufferPool(srcBufPool);
        newBufPool = ZSTDMT_createBufferPool(maxNbBuffers, cMem);
        if (newBufPool==NULL) return newBufPool;
        ZSTDMT_setBufferSize(newBufPool, bSize);
        return newBufPool;
    }
}
        size_t const availBufferSize = buf.capacity;
        if ((availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize)) {
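            /* Heuristic for buffer reuse: the cached buffer must be large enough
             * for the request, but not more than 8x larger (availBufferSize>>3 <= bSize),
             * so grossly oversized buffers get freed instead of hoarded. */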
            DEBUGLOG(5, "ZSTDMT_getBuffer: provide buffer %u of size %u",

        DEBUGLOG(5, "ZSTDMT_getBuffer: existing buffer does not meet size conditions => freeing");
    /* create a new buffer */
    DEBUGLOG(5, "ZSTDMT_getBuffer: create a new buffer");
    {   buffer_t buffer;
        void* const start = ZSTD_customMalloc(bSize, bufPool->cMem);
        buffer.start = start;   /* note : start can be NULL if malloc fails ! */
        buffer.capacity = (start==NULL) ? 0 : bSize;
        if (start==NULL) {
            DEBUGLOG(5, "ZSTDMT_getBuffer: buffer allocation failure !!");
        } else {
            DEBUGLOG(5, "ZSTDMT_getBuffer: created buffer of size %u", (U32)bSize);
        }
        return buffer;
    }
#if ZSTD_RESIZE_SEQPOOL
        newBuffer.start = start;
        newBuffer.capacity = start == NULL ? 0 : bSize;

        DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize);

        DEBUGLOG(5, "ZSTDMT_resizeBuffer: buffer allocation failure !!");
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{
    DEBUGLOG(5, "ZSTDMT_releaseBuffer");
    if (buf.start == NULL) return;   /* compatible with release on NULL */

        DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u",

    /* reached pool capacity (should not happen in practice) => release the buffer */
    DEBUGLOG(5, "ZSTDMT_releaseBuffer: pool capacity reached => freeing ");
#define BUF_POOL_MAX_NB_BUFFERS(nbWorkers) (2*(nbWorkers) + 3)

#define SEQ_POOL_MAX_NB_BUFFERS(nbWorkers) (nbWorkers)
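/* Sizing note (inferred from the macros above, not stated in this excerpt):
 * the buffer pool provisions roughly one input and one output buffer per
 * in-flight job (2*nbWorkers) plus a little slack (+3), while the sequence
 * pool only ever needs one rawSeq buffer per worker. */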
    return ZSTDMT_sizeof_bufferPool(seqPool);

        return kNullRawSeqStore;

    return bufferToSeq(ZSTDMT_getBuffer(seqPool));

#if ZSTD_RESIZE_SEQPOOL
    return bufferToSeq(ZSTDMT_resizeBuffer(seqPool, seqToBuffer(seq)));

    ZSTDMT_releaseBuffer(seqPool, seqToBuffer(seq));
static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)
{
    ZSTDMT_setBufferSize(seqPool, nbSeq * sizeof(rawSeq));
}
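/* A seqPool is a bufferPool in disguise: a "sequence buffer" is a plain byte
 * buffer of nbSeq * sizeof(rawSeq), converted back and forth through the
 * bufferToSeq()/seqToBuffer() pair used above. */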
static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
{
    ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(SEQ_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);
    if (seqPool == NULL) return NULL;
    ZSTDMT_setNbSeq(seqPool, 0);
    return seqPool;
}

static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
{
    ZSTDMT_freeBufferPool(seqPool);
}
    if (!cctxPool) return NULL;

    if (!cctxPool->cctxs) {
        ZSTDMT_freeCCtxPool(cctxPool);
        return NULL;
    }
    cctxPool->cMem = cMem;

    if (!cctxPool->cctxs[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }

    DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers);
    if (srcPool==NULL) return NULL;
    if (nbWorkers <= srcPool->totalCCtx) return srcPool;   /* good enough */
    /* need a larger cctx pool : recreate it */
    {   ZSTD_customMem const cMem = srcPool->cMem;
        ZSTDMT_freeCCtxPool(srcPool);
        return ZSTDMT_createCCtxPool(nbWorkers, cMem);
    }
    {   unsigned const nbWorkers = cctxPool->totalCCtx;
        size_t const poolSize = sizeof(*cctxPool);
        size_t totalCCtxSize = 0;
        unsigned u;
        for (u=0; u<nbWorkers; u++) {
            /* (elided: accumulate the size of each cached cctx) */
        }
        return poolSize + arraySize + totalCCtxSize;
        DEBUGLOG(5, "create one more CCtx");

    if (cctx==NULL) return;

        DEBUGLOG(4, "CCtx pool overflow : free cctx");
                                            ZSTD_CCtx_params params,
                                            const void* dict, size_t const dictSize,
                                            ZSTD_dictContentType_e dictContentType)
    if (params.ldmParams.enableLdm == ZSTD_ps_enable) {
        DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);

        assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
        assert(params.ldmParams.hashRateLog < 32);
    } else {
        ZSTD_memset(&params.ldmParams, 0, sizeof(params.ldmParams));
    }
    if (params.fParams.checksumFlag)
    if (params.ldmParams.enableLdm == ZSTD_ps_enable) {
        ZSTD_customMem cMem = params.customMem;
        unsigned const hashLog = params.ldmParams.hashLog;
        size_t const hashSize = ((size_t)1 << hashLog) * sizeof(ldmEntry_t);
        unsigned const bucketLog =
            params.ldmParams.hashLog - params.ldmParams.bucketSizeLog;
        unsigned const prevBucketLog =
            serialState->params.ldmParams.hashLog -
            serialState->params.ldmParams.bucketSizeLog;
        size_t const numBuckets = (size_t)1 << bucketLog;
        if (dictContentType == ZSTD_dct_rawContent) {
            BYTE const* const dictEnd = (const BYTE*)dict + dictSize;
    serialState->params = params;
    serialState->params.jobSize = (U32)jobSize;
static int ZSTDMT_serialState_init(serialState_t* serialState)

static void ZSTDMT_serialState_free(serialState_t* serialState)
{
    ZSTD_customMem cMem = serialState->params.customMem;
static void ZSTDMT_serialState_update(serialState_t* serialState,

        DEBUGLOG(5, "wait for serialState->cond");

    if (serialState->params.ldmParams.enableLdm == ZSTD_ps_enable) {

    if (serialState->params.fParams.checksumFlag && src.size > 0)

    if (seqStore.size > 0) {
        assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable);
static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState,
                                              unsigned jobID, size_t cSize)

        DEBUGLOG(5, "Skipping past job %u because of error", jobID);
static const range_t kNullRange = { NULL, 0 };
#define JOB_ERROR(e)                                \
    do {                                            \
        ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   \
        job->cSize = e;                             \
        ZSTD_pthread_mutex_unlock(&job->job_mutex); \
        goto _endJob;                               \
    } while (0)
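/* Illustrative use inside ZSTDMT_compressionJob (the concrete call sites are
 * partly elided from this excerpt): any failing step publishes its error code
 * as the job's cSize under job_mutex, then jumps to the shared cleanup label:
 *     size_t const initError = ZSTD_compressBegin_advanced_internal(...);
 *     if (ZSTD_isError(initError)) JOB_ERROR(initError);
 */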
static void ZSTDMT_compressionJob(void* jobDescription)
{
    ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
    ZSTD_CCtx_params jobParams = job->params;   /* do not modify job->params ! copy it, modify the copy */

    size_t lastCBlockSize = 0;

    if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
        dstBuff = ZSTDMT_getBuffer(job->bufPool);
    if (jobParams.ldmParams.enableLdm == ZSTD_ps_enable && rawSeqStore.seq == NULL)

    /* Don't compute the checksum for chunks : it is computed externally,
     * and the first job writes it into the frame header. */
    if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
    /* Don't run LDM for the chunks : it is handled externally by the serial state */
    jobParams.ldmParams.enableLdm = ZSTD_ps_disable;
    /* no intra-job parallelism */
    jobParams.nbWorkers = 0;

                        &jobParams, pledgedSrcSize);

        ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
        DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);

        int const nbChunks = (int)((job->src.size + (chunkSize-1)) / chunkSize);
        if (sizeof(size_t) > sizeof(int)) assert(job->src.size < ((size_t)INT_MAX) * chunkSize);   /* check overflow */
        DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->src.size, nbChunks);
        for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {

            op += cSize;   assert(op < oend);

            job->consumed = chunkSize * chunkNb;
            DEBUGLOG(5, "ZSTDMT_compressionJob: compress new block : cSize==%u bytes (total: %u)",
        assert((chunkSize & (chunkSize - 1)) == 0);  /* chunkSize must be a power of 2 for the mask below to work */
        if ((nbChunks > 0) | job->lastJob /* must output a "last block" flag */ ) {
            size_t const lastBlockSize1 = job->src.size & (chunkSize-1);
            size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1;
            size_t const cSize = (job->lastJob) ?

            lastCBlockSize = cSize;
_endJob:
    /* release resources */
    ZSTDMT_releaseSeq(job->seqPool, rawSeqStore);
    ZSTDMT_releaseCCtx(job->cctxPool, cctx);

    job->cSize += lastCBlockSize;
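    /* Job lifecycle recap: a worker borrows a CCtx and a dst buffer from the
     * shared pools, synchronizes with the serial state (so LDM sequences and
     * the XXH64 checksum advance in job order), compresses its slice chunk by
     * chunk while publishing job->consumed and job->cSize under job->job_mutex
     * for the flushing thread, and finally returns every borrowed resource. */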
static const roundBuff_t kNullRoundBuff = {NULL, 0, 0};

#define RSYNC_LENGTH 32
/* Don't create chunks smaller than the zstd block size.
 * This stops us from regressing compression ratio too much,
 * and ensures our output fits in ZSTD_compressBound(). */
#define RSYNC_MIN_BLOCK_LOG ZSTD_BLOCKSIZELOG_MAX
#define RSYNC_MIN_BLOCK_SIZE (1<<RSYNC_MIN_BLOCK_LOG)
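/* Rsyncable mode in one line: a rolling hash over the last RSYNC_LENGTH (32)
 * bytes is tested against a hit mask at every position; a hit ends the current
 * job, so block boundaries depend on content rather than position and stay
 * stable across similar inputs. With rsyncBits low bits set in the mask, a hit
 * occurs on average once every 2^rsyncBits bytes, matching the target job size
 * (see "rsyncLog" below); RSYNC_MIN_BLOCK_SIZE bounds block size from below. */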
    if (jobTable == NULL) return;
    for (jobNb=0; jobNb<nbJobs; jobNb++) {

    U32 const nbJobs = 1 << nbJobsLog2;

    if (jobTable==NULL) return NULL;
    for (jobNb=0; jobNb<nbJobs; jobNb++) {

    if (initError != 0) {
        ZSTDMT_freeJobsTable(jobTable, nbJobs, cMem);
static size_t ZSTDMT_expandJobsTable(ZSTDMT_CCtx* mtctx, U32 nbWorkers)
{
    U32 nbJobs = nbWorkers + 2;

        mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
        assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0));  /* ensure nbJobs is a power of 2 */
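        /* nbJobs is forced to a power of 2 so that jobIDMask = nbJobs - 1
         * turns the ever-increasing job counter into a ring-buffer index with
         * a single AND (see "jobID & mtctx->jobIDMask" later in this file). */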
static size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)

    U32 nbJobs = nbWorkers + 2;
    DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbWorkers = %u)", nbWorkers);
    if (nbWorkers < 1) return NULL;

    if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL))

    if (!mtctx) return NULL;
    ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);

    mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
    assert(nbJobs > 0);   assert((nbJobs & (nbJobs - 1)) == 0);

    mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
    mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
    initError = ZSTDMT_serialState_init(&mtctx->serial);

    DEBUGLOG(3, "mt_cctx created, for %u threads", nbWorkers);
#ifdef ZSTD_MULTITHREAD
static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
{
    unsigned jobID;
    DEBUGLOG(3, "ZSTDMT_releaseAllJobResources");
    for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {

static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
{
    DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
    if (mtctx==NULL) return 0;   /* compatible with free on NULL */

    ZSTDMT_releaseAllJobResources(mtctx);

    ZSTDMT_freeBufferPool(mtctx->bufPool);
    ZSTDMT_freeCCtxPool(mtctx->cctxPool);
    ZSTDMT_freeSeqPool(mtctx->seqPool);
    ZSTDMT_serialState_free(&mtctx->serial);
    if (mtctx == NULL) return 0;   /* supports sizeof on NULL */
    return sizeof(*mtctx)
            + ZSTDMT_sizeof_bufferPool(mtctx->bufPool)
            + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)
            + ZSTDMT_sizeof_seqPool(mtctx->seqPool)
static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
{
    if (mtctx->bufPool == NULL) return ERROR(memory_allocation);

    if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);
    mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers);
    if (mtctx->seqPool == NULL) return ERROR(memory_allocation);
    ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);
    U32 const saved_wlog = mtctx->params.cParams.windowLog;
    int const compressionLevel = cctxParams->compressionLevel;
    DEBUGLOG(5, "ZSTDMT_updateCParams_whileCompressing (level:%i)",

    mtctx->params.compressionLevel = compressionLevel;

    cParams.windowLog = saved_wlog;   /* do not modify windowLog while compressing */
    mtctx->params.cParams = cParams;
    ZSTD_frameProgression fps;
    DEBUGLOG(5, "ZSTDMT_getFrameProgression");

    fps.produced = fps.flushed = mtctx->produced;

    fps.nbActiveWorkers = 0;

    DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",

    for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
        unsigned const wJobID = jobNb & mtctx->jobIDMask;

        {   size_t const cResult = jobPtr->cSize;
            size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;

            assert(flushed <= produced);
            fps.ingested += jobPtr->src.size;

            fps.produced += produced;
            fps.flushed += flushed;
    unsigned const jobID = mtctx->doneJobID;
    assert(jobID <= mtctx->nextJobID);
    if (jobID == mtctx->nextJobID) return 0;   /* no active job => nothing to flush */

    {   unsigned const wJobID = jobID & mtctx->jobIDMask;

        {   size_t const cResult = jobPtr->cSize;
            size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;

            assert(flushed <= produced);
            toFlush = produced - flushed;
static unsigned ZSTDMT_computeTargetJobLog(const ZSTD_CCtx_params* params)
{
    unsigned jobLog;
    if (params->ldmParams.enableLdm == ZSTD_ps_enable) {
        /* In Long Range Mode, the windowLog is typically oversized;
         * it's preferable to derive the job size from chainLog instead. */
        jobLog = MAX(21, ZSTD_cycleLog(params->cParams.chainLog, params->cParams.strategy) + 3);
    } else {
        jobLog = MAX(20, params->cParams.windowLog + 2);
    }
    return MIN(jobLog, (unsigned)ZSTDMT_JOBLOG_MAX);
}
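/* Worked example: without LDM and windowLog=24, jobLog = MAX(20, 24+2) = 26,
 * i.e. each job targets about 64 MB of input; tiny windows are floored at
 * 2^20 = 1 MB so per-job overhead stays amortized. */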
    assert(0 <= ovlog && ovlog <= 9);
    if (ovlog == 0) return ZSTDMT_overlapLog_default(strat);
static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params)
{
    int const overlapRLog = 9 - ZSTDMT_overlapLog(params->overlapLog, params->cParams.strategy);
    int ovLog = (overlapRLog >= 8) ? 0 : (params->cParams.windowLog - overlapRLog);
    assert(0 <= overlapRLog && overlapRLog <= 8);
    if (params->ldmParams.enableLdm == ZSTD_ps_enable) {
        /* In Long Range Mode the window is typically oversized:
         * derive the overlap from the job size instead. */
        ovLog = MIN(params->cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2)
                - overlapRLog;
    }
    assert(0 <= ovLog && ovLog <= ZSTD_WINDOWLOG_MAX);
    DEBUGLOG(4, "overlapLog : %i", params->overlapLog);
    DEBUGLOG(4, "overlap size : %i", 1 << ovLog);
    return (ovLog==0) ? 0 : (size_t)1 << ovLog;
}
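/* Worked example (assuming the default overlapLog resolves to 6 for the chosen
 * strategy): overlapRLog = 9-6 = 3, so with windowLog=24 each job re-reads the
 * last 2^(24-3) = 2 MB (1/8 of the window) of its predecessor's input as a
 * prefix. overlapLog=9 means a full-window overlap; overlapLog=1 means none. */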
size_t ZSTDMT_initCStream_internal(
        ZSTDMT_CCtx* mtctx,
        const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType,
        const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
        unsigned long long pledgedSrcSize)
{
    DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)",

    assert(!((dict) && (cdict)));  /* either dict or cdict, not both */
    if (params.nbWorkers != mtctx->params.nbWorkers)

    DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers);

        ZSTDMT_waitForAllJobsCompleted(mtctx);
        ZSTDMT_releaseAllJobResources(mtctx);

                                        ZSTD_dlm_byCopy, dictContentType,
                                        params.cParams, mtctx->cMem);

    mtctx->cdict = cdict;
    if (params.rsyncable) {

        DEBUGLOG(4, "rsyncLog = %u", rsyncBits);

        size_t const windowSize = mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable ? (1U << mtctx->params.cParams.windowLog) : 0;

        size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);

        size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;

            return ERROR(memory_allocation);

                                dict, dictSize, dictContentType))
            return ERROR(memory_allocation);
    job->src = kNullRange;

        DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full");

    DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",

        mtctx->params.fParams.checksumFlag = 0;

        DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");
        ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);

    DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u, jobNb == %u (mod:%u))",

        DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID);
    DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u , job %u <= %u)",

        DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond, there will be none",

        DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",

    {   size_t cSize = mtctx->jobs[wJobID].cSize;                  /* shared */
        size_t const srcConsumed = mtctx->jobs[wJobID].consumed;   /* shared */
        size_t const srcSize = mtctx->jobs[wJobID].src.size;

        if (ZSTD_isError(cSize)) {
            DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
            ZSTDMT_waitForAllJobsCompleted(mtctx);
            ZSTDMT_releaseAllJobResources(mtctx);
        }

        assert(srcConsumed <= srcSize);
        if ( (srcConsumed == srcSize)   /* job completed */

            DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);

        DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)",

        if ( (srcConsumed == srcSize)   /* job is completely flushed */

            DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",

        if (srcSize > srcConsumed) return 1;   /* current job not completely compressed */
    unsigned const firstJobID = mtctx->doneJobID;
    unsigned const lastJobID = mtctx->nextJobID;
    unsigned jobID;

    for (jobID = firstJobID; jobID < lastJobID; ++jobID) {
        unsigned const wJobID = jobID & mtctx->jobIDMask;

        if (consumed < mtctx->jobs[wJobID].src.size) {

            if (range.size == 0) {
                range = mtctx->jobs[wJobID].src;
    if (rangeStart == NULL || bufferStart == NULL)

    {   BYTE const* const bufferEnd = bufferStart + buffer.capacity;
        BYTE const* const rangeEnd = rangeStart + range.size;

        /* empty ranges cannot overlap */
        if (bufferStart == bufferEnd || rangeStart == rangeEnd)

        return bufferStart < rangeEnd && rangeStart < bufferEnd;
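        /* This is the standard half-open interval test: [bufferStart, bufferEnd)
         * and [rangeStart, rangeEnd) intersect iff each starts before the other
         * ends; the guards above keep NULL or empty ranges from ever counting
         * as overlapping. */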
    DEBUGLOG(5, "ZSTDMT_doesOverlapWindow");

    DEBUGLOG(5, "extDict [0x%zx, 0x%zx)",
                (size_t)extDict.start,

    DEBUGLOG(5, "prefix  [0x%zx, 0x%zx)",
                (size_t)prefix.start,

    return ZSTDMT_isOverlapped(buffer, extDict)
        || ZSTDMT_isOverlapped(buffer, prefix);
    if (mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable) {

        DEBUGLOG(5, "ZSTDMT_waitForLdmComplete");
        DEBUGLOG(5, "source  [0x%zx, 0x%zx)",
                    (size_t)buffer.start,

            DEBUGLOG(5, "Waiting for LDM to finish...");

        DEBUGLOG(6, "Done waiting for LDM to finish");
static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
{
    range_t const inUse = ZSTDMT_getInputDataInUse(mtctx);

    DEBUGLOG(5, "ZSTDMT_tryGetInputRange");

    if (spaceLeft < target) {

        buffer.start = start;

        if (ZSTDMT_isOverlapped(buffer, inUse)) {
            DEBUGLOG(5, "Waiting for buffer...");

        ZSTDMT_waitForLdmComplete(mtctx, buffer);

        if (ZSTDMT_isOverlapped(buffer, inUse)) {
            DEBUGLOG(5, "Waiting for buffer...");

        ZSTDMT_waitForLdmComplete(mtctx, buffer);

        DEBUGLOG(5, "Using prefix range [%zx, %zx)",

        DEBUGLOG(5, "Using source range [%zx, %zx)",
                    (size_t)buffer.start,
    BYTE const* const istart = (BYTE const*)input.src + input.pos;

    syncPoint.flush = 0;
    if (!mtctx->params.rsyncable)

    hash = ZSTD_rollingHash_append(hash, istart, pos);

    if ((hash & hitMask) == hitMask) {

        syncPoint.toLoad = 0;
        syncPoint.flush = 1;

    for (; pos < syncPoint.toLoad; ++pos) {

        if ((hash & hitMask) == hitMask) {
            syncPoint.toLoad = pos + 1;
            syncPoint.flush = 1;
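    /* The scan above runs in two phases: ZSTD_rollingHash_append() builds the
     * hash of the first RSYNC_LENGTH-byte window (checking for an immediate
     * hit), then each following position updates the hash in O(1) with
     * ZSTD_rollingHash_rotate(hash, oldByte, newByte, primePower), flagging a
     * flush at the first position where (hash & hitMask) == hitMask. */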
    unsigned forwardInputProgress = 0;
    DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)",
                (U32)endOp, (U32)(input->size - input->pos));
    assert(output->pos <= output->size);
    assert(input->pos <= input->size);

        return ERROR(stage_wrong);
      && (input->size > input->pos) ) {

        if (!ZSTDMT_tryGetInputRange(mtctx)) {

            DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed");

        DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);
        syncPoint_t const syncPoint = findSynchronizationPoint(mtctx, *input);

        DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",

        input->pos += syncPoint.toLoad;

        forwardInputProgress = syncPoint.toLoad>0;

    if ((input->pos < input->size) && (endOp == ZSTD_e_end)) {

        FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) , "");
    {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp);   /* block if there was no forward input progress */
        if (input->pos < input->size) return MAX(remainingToFlush, 1);   /* input not fully consumed => do not end flush yet */
        DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);
        return remainingToFlush;
}   }
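/* Caller-side sketch (illustrative; not part of this file). ZSTDMT is normally
 * driven through the public streaming API, which dispatches to
 * ZSTDMT_compressStream_generic() once nbWorkers >= 1:
 *
 *     ZSTD_CCtx* const cctx = ZSTD_createCCtx();
 *     ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, 4);
 *     {   ZSTD_inBuffer  in  = { src, srcSize, 0 };
 *         ZSTD_outBuffer out = { dst, dstCapacity, 0 };
 *         size_t remaining;
 *         do {
 *             remaining = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
 *         } while (remaining != 0 && !ZSTD_isError(remaining));
 *     }
 *     ZSTD_freeCCtx(cctx);
 */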