13#include "../common/allocations.h"
20# pragma warning(disable : 4204)
24#ifdef ZSTD_MULTITHREAD
29typedef struct POOL_job_s {
35 ZSTD_customMem customMem;
37 ZSTD_pthread_t* threads;
38 size_t threadCapacity;
48 size_t numThreadsBusy;
67static void* POOL_thread(
void* opaque) {
69 if (!ctx) {
return NULL; }
74 while ( ctx->queueEmpty
75 || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
86 { POOL_job
const job = ctx->queue[ctx->queueHead];
87 ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
88 ctx->numThreadsBusy++;
89 ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
94 job.function(job.opaque);
98 ctx->numThreadsBusy--;
107POOL_ctx* ZSTD_createThreadPool(
size_t numThreads) {
116 ZSTD_customMem customMem)
120 if (!numThreads) {
return NULL; }
123 if (!ctx) {
return NULL; }
128 ctx->queueSize = queueSize + 1;
129 ctx->queue = (POOL_job*)
ZSTD_customCalloc(ctx->queueSize *
sizeof(POOL_job), customMem);
132 ctx->numThreadsBusy = 0;
139 if (error) {
POOL_free(ctx);
return NULL; }
143 ctx->threads = (ZSTD_pthread_t*)
ZSTD_customCalloc(numThreads *
sizeof(ZSTD_pthread_t), customMem);
144 ctx->threadCapacity = 0;
145 ctx->customMem = customMem;
147 if (!ctx->threads || !ctx->queue) {
POOL_free(ctx);
return NULL; }
150 for (i = 0; i < numThreads; ++i) {
151 if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
152 ctx->threadCapacity = i;
156 ctx->threadCapacity = numThreads;
157 ctx->threadLimit = numThreads;
165static void POOL_join(
POOL_ctx* ctx) {
175 for (i = 0; i < ctx->threadCapacity; ++i) {
176 ZSTD_pthread_join(ctx->threads[i]);
181 if (!ctx) {
return; }
196 while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
202void ZSTD_freeThreadPool (ZSTD_threadPool* pool) {
207 if (ctx==NULL)
return 0;
209 + ctx->queueSize *
sizeof(POOL_job)
210 + ctx->threadCapacity *
sizeof(ZSTD_pthread_t);
215static int POOL_resize_internal(
POOL_ctx* ctx,
size_t numThreads)
217 if (numThreads <= ctx->threadCapacity) {
218 if (!numThreads)
return 1;
219 ctx->threadLimit = numThreads;
223 { ZSTD_pthread_t*
const threadPool = (ZSTD_pthread_t*)
ZSTD_customCalloc(numThreads *
sizeof(ZSTD_pthread_t), ctx->customMem);
224 if (!threadPool)
return 1;
226 ZSTD_memcpy(threadPool, ctx->threads, ctx->threadCapacity *
sizeof(ZSTD_pthread_t));
228 ctx->threads = threadPool;
231 for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
232 if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
233 ctx->threadCapacity = threadId;
238 ctx->threadCapacity = numThreads;
239 ctx->threadLimit = numThreads;
247 if (ctx==NULL)
return 1;
249 result = POOL_resize_internal(ctx, numThreads);
261static int isQueueFull(
POOL_ctx const* ctx) {
262 if (ctx->queueSize > 1) {
263 return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
265 return (ctx->numThreadsBusy == ctx->threadLimit) ||
278 if (ctx->shutdown)
return;
281 ctx->queue[ctx->queueTail] = job;
282 ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
291 while (isQueueFull(ctx) && (!ctx->shutdown)) {
294 POOL_add_internal(ctx, function, opaque);
303 if (isQueueFull(ctx)) {
307 POOL_add_internal(ctx, function, opaque);
340 assert(!ctx || ctx == &g_poolCtx);
345 assert(!ctx || ctx == &g_poolCtx);
350 (void)ctx; (void)numThreads;
366 if (ctx==NULL)
return 0;
367 assert(ctx == &g_poolCtx);
MEM_STATIC void ZSTD_customFree(void *ptr, ZSTD_customMem customMem)
MEM_STATIC void * ZSTD_customCalloc(size_t size, ZSTD_customMem customMem)
#define assert(condition)
POOL_ctx * POOL_create(size_t numThreads, size_t queueSize)
int POOL_resize(POOL_ctx *ctx, size_t numThreads)
void POOL_add(POOL_ctx *ctx, POOL_function function, void *opaque)
size_t POOL_sizeof(const POOL_ctx *ctx)
void POOL_free(POOL_ctx *ctx)
POOL_ctx * POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem)
void POOL_joinJobs(POOL_ctx *ctx)
int POOL_tryAdd(POOL_ctx *ctx, POOL_function function, void *opaque)
void(* POOL_function)(void *)
#define ZSTD_pthread_mutex_init(a, b)
#define ZSTD_pthread_mutex_unlock(a)
#define ZSTD_pthread_cond_signal(a)
#define ZSTD_pthread_cond_wait(a, b)
#define ZSTD_pthread_cond_broadcast(a)
#define ZSTD_pthread_mutex_lock(a)
#define ZSTD_pthread_mutex_destroy(a)
#define ZSTD_pthread_cond_init(a, b)
#define ZSTD_pthread_cond_destroy(a)
#define ZSTD_memcpy(d, s, l)