Changeset 18953
- Timestamp:
- Aug 8, 2008, 8:05:09 AM (18 years ago)
- Location:
- trunk/psLib/src/sys
- Files:
-
- 3 edited
-
psConfigure.c (modified) (3 diffs)
-
psThread.c (modified) (7 diffs)
-
psThread.h (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/psLib/src/sys/psConfigure.c
r18827 r18953 13 13 * @author Robert DeSonia, MHPCC 14 14 * 15 * @version $Revision: 1.2 7$ $Name: not supported by cvs2svn $16 * @date $Date: 2008-0 7-31 23:56:29$15 * @version $Revision: 1.28 $ $Name: not supported by cvs2svn $ 16 * @date $Date: 2008-08-08 18:05:08 $ 17 17 * 18 18 * Copyright 2004-2005 Maui High Performance Computing Center, University of Hawaii … … 34 34 #include "psEarthOrientation.h" 35 35 #include "psError.h" 36 #include "psFFT.h" 36 37 #include "psConfigure.h" 37 38 #include "psMemory.h" … … 125 126 psTimerStop(); 126 127 128 // Clean up FFTW threads 129 psFFTThreads(0); 130 131 // Clean up threads 127 132 psThreadPoolFinalize(); 128 133 -
trunk/psLib/src/sys/psThread.c
r18827 r18953 1 1 #ifdef HAVE_CONFIG_H 2 # include "config.h"2 #include "config.h" 3 3 #endif 4 4 … … 17 17 #include "psThread.h" 18 18 19 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 20 static psList *pending = NULL; // queue of pending jobs 21 static psList *done = NULL; // queue of done jobs 22 static psArray *pool = NULL; // array of defined threads 23 static psArray *tasks = NULL; // queue of tasks 19 #define THREAD_WAIT 10000 // Microseconds to wait for threads 20 21 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // Mutex for locking threads 22 static psList *pending = NULL; // queue of pending jobs 23 static psList *done = NULL; // queue of done jobs 24 static psArray *pool = NULL; // array of defined threads 25 static psArray *tasks = NULL; // queue of tasks 24 26 25 27 /***** basic thread functions *****/ 26 28 27 void psThreadLock () { 28 pthread_mutex_lock (&mutex); 29 return; 30 } 31 32 void psThreadUnlock () { 33 pthread_mutex_unlock (&mutex); 34 return; 35 } 36 37 void psThreadFree (psThread *thread) { 29 void psThreadLock(void) 30 { 31 pthread_mutex_lock(&mutex); 32 return; 33 } 34 35 void psThreadUnlock(void) 36 { 37 pthread_mutex_unlock(&mutex); 38 return; 39 } 40 41 static void threadFree(psThread *thread) 42 { 43 // Nothing to free; this function is merely provided for identification purposes 38 44 return; 39 45 } 40 46 41 47 // allocate a psThread 42 psThread *psThreadAlloc () {43 44 psThread *thread = (psThread *)psAlloc(sizeof(psThread));45 psMemSetDeallocator(thread, (psFreeFunc) psThreadFree);48 psThread *psThreadAlloc(void) 49 { 50 psThread *thread = (psThread*)psAlloc(sizeof(psThread)); 51 psMemSetDeallocator(thread, (psFreeFunc)threadFree); 46 52 47 53 thread->busy = false; … … 52 58 /***** thread job functions *****/ 53 59 54 void psThreadJobFree (psThreadJob *job) { 55 56 if (!job) return; 57 58 psFree (job->type); 59 psFree (job->args); 60 static void threadJobFree(psThreadJob *job) 61 { 62 psFree(job->type); 63 psFree(job->args); 60 64 return; 61 65 } 62 66 63 67 // allocate a psThreadJob of the given type 64 psThreadJob *psThreadJobAlloc (char *type) {65 68 psThreadJob *psThreadJobAlloc(const char *type) 69 { 66 70 psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob)); 67 psMemSetDeallocator(job, (psFreeFunc) psThreadJobFree);68 69 job->type = psStringCopy (type);70 job->args = psArrayAlloc (0);71 psMemSetDeallocator(job, (psFreeFunc)threadJobFree); 72 73 job->type = psStringCopy(type); 74 job->args = psArrayAlloc(0); 71 75 return job; 72 76 } 73 77 74 78 // add a job to the queue of pending jobs 75 bool psThreadJobAddPending (psThreadJob *job) { 79 bool psThreadJobAddPending(psThreadJob *job) 80 { 81 PS_ASSERT_THREAD_JOB_NON_NULL(job, false); 76 82 77 83 // if we failed to call psThreadPoolInit, or we called it with nThreads == 0, … … 79 85 if (!pool || !pool->n) { 80 86 81 // in non-threaded operation, the job is placed on the done list and immediately run 82 if (done == NULL) { 83 done = psListAlloc(NULL); 84 } 85 psListAdd (done, PS_LIST_TAIL, job); 86 87 // find the corresponding task and run it 88 for (int i = 0; i < tasks->n; i++) { 89 psThreadTask *task = tasks->data[i]; 90 if (strcmp (job->type, task->type)) continue; 91 92 psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 93 94 bool status = task->function(job); 95 return status; 96 } 97 return false; 98 } 99 100 psThreadLock (); 101 if (pending == NULL) { 102 pending = psListAlloc(NULL); 103 } 104 105 psListAdd (pending, PS_LIST_TAIL, job); 106 psThreadUnlock (); 87 // in non-threaded operation, the job is placed on the done list and immediately run 88 if (!done) { 89 done = psListAlloc(NULL); 90 } 91 psListAdd(done, PS_LIST_TAIL, job); 92 93 // find the corresponding task and run it 94 for (int i = 0; i < tasks->n; i++) { 95 psThreadTask *task = tasks->data[i]; 96 if (strcmp (job->type, task->type) != 0) { 97 continue; 98 } 99 psAssert(job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 100 101 bool status = task->function(job); 102 return status; 103 } 104 // Programming error 105 psAbort("Unable to find job %s", job->type); 106 } 107 108 psThreadLock(); 109 if (!pending) { 110 pending = psListAlloc(NULL); 111 } 112 psListAdd(pending, PS_LIST_TAIL, job); 113 psThreadUnlock(); 114 107 115 return true; 108 116 } 109 117 110 118 // this function is not locked -- see thread launder for example 111 psThreadJob *psThreadJobGetPending () { 112 113 if (!pending) return NULL; 114 115 psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD); 116 117 // jobs we pull off the pending queue get placed on the done queue 118 if (job) { 119 if (done == NULL) { 120 done = psListAlloc(NULL); 121 } 122 psListAdd (done, PS_LIST_TAIL, job); 123 } 119 psThreadJob *psThreadJobGetPending(void) 120 { 121 if (!pending) { 122 return NULL; 123 } 124 125 psThreadJob *job = psListGetAndRemove(pending, PS_LIST_HEAD); 124 126 return job; 125 127 } 126 128 127 // this function is not locked -- see thread launder for example 128 psThreadJob *psThreadJobGetDone () { 129 130 if (!done) return NULL; 131 132 psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD); 129 // this function is not locked -- see thread launcher for example 130 psThreadJob *psThreadJobGetDone(void) 131 { 132 if (!done) { 133 return NULL; 134 } 135 136 psThreadJob *job = psListGetAndRemove(done, PS_LIST_HEAD); 133 137 return job; 134 138 } … … 136 140 /***** thread task functions *****/ 137 141 138 void psThreadTaskFree (psThreadTask *task) { 139 140 if (!task) return; 141 142 psFree (task->type); 142 static void threadTaskFree(psThreadTask *task) 143 { 144 psFree(task->type); 143 145 return; 144 146 } 145 147 146 148 // allocate a psThreadTask with nArgs arguments 147 psThreadTask *psThreadTaskAlloc (char *type, int nArgs) {148 149 psThreadTask *psThreadTaskAlloc(const char *type, int nArgs) 150 { 149 151 psThreadTask *task = (psThreadTask *)psAlloc(sizeof(psThreadTask)); 150 psMemSetDeallocator(task, (psFreeFunc) psThreadTaskFree);151 152 task->type = psStringCopy (type);152 psMemSetDeallocator(task, (psFreeFunc)threadTaskFree); 153 154 task->type = psStringCopy(type); 153 155 task->nArgs = nArgs; 154 156 task->function = NULL; … … 157 159 158 160 // add a task to the collection of tasks 159 bool psThreadTaskAdd (psThreadTask *task) { 160 161 if (tasks == NULL) { 162 tasks = psArrayAllocEmpty(8); 163 } 164 165 psArrayAdd (tasks, 1, task); 161 bool psThreadTaskAdd(psThreadTask *task) 162 { 163 PS_ASSERT_THREAD_TASK_NON_NULL(task, false); 164 165 if (!tasks) { 166 tasks = psArrayAllocEmpty(8); 167 } 168 169 psArrayAdd(tasks, 1, task); 166 170 return true; 167 171 } 168 172 169 173 // each thread runs this function to choose the task functions 170 void *psThreadLauncher (void *data) { 171 172 psThread *self = data; 173 psThreadJob *job = NULL; 174 void *psThreadLauncher(void *thread) 175 { 176 psThread *self = thread; // Thread that's running 174 177 175 178 while (1) { 176 177 // if we get an error, just wait until we are cleared or killed 178 while (self->fault) { 179 usleep (10000); 180 } 181 182 // if no tasks are assigned, just wait until they are 183 while (tasks == NULL) { 184 usleep (10000); 185 } 186 187 // request a new job, if there are none available, sleep a bit 188 // we have to lock here so the job queue cannot be empty yet no threads busy 189 psThreadLock(); 190 while ((job = psThreadJobGetPending ()) == NULL) { 191 psThreadUnlock(); 192 usleep (10000); 193 psThreadLock(); // XXX ??? 194 } 195 self->busy = true; 196 197 for (int i = 0; i < tasks->n; i++) { 198 psThreadTask *task = tasks->data[i]; 199 if (strcmp (job->type, task->type)) continue; 200 201 psThreadUnlock(); 202 psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 203 204 bool status = task->function(job); 205 if (!status) { 206 self->fault = true; 207 } 208 self->busy = false; 209 break; 210 } 211 psThreadUnlock(); 212 // XXX what do we do if the job is unknown? 179 // if we get an error, just wait until we are cleared or killed 180 while (self->fault) { 181 usleep(THREAD_WAIT); 182 } 183 184 // if no tasks are assigned, just wait until they are 185 while (!tasks) { 186 usleep(THREAD_WAIT); 187 } 188 189 // request a new job, if there are none available, sleep a bit 190 // we have to lock here so the job queue cannot be empty yet no threads busy 191 psThreadLock(); 192 psThreadJob *job = NULL; // Job to process 193 while ((job = psThreadJobGetPending()) == NULL) { 194 psThreadUnlock(); 195 usleep(THREAD_WAIT); 196 psThreadLock(); // XXX ??? 197 } 198 self->busy = true; 199 200 bool found = false; // Found the job? 201 for (int i = 0; i < tasks->n; i++) { 202 psThreadTask *task = tasks->data[i]; // Task to do 203 if (strcmp(job->type, task->type) != 0) { 204 continue; 205 } 206 found = true; 207 208 psThreadUnlock(); 209 psAssert(job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 210 211 bool status = task->function(job); // Status of executing task 212 213 // Put the completed job on the 'done' queue 214 psThreadLock(); 215 if (!done) { 216 done = psListAlloc(NULL); 217 } 218 psListAdd(done, PS_LIST_TAIL, job); 219 psFree(job); 220 221 if (!status) { 222 self->fault = true; 223 } 224 self->busy = false; 225 break; 226 } 227 psThreadUnlock(); 228 if (!found) { 229 // Programming error 230 psAbort("Unable to find job %s", job->type); 231 } 213 232 } 214 233 } … … 217 236 218 237 // create a pool of Nthreads, each running the user's job-launcher function 219 bool psThreadPoolInit (int nThreads) { 220 221 if (pool) psAbort ("psThreadsInit already called"); 222 223 pool = psArrayAlloc (nThreads); 238 bool psThreadPoolInit(int nThreads) 239 { 240 if (pool) { 241 psAbort("psThreadsInit already called"); 242 } 243 244 PS_ASSERT_INT_NONNEGATIVE(nThreads, false); 245 if (nThreads == 0) { 246 // No threading 247 return true; 248 } 249 250 pool = psArrayAlloc(nThreads); 224 251 for (int i = 0; i < nThreads; i++) { 225 psThread *thread = psThreadAlloc(); 226 pthread_create(&thread->pt, NULL, psThreadLauncher, thread);227 pool->data[i] = thread;252 psThread *thread = psThreadAlloc(); // Thread for pool 253 pthread_create(&thread->pt, NULL, psThreadLauncher, thread); 254 pool->data[i] = thread; 228 255 } 229 256 return true; 230 257 } 231 258 232 // call this function after you have added jobs to the queue and 233 bool psThreadPoolWait () { 234 235 // this function blocks (waits in usleep) until either 236 // an error is detected on one of the threads or until 237 // all threads are idle and no jobs are left on the queue 238 239 if (!pool) return true; 240 if (!pool->n) return true; 259 int psThreadPoolSize(void) 260 { 261 return pool ? pool->n : 0; 262 } 263 264 // call this function after you have added jobs to the queue and 265 bool psThreadPoolWait(bool harvest) 266 { 267 if (!pool || pool->n == 0) { 268 // No threads initialised, so everything's done 269 return true; 270 } 241 271 242 272 while (1) { 243 244 // check for an error 245 for (int i = 0; i < pool->n; i++) { 246 psThread *thread = pool->data[i]; 247 if (thread->fault) return false; 248 } 249 250 // are all threads idle? 251 psThreadLock(); 252 for (int i = 0; i < pool->n; i++) { 253 psThread *thread = pool->data[i]; 254 if (thread->busy) goto SLEEP; 255 } 256 257 if (!pending) return true; 258 259 // is the queue empty? 260 if (pending->head == NULL) { 261 psThreadUnlock(); 262 return true; 263 } 273 // check for an error 274 for (int i = 0; i < pool->n; i++) { 275 psThread *thread = pool->data[i]; 276 if (thread->fault) { 277 return false; 278 } 279 } 280 281 // Harvest jobs, if requested 282 if (harvest) { 283 psThreadLock(); 284 psThreadJob *job; // Job from done queue 285 while ((job = psThreadJobGetDone())) { 286 psFree(job); 287 } 288 psThreadUnlock(); 289 } 290 291 // are all threads idle? 292 psThreadLock(); 293 for (int i = 0; i < pool->n; i++) { 294 psThread *thread = pool->data[i]; 295 if (thread->busy) { 296 // At least one thread is busy: sleep. 297 goto SLEEP; 298 } 299 } 300 301 if (!pending || !pending->head) { 302 // Nothing in the queue 303 psThreadUnlock(); 304 return true; 305 } 264 306 265 307 SLEEP: 266 psThreadUnlock(); 267 usleep (10000); 268 } 308 psThreadUnlock(); 309 usleep(THREAD_WAIT); 310 } 311 269 312 return false; 270 313 } 271 314 272 // create a pool of Nthreads, each running the user's job-launcher function 273 bool psThreadPoolFinalize () { 274 275 psFree (pending); 315 bool psThreadPoolFinalize(void) 316 { 317 psFree(pending); 276 318 pending = NULL; 277 319 278 psFree (done);320 psFree(done); 279 321 done = NULL; 280 322 281 psFree (pool);323 psFree(pool); 282 324 pool = NULL; 283 325 284 psFree (tasks);326 psFree(tasks); 285 327 tasks = NULL; 286 328 -
trunk/psLib/src/sys/psThread.h
r18827 r18953 4 4 * 5 5 * @author EAM, IFA 6 * @version $Revision: 1. 2$ $Name: not supported by cvs2svn $7 * @date $Date: 2008-0 7-31 23:56:29 $6 * @version $Revision: 1.3 $ $Name: not supported by cvs2svn $ 7 * @date $Date: 2008-08-08 18:05:09 $ 8 8 * 9 9 * Copyright 2004-2005 Insitute for Astronomy, University of Hawaii … … 16 16 /// @{ 17 17 18 /// Job to be executed on a thread 19 /// 20 /// This job is passed to the function that executes it 18 21 typedef struct { 19 psString type; 20 psArray *args; 22 psString type; // Type of thread 23 psArray *args; // Arguments to job 21 24 } psThreadJob; 22 25 26 #define PS_ASSERT_THREAD_JOB_NON_NULL(JOB, RVAL) \ 27 if (!(JOB) || !(JOB)->type || !(JOB)->args) { \ 28 psError(PS_ERR_UNEXPECTED_NULL, true, "Thread job %s or one of its components is NULL.", #JOB); \ 29 return RVAL; \ 30 } 31 32 /// A thread, which executes a job 33 /// 34 /// Wraps pthread with a few extra conveniences 23 35 typedef struct { 24 bool busy; 25 bool fault; 26 pthread_t pt; 36 bool busy; // Is the thread busy? 37 bool fault; // Has the thread faulted? 38 pthread_t pt; // The thread itself 27 39 } psThread; 28 40 29 typedef bool (*psThreadTaskFunction)(psThreadJob *job); 41 /// Function to execute a thread job 42 typedef bool (*psThreadTaskFunction)(const psThreadJob *job); 30 43 44 /// Task that is executed on a thread 31 45 typedef struct { 32 psString type; 33 int nArgs; 34 psThreadTaskFunction function; 46 psString type; // Type of task 47 int nArgs; // Number of arguments that function takes 48 psThreadTaskFunction function; // Function to execute 35 49 } psThreadTask; 36 50 37 // typedef void *(*psThreadLaunchJobsFunction)(void *data); 51 #define PS_ASSERT_THREAD_TASK_NON_NULL(TASK, RVAL) \ 52 if (!(TASK) || !(TASK)->type || (TASK)->nArgs < 0 || !(TASK)->function) { \ 53 psError(PS_ERR_UNEXPECTED_NULL, true, "Thread task %s or one of its components is NULL.", #TASK); \ 54 return RVAL; \ 55 } 38 56 39 void psThreadLock ();40 void psThreadUnlock ();41 57 42 psThread *psThreadAlloc (); 58 /// Lock the thread mutex 59 void psThreadLock(void); 43 60 44 psThreadJob *psThreadJobAlloc (char *type); 45 bool psThreadJobAddPending (psThreadJob *job); 46 psThreadJob *psThreadJobGetPending (); 47 psThreadJob *psThreadJobGetDone (); 61 /// Unlock the thread mutex 62 void psThreadUnlock(void); 48 63 49 psThreadTask *psThreadTaskAlloc (char *type, int nArgs); 50 bool psThreadTaskAdd (psThreadTask *task); 51 void *psThreadLauncher (void *data); 64 /// Allocate a thread 65 psThread *psThreadAlloc(void); 52 66 53 bool psThreadPoolInit (int nThreads); 54 bool psThreadPoolWait (); 55 bool psThreadPoolFinalize (); 67 /// Allocate a thread job 68 psThreadJob *psThreadJobAlloc(const char *type); 69 70 /// Add a pending job to the queue 71 bool psThreadJobAddPending(psThreadJob *job); 72 73 /// Get a job off the queue of pending jobs 74 psThreadJob *psThreadJobGetPending(void); 75 76 /// Get a job off the queue of done jobs 77 psThreadJob *psThreadJobGetDone(void); 78 79 /// Allocate a thread task 80 psThreadTask *psThreadTaskAlloc(const char *type, // Type of task 81 int nArgs // Number of arguments 82 ); 83 84 /// Add a task to the list 85 bool psThreadTaskAdd(psThreadTask *task // Task to add 86 ); 87 88 /// Launch jobs on a thread 89 void *psThreadLauncher(void *thread // Thread (of type psThread) 90 ); 91 92 /// Initialise a pool of threads 93 bool psThreadPoolInit(int nThreads // Number of threads 94 ); 95 96 /// Return size of thread pool 97 int psThreadPoolSize(void); 98 99 /// Wait for the thread pool to finish 100 /// 101 /// This function blocks (waits in usleep) until either an error is detected on one of the threads or until ll 102 /// threads are idle and no jobs are left on the queue 103 bool psThreadPoolWait(bool harvest // Harvest the jobs from the queue? 104 ); 105 106 /// Clean up the thread pool 107 bool psThreadPoolFinalize(void); 56 108 57 109 /// @}
Note:
See TracChangeset
for help on using the changeset viewer.
