32#include "SDL_thread.h"
35#include "threadpool.h"
39#define THREADPOOL_TIMEOUT (5 * 100)
40#define THREADSIG_STOP (1)
41#define THREADSIG_RUN (0)
46static int MAXTHREADS = 8;
71typedef struct ThreadQueueData_ {
72 int (*function)(
void *);
79typedef struct ThreadData_ {
80 int (*function)(
void *);
92typedef struct vpoolThreadData_ {
101static ThreadQueue *global_queue = NULL;
106static ThreadQueue* tq_create (
void);
107static void tq_enqueue( ThreadQueue *q,
void *data );
108static void* tq_dequeue( ThreadQueue *q );
109static void tq_destroy( ThreadQueue *q );
110static int threadpool_worker(
void *data );
111static int threadpool_handler(
void *data );
112static int vpool_worker(
void *data );
123static ThreadQueue* tq_create (
void)
129 q = calloc( 1,
sizeof(ThreadQueue) );
132 n = calloc( 1,
sizeof(
Node) );
138 q->t_lock = SDL_CreateMutex();
139 q->h_lock = SDL_CreateMutex();
140 q->semaphore = SDL_CreateSemaphore( 0 );
151static void tq_enqueue( ThreadQueue *q,
void *data )
154 Node *n = calloc( 1,
sizeof(
Node) );
159 SDL_mutexP( q->t_lock );
167 SDL_SemPost( q->semaphore );
168 SDL_mutexV( q->t_lock );
179static void* tq_dequeue( ThreadQueue *q )
182 Node *newhead, *node;
185 SDL_mutexP( q->h_lock );
189 newhead = node->next;
192 if (newhead == NULL) {
193 WARN(_(
"Tried to dequeue while the queue was empty!"));
202 newhead = node->next;
203 }
while (newhead == NULL);
211 SDL_mutexV( q->h_lock );
224static void tq_destroy( ThreadQueue *q )
227 while (q->first->next != NULL)
228 free( tq_dequeue(q) );
231 SDL_DestroySemaphore( q->semaphore );
232 SDL_DestroyMutex( q->h_lock );
233 SDL_DestroyMutex( q->t_lock );
249int threadpool_newJob(
int (*function)(
void *),
void *data )
253 if (global_queue == NULL) {
254 WARN(_(
"Threadpool has not been initialized yet!"));
261 node->function = function;
264 tq_enqueue( global_queue, node );
278static int threadpool_worker(
void *data )
287 while (SDL_SemWait( work->semaphore ) == -1) {
290 WARN(_(
"SDL_SemWait failed! Error: %s"), SDL_GetError());
293 if (work->signal == THREADSIG_STOP)
297 work->function( work->data );
300 tq_enqueue( work->idle, work );
303 tq_enqueue( work->stopped, work );
328static int threadpool_handler(
void *data )
331 int nrunning, newthread;
334 ThreadQueue *idle, *stopped;
339 stopped = tq_create();
342 threadargs = calloc( MAXTHREADS,
sizeof(
ThreadData) );
345 for (
int i=0; i<MAXTHREADS; i++) {
346 threadargs[i].function = NULL;
347 threadargs[i].data = NULL;
348 threadargs[i].semaphore = SDL_CreateSemaphore( 0 );
349 threadargs[i].idle = idle;
350 threadargs[i].stopped = stopped;
351 threadargs[i].signal = THREADSIG_RUN;
353 tq_enqueue( stopped, &threadargs[i] );
373 if (SDL_SemWaitTimeout( global_queue->semaphore, THREADPOOL_TIMEOUT ) != 0) {
375 if (SDL_SemTryWait( idle->semaphore ) == 0) {
376 threadarg = tq_dequeue( idle );
378 threadarg->signal = THREADSIG_STOP;
380 SDL_SemPost( threadarg->semaphore );
395 if (SDL_SemWait( global_queue->semaphore ) == -1) {
396 WARN(_(
"SDL_SemWait failed! Error: %s"), SDL_GetError());
407 node = tq_dequeue( global_queue );
415 if (SDL_SemTryWait(idle->semaphore) == 0)
416 threadarg = tq_dequeue( idle );
418 else if (SDL_SemTryWait(stopped->semaphore) == 0) {
419 threadarg = tq_dequeue( stopped );
420 threadarg->signal = THREADSIG_RUN;
425 while (SDL_SemWait(idle->semaphore) == -1) {
427 WARN(_(
"SDL_SemWait failed! Error: %s"), SDL_GetError());
429 threadarg = tq_dequeue( idle );
433 threadarg->function = node->function;
434 threadarg->data = node->data;
436 SDL_SemPost( threadarg->semaphore );
440 SDL_CreateThread( threadpool_worker,
453 tq_destroy( stopped );
464int threadpool_init (
void)
466 MAXTHREADS = SDL_GetCPUCount() + 1;
469 if (global_queue != NULL) {
470 WARN(_(
"Threadpool has already been initialized!"));
475 global_queue = tq_create();
478 if ( SDL_CreateThread( threadpool_handler,
"threadpool_handler", NULL ) == NULL ) {
479 ERR( _(
"Threadpool init failed: %s" ), SDL_GetError() );
500ThreadQueue* vpool_create (
void)
514void vpool_enqueue( ThreadQueue *queue,
int (*function)(
void *),
void *data )
521 node->function = function;
524 tq_enqueue( queue, node );
533static int vpool_worker(
void *data )
540 node->function( node->data );
543 SDL_mutexP( work->mutex );
544 cnt = *(work->count) - 1;
546 SDL_CondSignal( work->cond );
547 *(work->count) = cnt;
548 SDL_mutexV( work->mutex );
561void vpool_wait( ThreadQueue *queue )
570 cond = SDL_CreateCond();
571 mutex = SDL_CreateMutex();
573 cnt = SDL_SemValue( queue->semaphore );
580 for (
int i=0; i<cnt; i++) {
582 while (SDL_SemWait( queue->semaphore ) == -1) {
584 WARN(_(
"SDL_SemWait failed! Error: %s"), SDL_GetError());
586 node = tq_dequeue( queue );
591 arg[i].mutex = mutex;
595 threadpool_newJob( vpool_worker, &arg[i] );
599 SDL_CondWait( cond, mutex );
603 SDL_DestroyMutex( mutex );
604 SDL_DestroyCond( cond );
Node in the thread queue.
Data for the threadqueue.
Virtual thread pool data.