Changeset 18953 for trunk/psLib/src/sys/psThread.c
- Timestamp:
- Aug 8, 2008, 8:05:09 AM (18 years ago)
- File:
-
- 1 edited
-
trunk/psLib/src/sys/psThread.c (modified) (7 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/psLib/src/sys/psThread.c
r18827 r18953 1 1 #ifdef HAVE_CONFIG_H 2 # include "config.h"2 #include "config.h" 3 3 #endif 4 4 … … 17 17 #include "psThread.h" 18 18 19 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 20 static psList *pending = NULL; // queue of pending jobs 21 static psList *done = NULL; // queue of done jobs 22 static psArray *pool = NULL; // array of defined threads 23 static psArray *tasks = NULL; // queue of tasks 19 #define THREAD_WAIT 10000 // Microseconds to wait for threads 20 21 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // Mutex for locking threads 22 static psList *pending = NULL; // queue of pending jobs 23 static psList *done = NULL; // queue of done jobs 24 static psArray *pool = NULL; // array of defined threads 25 static psArray *tasks = NULL; // queue of tasks 24 26 25 27 /***** basic thread functions *****/ 26 28 27 void psThreadLock () { 28 pthread_mutex_lock (&mutex); 29 return; 30 } 31 32 void psThreadUnlock () { 33 pthread_mutex_unlock (&mutex); 34 return; 35 } 36 37 void psThreadFree (psThread *thread) { 29 void psThreadLock(void) 30 { 31 pthread_mutex_lock(&mutex); 32 return; 33 } 34 35 void psThreadUnlock(void) 36 { 37 pthread_mutex_unlock(&mutex); 38 return; 39 } 40 41 static void threadFree(psThread *thread) 42 { 43 // Nothing to free; this function is merely provided for identification purposes 38 44 return; 39 45 } 40 46 41 47 // allocate a psThread 42 psThread *psThreadAlloc () {43 44 psThread *thread = (psThread *)psAlloc(sizeof(psThread));45 psMemSetDeallocator(thread, (psFreeFunc) psThreadFree);48 psThread *psThreadAlloc(void) 49 { 50 psThread *thread = (psThread*)psAlloc(sizeof(psThread)); 51 psMemSetDeallocator(thread, (psFreeFunc)threadFree); 46 52 47 53 thread->busy = false; … … 52 58 /***** thread job functions *****/ 53 59 54 void psThreadJobFree (psThreadJob *job) { 55 56 if (!job) return; 57 58 psFree (job->type); 59 psFree (job->args); 60 static void threadJobFree(psThreadJob *job) 61 { 62 psFree(job->type); 63 psFree(job->args); 60 64 return; 61 65 } 62 66 63 67 // allocate a psThreadJob of the given type 64 psThreadJob *psThreadJobAlloc (char *type) {65 68 psThreadJob *psThreadJobAlloc(const char *type) 69 { 66 70 psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob)); 67 psMemSetDeallocator(job, (psFreeFunc) psThreadJobFree);68 69 job->type = psStringCopy (type);70 job->args = psArrayAlloc (0);71 psMemSetDeallocator(job, (psFreeFunc)threadJobFree); 72 73 job->type = psStringCopy(type); 74 job->args = psArrayAlloc(0); 71 75 return job; 72 76 } 73 77 74 78 // add a job to the queue of pending jobs 75 bool psThreadJobAddPending (psThreadJob *job) { 79 bool psThreadJobAddPending(psThreadJob *job) 80 { 81 PS_ASSERT_THREAD_JOB_NON_NULL(job, false); 76 82 77 83 // if we failed to call psThreadPoolInit, or we called it with nThreads == 0, … … 79 85 if (!pool || !pool->n) { 80 86 81 // in non-threaded operation, the job is placed on the done list and immediately run 82 if (done == NULL) { 83 done = psListAlloc(NULL); 84 } 85 psListAdd (done, PS_LIST_TAIL, job); 86 87 // find the corresponding task and run it 88 for (int i = 0; i < tasks->n; i++) { 89 psThreadTask *task = tasks->data[i]; 90 if (strcmp (job->type, task->type)) continue; 91 92 psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 93 94 bool status = task->function(job); 95 return status; 96 } 97 return false; 98 } 99 100 psThreadLock (); 101 if (pending == NULL) { 102 pending = psListAlloc(NULL); 103 } 104 105 psListAdd (pending, PS_LIST_TAIL, job); 106 psThreadUnlock (); 87 // in non-threaded operation, the job is placed on the done list and immediately run 88 if (!done) { 89 done = psListAlloc(NULL); 90 } 91 psListAdd(done, PS_LIST_TAIL, job); 92 93 // find the corresponding task and run it 94 for (int i = 0; i < tasks->n; i++) { 95 psThreadTask *task = tasks->data[i]; 96 if (strcmp (job->type, task->type) != 0) { 97 continue; 98 } 99 psAssert(job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 100 101 bool status = task->function(job); 102 return status; 103 } 104 // Programming error 105 psAbort("Unable to find job %s", job->type); 106 } 107 108 psThreadLock(); 109 if (!pending) { 110 pending = psListAlloc(NULL); 111 } 112 psListAdd(pending, PS_LIST_TAIL, job); 113 psThreadUnlock(); 114 107 115 return true; 108 116 } 109 117 110 118 // this function is not locked -- see thread launder for example 111 psThreadJob *psThreadJobGetPending () { 112 113 if (!pending) return NULL; 114 115 psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD); 116 117 // jobs we pull off the pending queue get placed on the done queue 118 if (job) { 119 if (done == NULL) { 120 done = psListAlloc(NULL); 121 } 122 psListAdd (done, PS_LIST_TAIL, job); 123 } 119 psThreadJob *psThreadJobGetPending(void) 120 { 121 if (!pending) { 122 return NULL; 123 } 124 125 psThreadJob *job = psListGetAndRemove(pending, PS_LIST_HEAD); 124 126 return job; 125 127 } 126 128 127 // this function is not locked -- see thread launder for example 128 psThreadJob *psThreadJobGetDone () { 129 130 if (!done) return NULL; 131 132 psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD); 129 // this function is not locked -- see thread launcher for example 130 psThreadJob *psThreadJobGetDone(void) 131 { 132 if (!done) { 133 return NULL; 134 } 135 136 psThreadJob *job = psListGetAndRemove(done, PS_LIST_HEAD); 133 137 return job; 134 138 } … … 136 140 /***** thread task functions *****/ 137 141 138 void psThreadTaskFree (psThreadTask *task) { 139 140 if (!task) return; 141 142 psFree (task->type); 142 static void threadTaskFree(psThreadTask *task) 143 { 144 psFree(task->type); 143 145 return; 144 146 } 145 147 146 148 // allocate a psThreadTask with nArgs arguments 147 psThreadTask *psThreadTaskAlloc (char *type, int nArgs) {148 149 psThreadTask *psThreadTaskAlloc(const char *type, int nArgs) 150 { 149 151 psThreadTask *task = (psThreadTask *)psAlloc(sizeof(psThreadTask)); 150 psMemSetDeallocator(task, (psFreeFunc) psThreadTaskFree);151 152 task->type = psStringCopy (type);152 psMemSetDeallocator(task, (psFreeFunc)threadTaskFree); 153 154 task->type = psStringCopy(type); 153 155 task->nArgs = nArgs; 154 156 task->function = NULL; … … 157 159 158 160 // add a task to the collection of tasks 159 bool psThreadTaskAdd (psThreadTask *task) { 160 161 if (tasks == NULL) { 162 tasks = psArrayAllocEmpty(8); 163 } 164 165 psArrayAdd (tasks, 1, task); 161 bool psThreadTaskAdd(psThreadTask *task) 162 { 163 PS_ASSERT_THREAD_TASK_NON_NULL(task, false); 164 165 if (!tasks) { 166 tasks = psArrayAllocEmpty(8); 167 } 168 169 psArrayAdd(tasks, 1, task); 166 170 return true; 167 171 } 168 172 169 173 // each thread runs this function to choose the task functions 170 void *psThreadLauncher (void *data) { 171 172 psThread *self = data; 173 psThreadJob *job = NULL; 174 void *psThreadLauncher(void *thread) 175 { 176 psThread *self = thread; // Thread that's running 174 177 175 178 while (1) { 176 177 // if we get an error, just wait until we are cleared or killed 178 while (self->fault) { 179 usleep (10000); 180 } 181 182 // if no tasks are assigned, just wait until they are 183 while (tasks == NULL) { 184 usleep (10000); 185 } 186 187 // request a new job, if there are none available, sleep a bit 188 // we have to lock here so the job queue cannot be empty yet no threads busy 189 psThreadLock(); 190 while ((job = psThreadJobGetPending ()) == NULL) { 191 psThreadUnlock(); 192 usleep (10000); 193 psThreadLock(); // XXX ??? 194 } 195 self->busy = true; 196 197 for (int i = 0; i < tasks->n; i++) { 198 psThreadTask *task = tasks->data[i]; 199 if (strcmp (job->type, task->type)) continue; 200 201 psThreadUnlock(); 202 psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 203 204 bool status = task->function(job); 205 if (!status) { 206 self->fault = true; 207 } 208 self->busy = false; 209 break; 210 } 211 psThreadUnlock(); 212 // XXX what do we do if the job is unknown? 179 // if we get an error, just wait until we are cleared or killed 180 while (self->fault) { 181 usleep(THREAD_WAIT); 182 } 183 184 // if no tasks are assigned, just wait until they are 185 while (!tasks) { 186 usleep(THREAD_WAIT); 187 } 188 189 // request a new job, if there are none available, sleep a bit 190 // we have to lock here so the job queue cannot be empty yet no threads busy 191 psThreadLock(); 192 psThreadJob *job = NULL; // Job to process 193 while ((job = psThreadJobGetPending()) == NULL) { 194 psThreadUnlock(); 195 usleep(THREAD_WAIT); 196 psThreadLock(); // XXX ??? 197 } 198 self->busy = true; 199 200 bool found = false; // Found the job? 201 for (int i = 0; i < tasks->n; i++) { 202 psThreadTask *task = tasks->data[i]; // Task to do 203 if (strcmp(job->type, task->type) != 0) { 204 continue; 205 } 206 found = true; 207 208 psThreadUnlock(); 209 psAssert(job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 210 211 bool status = task->function(job); // Status of executing task 212 213 // Put the completed job on the 'done' queue 214 psThreadLock(); 215 if (!done) { 216 done = psListAlloc(NULL); 217 } 218 psListAdd(done, PS_LIST_TAIL, job); 219 psFree(job); 220 221 if (!status) { 222 self->fault = true; 223 } 224 self->busy = false; 225 break; 226 } 227 psThreadUnlock(); 228 if (!found) { 229 // Programming error 230 psAbort("Unable to find job %s", job->type); 231 } 213 232 } 214 233 } … … 217 236 218 237 // create a pool of Nthreads, each running the user's job-launcher function 219 bool psThreadPoolInit (int nThreads) { 220 221 if (pool) psAbort ("psThreadsInit already called"); 222 223 pool = psArrayAlloc (nThreads); 238 bool psThreadPoolInit(int nThreads) 239 { 240 if (pool) { 241 psAbort("psThreadsInit already called"); 242 } 243 244 PS_ASSERT_INT_NONNEGATIVE(nThreads, false); 245 if (nThreads == 0) { 246 // No threading 247 return true; 248 } 249 250 pool = psArrayAlloc(nThreads); 224 251 for (int i = 0; i < nThreads; i++) { 225 psThread *thread = psThreadAlloc(); 226 pthread_create(&thread->pt, NULL, psThreadLauncher, thread);227 pool->data[i] = thread;252 psThread *thread = psThreadAlloc(); // Thread for pool 253 pthread_create(&thread->pt, NULL, psThreadLauncher, thread); 254 pool->data[i] = thread; 228 255 } 229 256 return true; 230 257 } 231 258 232 // call this function after you have added jobs to the queue and 233 bool psThreadPoolWait () { 234 235 // this function blocks (waits in usleep) until either 236 // an error is detected on one of the threads or until 237 // all threads are idle and no jobs are left on the queue 238 239 if (!pool) return true; 240 if (!pool->n) return true; 259 int psThreadPoolSize(void) 260 { 261 return pool ? pool->n : 0; 262 } 263 264 // call this function after you have added jobs to the queue and 265 bool psThreadPoolWait(bool harvest) 266 { 267 if (!pool || pool->n == 0) { 268 // No threads initialised, so everything's done 269 return true; 270 } 241 271 242 272 while (1) { 243 244 // check for an error 245 for (int i = 0; i < pool->n; i++) { 246 psThread *thread = pool->data[i]; 247 if (thread->fault) return false; 248 } 249 250 // are all threads idle? 251 psThreadLock(); 252 for (int i = 0; i < pool->n; i++) { 253 psThread *thread = pool->data[i]; 254 if (thread->busy) goto SLEEP; 255 } 256 257 if (!pending) return true; 258 259 // is the queue empty? 260 if (pending->head == NULL) { 261 psThreadUnlock(); 262 return true; 263 } 273 // check for an error 274 for (int i = 0; i < pool->n; i++) { 275 psThread *thread = pool->data[i]; 276 if (thread->fault) { 277 return false; 278 } 279 } 280 281 // Harvest jobs, if requested 282 if (harvest) { 283 psThreadLock(); 284 psThreadJob *job; // Job from done queue 285 while ((job = psThreadJobGetDone())) { 286 psFree(job); 287 } 288 psThreadUnlock(); 289 } 290 291 // are all threads idle? 292 psThreadLock(); 293 for (int i = 0; i < pool->n; i++) { 294 psThread *thread = pool->data[i]; 295 if (thread->busy) { 296 // At least one thread is busy: sleep. 297 goto SLEEP; 298 } 299 } 300 301 if (!pending || !pending->head) { 302 // Nothing in the queue 303 psThreadUnlock(); 304 return true; 305 } 264 306 265 307 SLEEP: 266 psThreadUnlock(); 267 usleep (10000); 268 } 308 psThreadUnlock(); 309 usleep(THREAD_WAIT); 310 } 311 269 312 return false; 270 313 } 271 314 272 // create a pool of Nthreads, each running the user's job-launcher function 273 bool psThreadPoolFinalize () { 274 275 psFree (pending); 315 bool psThreadPoolFinalize(void) 316 { 317 psFree(pending); 276 318 pending = NULL; 277 319 278 psFree (done);320 psFree(done); 279 321 done = NULL; 280 322 281 psFree (pool);323 psFree(pool); 282 324 pool = NULL; 283 325 284 psFree (tasks);326 psFree(tasks); 285 327 tasks = NULL; 286 328
Note:
See TracChangeset
for help on using the changeset viewer.
