Changeset 18822 for branches/eam_branch_20080719/psLib/src/sys/psThread.c
- Timestamp:
- Jul 31, 2008, 1:24:22 PM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
branches/eam_branch_20080719/psLib/src/sys/psThread.c
r18808 r18822 6 6 #include <stdarg.h> 7 7 #include <unistd.h> 8 #include <string.h> 8 9 9 10 #include "psAssert.h" … … 19 20 static psList *pending = NULL; // queue of pending jobs 20 21 static psList *done = NULL; // queue of done jobs 21 22 22 static psArray *pool = NULL; // array of defined threads 23 static psArray *tasks = NULL; // queue of tasks 24 25 /***** basic thread functions *****/ 23 26 24 27 void psThreadLock () { … … 30 33 pthread_mutex_unlock (&mutex); 31 34 return; 32 }33 34 void psThreadJobFree (psThreadJob *job) {35 36 if (!job) return;37 38 psFree (job->type);39 psFree (job->args);40 return;41 }42 43 // allocate a psThreadJob with nArgs arguments44 psThreadJob *psThreadJobAlloc (char *type, int nArgs) {45 46 psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob));47 psMemSetDeallocator(job, (psFreeFunc)psThreadJobFree);48 49 job->type = psStringCopy (type);50 job->args = psArrayAllocEmpty (nArgs);51 return job;52 }53 54 // add a job to the queue of pending jobs55 bool psThreadJobAddPending (psThreadJob *job) {56 57 psThreadLock ();58 if (pending == NULL) {59 pending = psListAlloc(NULL);60 }61 62 psListAdd (pending, PS_LIST_TAIL, job);63 psThreadUnlock ();64 return true;65 }66 67 // this function is not locked -- see thread launder for example68 psThreadJob *psThreadJobGetPending () {69 70 if (!pending) return NULL;71 72 psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD);73 74 // jobs we pull off the pending queue get placed on the done queue75 if (job) {76 if (done == NULL) {77 done = psListAlloc(NULL);78 }79 psListAdd (done, PS_LIST_TAIL, job);80 }81 return job;82 }83 84 // this function is not locked -- see thread launder for example85 psThreadJob *psThreadJobGetDone () {86 87 if (!done) return NULL;88 89 psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD);90 return job;91 35 } 92 36 … … 106 50 } 107 51 52 /***** thread job functions *****/ 53 54 void psThreadJobFree (psThreadJob *job) { 55 56 if (!job) return; 57 58 psFree (job->type); 59 psFree (job->args); 60 return; 61 } 62 63 // allocate a psThreadJob of the given type 64 psThreadJob *psThreadJobAlloc (char *type) { 65 66 psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob)); 67 psMemSetDeallocator(job, (psFreeFunc)psThreadJobFree); 68 69 job->type = psStringCopy (type); 70 job->args = psArrayAlloc (0); 71 return job; 72 } 73 74 // add a job to the queue of pending jobs 75 bool psThreadJobAddPending (psThreadJob *job) { 76 77 // if we failed to call psThreadPoolInit, or we called it with nThreads == 0, 78 // find the matching function and just run it. 79 if (!pool || !pool->n) { 80 81 // in non-threaded operation, the job is placed on the done list and immediately run 82 if (done == NULL) { 83 done = psListAlloc(NULL); 84 } 85 psListAdd (done, PS_LIST_TAIL, job); 86 87 // find the corresponding task and run it 88 for (int i = 0; i < tasks->n; i++) { 89 psThreadTask *task = tasks->data[i]; 90 if (strcmp (job->type, task->type)) continue; 91 92 psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 93 94 bool status = task->function(job); 95 return status; 96 } 97 return false; 98 } 99 100 psThreadLock (); 101 if (pending == NULL) { 102 pending = psListAlloc(NULL); 103 } 104 105 psListAdd (pending, PS_LIST_TAIL, job); 106 psThreadUnlock (); 107 return true; 108 } 109 110 // this function is not locked -- see thread launder for example 111 psThreadJob *psThreadJobGetPending () { 112 113 if (!pending) return NULL; 114 115 psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD); 116 117 // jobs we pull off the pending queue get placed on the done queue 118 if (job) { 119 if (done == NULL) { 120 done = psListAlloc(NULL); 121 } 122 psListAdd (done, PS_LIST_TAIL, job); 123 } 124 return job; 125 } 126 127 // this function is not locked -- see thread launder for example 128 psThreadJob *psThreadJobGetDone () { 129 130 if (!done) return NULL; 131 132 psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD); 133 return job; 134 } 135 136 /***** thread task functions *****/ 137 138 void psThreadTaskFree (psThreadTask *task) { 139 140 if (!task) return; 141 142 psFree (task->type); 143 return; 144 } 145 146 // allocate a psThreadTask with nArgs arguments 147 psThreadTask *psThreadTaskAlloc (char *type, int nArgs) { 148 149 psThreadTask *task = (psThreadTask *)psAlloc(sizeof(psThreadTask)); 150 psMemSetDeallocator(task, (psFreeFunc)psThreadTaskFree); 151 152 task->type = psStringCopy (type); 153 task->nArgs = nArgs; 154 task->function = NULL; 155 return task; 156 } 157 158 // add a task to the collection of tasks 159 bool psThreadTaskAdd (psThreadTask *task) { 160 161 if (tasks == NULL) { 162 tasks = psArrayAllocEmpty(8); 163 } 164 165 psArrayAdd (tasks, 1, task); 166 return true; 167 } 168 169 // each thread runs this function to choose the task functions 170 void *psThreadLauncher (void *data) { 171 172 psThread *self = data; 173 psThreadJob *job = NULL; 174 175 while (1) { 176 177 // if we get an error, just wait until we are cleared or killed 178 while (self->fault) { 179 usleep (10000); 180 } 181 182 // if no tasks are assigned, just wait until they are 183 while (tasks == NULL) { 184 usleep (10000); 185 } 186 187 // request a new job, if there are none available, sleep a bit 188 // we have to lock here so the job queue cannot be empty yet no threads busy 189 psThreadLock(); 190 while ((job = psThreadJobGetPending ()) == NULL) { 191 psThreadUnlock(); 192 usleep (10000); 193 psThreadLock(); // XXX ??? 194 } 195 self->busy = true; 196 197 for (int i = 0; i < tasks->n; i++) { 198 psThreadTask *task = tasks->data[i]; 199 if (strcmp (job->type, task->type)) continue; 200 201 psThreadUnlock(); 202 psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 203 204 bool status = task->function(job); 205 if (!status) { 206 self->fault = true; 207 } 208 self->busy = false; 209 break; 210 } 211 psThreadUnlock(); 212 // XXX what do we do if the job is unknown? 213 } 214 } 215 216 /***** thread pool functions *****/ 217 108 218 // create a pool of Nthreads, each running the user's job-launcher function 109 bool psThreadPoolInit (int nThreads , psThreadLaunchJobsFunction function) {219 bool psThreadPoolInit (int nThreads) { 110 220 111 221 if (pool) psAbort ("psThreadsInit already called"); … … 114 224 for (int i = 0; i < nThreads; i++) { 115 225 psThread *thread = psThreadAlloc(); 116 pthread_create (&thread->pt, NULL, function, thread);226 pthread_create (&thread->pt, NULL, psThreadLauncher, thread); 117 227 pool->data[i] = thread; 118 228 } … … 126 236 // an error is detected on one of the threads or until 127 237 // all threads are idle and no jobs are left on the queue 238 239 if (!pool) return true; 240 if (!pool->n) return true; 128 241 129 242 while (1) { … … 169 282 pool = NULL; 170 283 171 return true; 172 } 173 284 psFree (tasks); 285 tasks = NULL; 286 287 return true; 288 } 289
Note:
See TracChangeset
for help on using the changeset viewer.
