naev 0.11.5
threadpool.c
1/*
2 * See Licensing and Copyright notice in threadpool.h
3 */
4/*
5 * @brief A simple threadpool implementation using a single queue.
6 *
7 * The queue is inspired by this paper (look for the queue with two locks):
8 *
9 * Maged M. Michael and Michael L. Scott. 1998. Nonblocking algorithms and
10 * preemption-safe locking on multiprogrammed shared memory multiprocessors. J.
11 * Parallel Distrib. Comput. 51, 1 (May 1998), 1-26. DOI=10.1006/jpdc.1998.1446
12 * http://dx.doi.org/10.1006/jpdc.1998.1446
13 *
14 * @ARTICLE{Michael98non-blockingalgorithms,
15 * author = {Maged M. Michael and Michael L. Scott},
16 * title = {Non-Blocking Algorithms and Preemption-Safe Locking on Multiprogrammed Shared Memory Multiprocessors},
17 * journal = {Journal of Parallel and Distributed Computing},
18 * year = {1998},
19 * volume = {51},
20 * pages = {1--26},
21 * }
22 *
23 * @note The algorithm/strategy for killing idle workers should be moved into
24 * the threadhandler and it should also be improved (the current strategy
25 * is probably not very good).
26 */
27
29#include <stdlib.h>
30#include "SDL.h"
31#include "SDL_error.h"
32#include "SDL_thread.h"
35#include "threadpool.h"
36
37#include "log.h"
38
39#define THREADPOOL_TIMEOUT (5 * 100) /* The time a worker thread waits in ms. */
40#define THREADSIG_STOP (1) /* The signal to stop a worker thread */
41#define THREADSIG_RUN (0) /* The signal to indicate the worker thread is running */
42
46static int MAXTHREADS = 8; /* Bit overkill, but oh well. */
47
51typedef struct Node_ {
52 void *data; /* The element in the list */
53 struct Node_ *next; /* The next node in the list */
54} Node;
55
60 Node *first; /* The first node */
61 Node *last; /* The second node */
62 /* A semaphore to ensure reads only happen when the queue is not empty */
63 SDL_sem *semaphore;
64 SDL_mutex *t_lock; /* Tail lock. Lock when reading/updating tail */
65 SDL_mutex *h_lock; /* Same as tail lock, except it's head lock */
66};
67
71typedef struct ThreadQueueData_ {
72 int (*function)(void *); /* The function to be called */
73 void *data; /* And its arguments */
75
79typedef struct ThreadData_ {
80 int (*function)(void *); /* The function to be called */
81 void *data; /* Arguments to the above function */
82 int signal; /* Signals to the thread */
83 SDL_sem *semaphore; /* The semaphore to signal new jobs or new signal in the
84 'signal' variable */
85 ThreadQueue *idle; /* The queue with idle threads */
86 ThreadQueue *stopped; /* The queue with stopped threads */
88
92typedef struct vpoolThreadData_ {
93 SDL_cond *cond; /* Condition variable for signalling all jobs in the vpool
94 are done */
95 SDL_mutex *mutex; /* The mutex to use with the above condition variable */
96 int *count; /* Variable to count number of finished jobs in the vpool */
97 ThreadQueueData *node; /* The job to be done */
99
100/* The global threadpool queue */
101static ThreadQueue *global_queue = NULL;
102
103/*
104 * Prototypes.
105 */
106static ThreadQueue* tq_create (void);
107static void tq_enqueue( ThreadQueue *q, void *data );
108static void* tq_dequeue( ThreadQueue *q );
109static void tq_destroy( ThreadQueue *q );
110static int threadpool_worker( void *data );
111static int threadpool_handler( void *data );
112static int vpool_worker( void *data );
113
123static ThreadQueue* tq_create (void)
124{
125 ThreadQueue *q;
126 Node *n;
127
128 /* Queue memory allocation. */
129 q = calloc( 1, sizeof(ThreadQueue) );
130
131 /* Allocate and insert the dummy node */
132 n = calloc( 1, sizeof(Node) );
133 n->next = NULL;
134 q->first = n;
135 q->last = n;
136
137 /* Create locks. */
138 q->t_lock = SDL_CreateMutex();
139 q->h_lock = SDL_CreateMutex();
140 q->semaphore = SDL_CreateSemaphore( 0 );
141
142 return q;
143}
144
151static void tq_enqueue( ThreadQueue *q, void *data )
152{
153 /* Allocate new struct. */
154 Node *n = calloc( 1, sizeof(Node) );
155 n->data = data;
156 n->next = NULL;
157
158 /* Lock */
159 SDL_mutexP( q->t_lock );
160
161 /* Enqueue. */
162 q->last->next = n;
163 q->last = n;
164
165 /* Signal and unlock. This wil break if someone tries to enqueue 2^32+1
166 * elements or something. */
167 SDL_SemPost( q->semaphore );
168 SDL_mutexV( q->t_lock );
169}
170
179static void* tq_dequeue( ThreadQueue *q )
180{
181 void *d;
182 Node *newhead, *node;
183
184 /* Lock the head. */
185 SDL_mutexP( q->h_lock );
186
187 /* Start running. */
188 node = q->first;
189 newhead = node->next;
190
191 /* Head not consistent. */
192 if (newhead == NULL) {
193 WARN(_("Tried to dequeue while the queue was empty!"));
194 /* Ugly fix :/ */
195 /*
196 SDL_mutexV(q->h_lock);
197 return NULL;
198 */
199 /* We prefer to wait until the cache updates :/ */
200 do {
201 node = q->first;
202 newhead = node->next;
203 } while (newhead == NULL);
204 }
205
206 /* Remember the value and assign newhead as the new dummy element. */
207 d = newhead->data;
208 q->first = newhead;
209
210 /* Unlock */
211 SDL_mutexV( q->h_lock );
212
213 free( node );
214 return d;
215}
216
224static void tq_destroy( ThreadQueue *q )
225{
226 /* Iterate through the list and free the nodes */
227 while (q->first->next != NULL)
228 free( tq_dequeue(q) ); /* Locks q->t_lock, so we must destroy mutex after. */
229
230 /* Clean up threading structures. */
231 SDL_DestroySemaphore( q->semaphore );
232 SDL_DestroyMutex( q->h_lock );
233 SDL_DestroyMutex( q->t_lock );
234
235 free( q->first );
236 free( q );
237}
238
249int threadpool_newJob( int (*function)(void *), void *data )
250{
251 ThreadQueueData *node;
252
253 if (global_queue == NULL) {
254 WARN(_("Threadpool has not been initialized yet!"));
255 return -2;
256 }
257
258 /* Allocate and set parameters. */
259 node = calloc( 1, sizeof(ThreadQueueData) );
260 node->data = data;
261 node->function = function;
262
263 /* Actually enque. */
264 tq_enqueue( global_queue, node );
265
266 return 0;
267}
268
278static int threadpool_worker( void *data )
279{
280 ThreadData *work;
281
282 work = (ThreadData*) data;
283
284 /* Work loop */
285 while (1) {
286 /* Wait for new signal */
287 while (SDL_SemWait( work->semaphore ) == -1) {
288 /* Putting this in a while-loop is probably a really bad idea, but I
289 * don't have any better ideas. */
290 WARN(_("SDL_SemWait failed! Error: %s"), SDL_GetError());
291 }
292 /* Break if received signal to stop */
293 if (work->signal == THREADSIG_STOP)
294 break;
295
296 /* Do work :-) */
297 work->function( work->data );
298
299 /* Enqueue itself in the idle worker threads queue */
300 tq_enqueue( work->idle, work );
301 }
302 /* Enqueue itself in the stopped worker threads queue when stopped */
303 tq_enqueue( work->stopped, work );
304
305 return 0;
306}
307
328static int threadpool_handler( void *data )
329{
330 (void) data;
331 int nrunning, newthread;
332 ThreadData *threadargs, *threadarg;
333 /* Queues for idle workers and stopped workers */
334 ThreadQueue *idle, *stopped;
335 ThreadQueueData *node;
336
337 /* Initialize the idle and stopped queues. */
338 idle = tq_create();
339 stopped = tq_create();
340
341 /* Allocate threadargs to communicate with workers */
342 threadargs = calloc( MAXTHREADS, sizeof(ThreadData) );
343
344 /* Initialize threadargs */
345 for (int i=0; i<MAXTHREADS; i++) {
346 threadargs[i].function = NULL;
347 threadargs[i].data = NULL;
348 threadargs[i].semaphore = SDL_CreateSemaphore( 0 ); /* Used to give orders. */
349 threadargs[i].idle = idle;
350 threadargs[i].stopped = stopped;
351 threadargs[i].signal = THREADSIG_RUN;
352 /* 'Workers' that do not have a thread are considered stopped */
353 tq_enqueue( stopped, &threadargs[i] );
354 }
355
356 /* Set the number of running threads to 0 */
357 nrunning = 0;
358
359 /*
360 * Thread handler main loop.
361 */
362 while (1) {
363 /*
364 * We must now wait, this shall be done on each active thread. However they will
365 * be put to sleep as time passes. When we receive a command we'll proceed to process
366 * it.
367 */
368 if (nrunning > 0) {
369 /*
370 * Here we'll wait until thread gets work to do. If it doesn't it will
371 * just stop a worker thread and wait until it gets something to do.
372 */
373 if (SDL_SemWaitTimeout( global_queue->semaphore, THREADPOOL_TIMEOUT ) != 0) {
374 /* There weren't any new jobs so we'll start killing threads ;) */
375 if (SDL_SemTryWait( idle->semaphore ) == 0) {
376 threadarg = tq_dequeue( idle );
377 /* Set signal to stop worker thread */
378 threadarg->signal = THREADSIG_STOP;
379 /* Signal thread and decrement running threads counter */
380 SDL_SemPost( threadarg->semaphore );
381 nrunning -= 1;
382 }
383
384 /* We just go back to waiting on a thread. */
385 continue;
386 }
387
388 /* We got work. Continue to handle work. */
389 }
390 else {
391 /*
392 * Here we wait for a new job. No threads are alive at this point and the
393 * threadpool is just patiently waiting for work to arrive.
394 */
395 if (SDL_SemWait( global_queue->semaphore ) == -1) {
396 WARN(_("SDL_SemWait failed! Error: %s"), SDL_GetError());
397 continue;
398 }
399
400 /* We got work. Continue to handle work. */
401 }
402
403 /*
404 * Get a new job from the queue. This should be safe as we have received
405 * a permission from the global_queue->semaphore.
406 */
407 node = tq_dequeue( global_queue );
408 newthread = 0;
409
410 /*
411 * Choose where to get the thread. Either idle, revive stopped or block until
412 * another thread becomes idle.
413 */
414 /* Idle thread available */
415 if (SDL_SemTryWait(idle->semaphore) == 0)
416 threadarg = tq_dequeue( idle );
417 /* Make a new thread */
418 else if (SDL_SemTryWait(stopped->semaphore) == 0) {
419 threadarg = tq_dequeue( stopped );
420 threadarg->signal = THREADSIG_RUN;
421 newthread = 1;
422 }
423 /* Wait for idle thread */
424 else {
425 while (SDL_SemWait(idle->semaphore) == -1) {
426 /* Bad idea */
427 WARN(_("SDL_SemWait failed! Error: %s"), SDL_GetError());
428 }
429 threadarg = tq_dequeue( idle );
430 }
431
432 /* Assign arguments for the thread */
433 threadarg->function = node->function;
434 threadarg->data = node->data;
435 /* Signal the thread that there's a new job */
436 SDL_SemPost( threadarg->semaphore );
437
438 /* Start a new thread and increment the thread counter */
439 if (newthread) {
440 SDL_CreateThread( threadpool_worker,
441 "threadpool_worker",
442 threadarg );
443 nrunning += 1;
444 }
445
446 /* Free the now unused job from the global_queue */
447 free(node);
448 }
451 /* Clean up. */
452 tq_destroy( idle );
453 tq_destroy( stopped );
454 free( threadargs );
455
456 return 0;
457}
458
464int threadpool_init (void)
465{
466 MAXTHREADS = SDL_GetCPUCount() + 1; /* SDL 1.3 is pretty cool. */
467
468 /* There's already a queue */
469 if (global_queue != NULL) {
470 WARN(_("Threadpool has already been initialized!"));
471 return -1;
472 }
473
474 /* Create the global queue queue */
475 global_queue = tq_create();
476
477 /* Initialize the threadpool handler. */
478 if ( SDL_CreateThread( threadpool_handler, "threadpool_handler", NULL ) == NULL ) {
479 ERR( _( "Threadpool init failed: %s" ), SDL_GetError() );
480 return -1;
481 }
482
483 return 0;
484}
485
500ThreadQueue* vpool_create (void)
501{
502 return tq_create();
503}
504
514void vpool_enqueue( ThreadQueue *queue, int (*function)(void *), void *data )
515{
516 ThreadQueueData *node;
517
518 /* Allocate and set up data. */
519 node = calloc( 1, sizeof(ThreadQueueData) );
520 node->data = data;
521 node->function = function;
522
523 /* Add to vpool. */
524 tq_enqueue( queue, node );
525}
526
533static int vpool_worker( void *data )
534{
535 int cnt;
536 vpoolThreadData *work = (vpoolThreadData*) data;
537 ThreadQueueData *node = work->node;
538
539 /* Do work */
540 node->function( node->data );
541
542 /* Decrement the counter and signal vpool_wait if all threads are done */
543 SDL_mutexP( work->mutex );
544 cnt = *(work->count) - 1;
545 if (cnt <= 0) /* All jobs done. */
546 SDL_CondSignal( work->cond ); /* Signal waiting thread */
547 *(work->count) = cnt;
548 SDL_mutexV( work->mutex );
549
550 /* 'work' may not be valid memory anymore. */
551 free( node );
552
553 return 0;
554}
555
556/* @brief Run every job in the vpool queue and block until every job in the
557 * queue is done.
558 *
559 * @note It destroys the queue when it's done.
560 */
561void vpool_wait( ThreadQueue *queue )
562{
563 int cnt;
564 SDL_cond *cond;
565 SDL_mutex *mutex;
566 vpoolThreadData *arg;
567 ThreadQueueData *node;
568
569 /* Create temporary threading structures. */
570 cond = SDL_CreateCond();
571 mutex = SDL_CreateMutex();
572 /* This might be a little ugly (and inefficient?) */
573 cnt = SDL_SemValue( queue->semaphore );
574
575 /* Allocate all vpoolThreadData objects */
576 arg = calloc( cnt, sizeof(vpoolThreadData) );
577
578 SDL_mutexP( mutex );
579 /* Initialize the vpoolThreadData */
580 for (int i=0; i<cnt; i++) {
581 /* This is needed to keep the invariants of the queue */
582 while (SDL_SemWait( queue->semaphore ) == -1) {
583 /* Again, a really bad idea */
584 WARN(_("SDL_SemWait failed! Error: %s"), SDL_GetError());
585 }
586 node = tq_dequeue( queue );
587
588 /* Set up arguments. */
589 arg[i].node = node;
590 arg[i].cond = cond;
591 arg[i].mutex = mutex;
592 arg[i].count = &cnt;
593
594 /* Launch new job. */
595 threadpool_newJob( vpool_worker, &arg[i] );
596 }
597
598 /* Wait for the threads to finish */
599 SDL_CondWait( cond, mutex );
600 SDL_mutexV( mutex );
601
602 /* Clean up */
603 SDL_DestroyMutex( mutex );
604 SDL_DestroyCond( cond );
605 tq_destroy( queue );
606 free( arg );
607}
static const double d[]
Definition rng.c:273
Node struct.
Definition queue.c:25
void * data
Definition queue.c:26
Node next
Definition queue.c:27
Node in the thread queue.
Definition threadpool.c:51
Thread data.
Definition threadpool.c:79
Data for the threadqueue.
Definition threadpool.c:71
Threadqueue itself.
Definition threadpool.c:59
Virtual thread pool data.
Definition threadpool.c:92