Changeset 18822
- Timestamp:
- Jul 31, 2008, 1:24:22 PM (18 years ago)
- Location:
- branches/eam_branch_20080719
- Files:
-
- 2 added
- 10 edited
-
ppMerge/src/Makefile.am (modified) (1 diff)
-
ppMerge/src/ppMerge.h (modified) (1 diff)
-
ppMerge/src/ppMergeArguments.c (modified) (2 diffs)
-
ppMerge/src/ppMergeLoop_Threaded.c (modified) (5 diffs)
-
ppMerge/src/ppMergeSetThreads.c (added)
-
psLib/src/sys/psThread.c (modified) (7 diffs)
-
psLib/src/sys/psThread.h (modified) (2 diffs)
-
pswarp/src/Makefile.am (modified) (1 diff)
-
pswarp/src/pswarp.h (modified) (1 diff)
-
pswarp/src/pswarpArguments.c (modified) (1 diff)
-
pswarp/src/pswarpSetThreads.c (added)
-
pswarp/src/pswarpTransformReadout.c (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
branches/eam_branch_20080719/ppMerge/src/Makefile.am
r18819 r18822 13 13 ppMergeReadChunk.c \ 14 14 ppMergeLoop_Threaded.c \ 15 ppMerge ThreadLauncher.c \15 ppMergeSetThreads.c \ 16 16 ppMergeMask.c 17 17 -
branches/eam_branch_20080719/ppMerge/src/ppMerge.h
r18818 r18822 105 105 void *ppMergeThreadLauncher (void *data); 106 106 107 bool ppMergeSetThreads (); 108 107 109 #endif -
branches/eam_branch_20080719/ppMerge/src/ppMergeArguments.c
r18759 r18822 171 171 } 172 172 173 # if (THREADED)174 173 // Number of threads 175 174 if ((argnum = psArgumentGet(argc, argv, "-threads"))) { … … 181 180 // create the thread pool with number of desired threads, supplying our thread launcher function 182 181 // XXX need to determine the number of threads from the config data 183 psThreadPoolInit (nThreads , &ppMergeThreadLauncher);184 } 185 # endif 182 psThreadPoolInit (nThreads); 183 } 184 ppMergeSetThreads(); 186 185 187 186 if (argc == 1 || !psArgumentParse(arguments, &argc, argv) || argc != 3) { -
branches/eam_branch_20080719/ppMerge/src/ppMergeLoop_Threaded.c
r18815 r18822 197 197 198 198 // Read input data by chunks 199 psTimerStart ("ppMergeLoop");199 // psTimerStart ("ppMergeLoop"); 200 200 for (int numChunk = 0; true; numChunk++) { 201 201 … … 205 205 if (!fileGroup) break; 206 206 207 psThreadJob *job = NULL; 208 207 209 switch (type) { 208 210 case PPMERGE_TYPE_SHUTTER: 209 if (nThreads) { 210 // allocate a job 211 psThreadJob *job = psThreadJobAlloc ("PPMERGE_SHUTTER_CORRECTION", 0); 212 213 // construct the arguments for this job 214 psArrayAdd (job->args, 1, outRO); 215 psArrayAdd (job->args, 1, fileGroup); 216 psArrayAdd (job->args, 1, psScalarAlloc(shutterRef, PS_TYPE_F32)); 217 psArrayAdd (job->args, 1, shutters->data[cellNum]); 218 psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32)); 219 psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32)); 220 psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8)); 221 222 psThreadJobAddPending (job); 223 } else { 224 if (!pmShutterCorrectionGenerate(outRO, NULL, fileGroup->readouts, shutterRef, shutters->data[cellNum], iter, rej, maskVal)) { 225 goto ERROR; 226 } 227 fileGroup->busy = false; 211 // allocate a job 212 job = psThreadJobAlloc ("PPMERGE_SHUTTER_CORRECTION"); 213 214 // construct the arguments for this job 215 psArrayAdd (job->args, 1, outRO); 216 psArrayAdd (job->args, 1, fileGroup); 217 psArrayAdd (job->args, 1, psScalarAlloc(shutterRef, PS_TYPE_F32)); 218 psArrayAdd (job->args, 1, shutters->data[cellNum]); 219 psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32)); 220 psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32)); 221 psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8)); 222 223 // call: pmShutterCorrectionGenerate(outRO, NULL, fileGroup->readouts, shutterRef, shutters->data[cellNum], iter, rej, maskVal) 224 if (!psThreadJobAddPending (job)) { 225 goto ERROR; 228 226 } 229 227 break; 230 228 case PPMERGE_TYPE_DARK: 231 if (nThreads) { 232 // allocate a job 233 psThreadJob *job = psThreadJobAlloc ("PPMERGE_DARK_COMBINE", 0); 234 235 // construct the arguments for this job 236 psArrayAdd (job->args, 1, outCell); 237 psArrayAdd (job->args, 1, fileGroup); 238 psArrayAdd (job->args, 1, darkOrdinates); 239 psArrayAdd (job->args, 1, darkNorm); 240 psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32)); 241 psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32)); 242 psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8)); 243 244 psThreadJobAddPending (job); 245 } else { 246 if (!pmDarkCombine(outCell, fileGroup->readouts, darkOrdinates, darkNorm, iter, rej, maskVal)) { 247 goto ERROR; 248 } 249 fileGroup->busy = false; 229 // allocate a job 230 job = psThreadJobAlloc ("PPMERGE_DARK_COMBINE"); 231 232 // construct the arguments for this job 233 psArrayAdd (job->args, 1, outCell); 234 psArrayAdd (job->args, 1, fileGroup); 235 psArrayAdd (job->args, 1, darkOrdinates); 236 psArrayAdd (job->args, 1, darkNorm); 237 psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32)); 238 psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32)); 239 psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8)); 240 241 // call: pmDarkCombine(outCell, fileGroup->readouts, darkOrdinates, darkNorm, iter, rej, maskVal); 242 if (!psThreadJobAddPending (job)) { 243 goto ERROR; 250 244 } 251 245 break; … … 253 247 case PPMERGE_TYPE_FLAT: 254 248 case PPMERGE_TYPE_FRINGE: 255 if (nThreads) { 256 // allocate a job 257 psThreadJob *job = psThreadJobAlloc ("PPMERGE_READOUT_COMBINE", 0); 258 259 // construct the arguments for this job 260 psArrayAdd (job->args, 1, outRO); 261 psArrayAdd (job->args, 1, fileGroup); 262 psArrayAdd (job->args, 1, zeros); 263 psArrayAdd (job->args, 1, scales); 264 psArrayAdd (job->args, 1, combination); 265 266 psThreadJobAddPending (job); 267 } else { 268 if (!pmReadoutCombine(outRO, fileGroup->readouts, zeros, scales, combination)) { 269 goto ERROR; 270 } 271 fileGroup->busy = false; 249 // allocate a job 250 job = psThreadJobAlloc ("PPMERGE_READOUT_COMBINE"); 251 252 // construct the arguments for this job 253 psArrayAdd (job->args, 1, outRO); 254 psArrayAdd (job->args, 1, fileGroup); 255 psArrayAdd (job->args, 1, zeros); 256 psArrayAdd (job->args, 1, scales); 257 psArrayAdd (job->args, 1, combination); 258 259 // call: pmReadoutCombine(outRO, fileGroup->readouts, zeros, scales, combination); 260 if (!psThreadJobAddPending (job)) { 261 goto ERROR; 272 262 } 273 263 break; … … 278 268 279 269 // wait for the threads to finish and manage results 280 if (nThreads) { 281 // wait here for the threaded jobs to finish 282 if (!psThreadPoolWait ()) { 283 psError(PS_ERR_UNKNOWN, false, "Unable to interpolate image."); 284 return false; 285 } 286 fprintf (stderr, "success for threaded jobs\n"); 287 288 // we don't care about the results, just dump the done queue jobs 289 psThreadJob *job = NULL; 290 while ((job = psThreadJobGetDone()) != NULL) { 291 psFree (job); 292 } 270 if (!psThreadPoolWait ()) { 271 psError(PS_ERR_UNKNOWN, false, "Unable to combine images."); 272 return false; 293 273 } 274 275 // we don't care about the results, just dump the done queue jobs 276 psThreadJob *job = NULL; 277 while ((job = psThreadJobGetDone()) != NULL) { 278 psFree (job); 279 } 280 294 281 psFree(fileGroups); 295 282 … … 308 295 } 309 296 psFree(inCells); 310 fprintf (stdout, "done ppMergeLoop for cell : %f\n", psTimerMark ("ppMergeLoop"));297 // fprintf (stdout, "done ppMergeLoop for cell : %f\n", psTimerMark ("ppMergeLoop")); 311 298 312 299 // Plug supplementary images into their own FPAs -
branches/eam_branch_20080719/psLib/src/sys/psThread.c
r18808 r18822 6 6 #include <stdarg.h> 7 7 #include <unistd.h> 8 #include <string.h> 8 9 9 10 #include "psAssert.h" … … 19 20 static psList *pending = NULL; // queue of pending jobs 20 21 static psList *done = NULL; // queue of done jobs 21 22 22 static psArray *pool = NULL; // array of defined threads 23 static psArray *tasks = NULL; // queue of tasks 24 25 /***** basic thread functions *****/ 23 26 24 27 void psThreadLock () { … … 30 33 pthread_mutex_unlock (&mutex); 31 34 return; 32 }33 34 void psThreadJobFree (psThreadJob *job) {35 36 if (!job) return;37 38 psFree (job->type);39 psFree (job->args);40 return;41 }42 43 // allocate a psThreadJob with nArgs arguments44 psThreadJob *psThreadJobAlloc (char *type, int nArgs) {45 46 psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob));47 psMemSetDeallocator(job, (psFreeFunc)psThreadJobFree);48 49 job->type = psStringCopy (type);50 job->args = psArrayAllocEmpty (nArgs);51 return job;52 }53 54 // add a job to the queue of pending jobs55 bool psThreadJobAddPending (psThreadJob *job) {56 57 psThreadLock ();58 if (pending == NULL) {59 pending = psListAlloc(NULL);60 }61 62 psListAdd (pending, PS_LIST_TAIL, job);63 psThreadUnlock ();64 return true;65 }66 67 // this function is not locked -- see thread launder for example68 psThreadJob *psThreadJobGetPending () {69 70 if (!pending) return NULL;71 72 psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD);73 74 // jobs we pull off the pending queue get placed on the done queue75 if (job) {76 if (done == NULL) {77 done = psListAlloc(NULL);78 }79 psListAdd (done, PS_LIST_TAIL, job);80 }81 return job;82 }83 84 // this function is not locked -- see thread launder for example85 psThreadJob *psThreadJobGetDone () {86 87 if (!done) return NULL;88 89 psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD);90 return job;91 35 } 92 36 … … 106 50 } 107 51 52 /***** thread job functions *****/ 53 54 void psThreadJobFree (psThreadJob *job) { 55 56 if (!job) return; 57 58 psFree (job->type); 59 psFree (job->args); 60 return; 61 } 62 63 // allocate a psThreadJob of the given type 64 psThreadJob *psThreadJobAlloc (char *type) { 65 66 psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob)); 67 psMemSetDeallocator(job, (psFreeFunc)psThreadJobFree); 68 69 job->type = psStringCopy (type); 70 job->args = psArrayAlloc (0); 71 return job; 72 } 73 74 // add a job to the queue of pending jobs 75 bool psThreadJobAddPending (psThreadJob *job) { 76 77 // if we failed to call psThreadPoolInit, or we called it with nThreads == 0, 78 // find the matching function and just run it. 79 if (!pool || !pool->n) { 80 81 // in non-threaded operation, the job is placed on the done list and immediately run 82 if (done == NULL) { 83 done = psListAlloc(NULL); 84 } 85 psListAdd (done, PS_LIST_TAIL, job); 86 87 // find the corresponding task and run it 88 for (int i = 0; i < tasks->n; i++) { 89 psThreadTask *task = tasks->data[i]; 90 if (strcmp (job->type, task->type)) continue; 91 92 psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 93 94 bool status = task->function(job); 95 return status; 96 } 97 return false; 98 } 99 100 psThreadLock (); 101 if (pending == NULL) { 102 pending = psListAlloc(NULL); 103 } 104 105 psListAdd (pending, PS_LIST_TAIL, job); 106 psThreadUnlock (); 107 return true; 108 } 109 110 // this function is not locked -- see thread launder for example 111 psThreadJob *psThreadJobGetPending () { 112 113 if (!pending) return NULL; 114 115 psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD); 116 117 // jobs we pull off the pending queue get placed on the done queue 118 if (job) { 119 if (done == NULL) { 120 done = psListAlloc(NULL); 121 } 122 psListAdd (done, PS_LIST_TAIL, job); 123 } 124 return job; 125 } 126 127 // this function is not locked -- see thread launder for example 128 psThreadJob *psThreadJobGetDone () { 129 130 if (!done) return NULL; 131 132 psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD); 133 return job; 134 } 135 136 /***** thread task functions *****/ 137 138 void psThreadTaskFree (psThreadTask *task) { 139 140 if (!task) return; 141 142 psFree (task->type); 143 return; 144 } 145 146 // allocate a psThreadTask with nArgs arguments 147 psThreadTask *psThreadTaskAlloc (char *type, int nArgs) { 148 149 psThreadTask *task = (psThreadTask *)psAlloc(sizeof(psThreadTask)); 150 psMemSetDeallocator(task, (psFreeFunc)psThreadTaskFree); 151 152 task->type = psStringCopy (type); 153 task->nArgs = nArgs; 154 task->function = NULL; 155 return task; 156 } 157 158 // add a task to the collection of tasks 159 bool psThreadTaskAdd (psThreadTask *task) { 160 161 if (tasks == NULL) { 162 tasks = psArrayAllocEmpty(8); 163 } 164 165 psArrayAdd (tasks, 1, task); 166 return true; 167 } 168 169 // each thread runs this function to choose the task functions 170 void *psThreadLauncher (void *data) { 171 172 psThread *self = data; 173 psThreadJob *job = NULL; 174 175 while (1) { 176 177 // if we get an error, just wait until we are cleared or killed 178 while (self->fault) { 179 usleep (10000); 180 } 181 182 // if no tasks are assigned, just wait until they are 183 while (tasks == NULL) { 184 usleep (10000); 185 } 186 187 // request a new job, if there are none available, sleep a bit 188 // we have to lock here so the job queue cannot be empty yet no threads busy 189 psThreadLock(); 190 while ((job = psThreadJobGetPending ()) == NULL) { 191 psThreadUnlock(); 192 usleep (10000); 193 psThreadLock(); // XXX ??? 194 } 195 self->busy = true; 196 197 for (int i = 0; i < tasks->n; i++) { 198 psThreadTask *task = tasks->data[i]; 199 if (strcmp (job->type, task->type)) continue; 200 201 psThreadUnlock(); 202 psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type); 203 204 bool status = task->function(job); 205 if (!status) { 206 self->fault = true; 207 } 208 self->busy = false; 209 break; 210 } 211 psThreadUnlock(); 212 // XXX what do we do if the job is unknown? 213 } 214 } 215 216 /***** thread pool functions *****/ 217 108 218 // create a pool of Nthreads, each running the user's job-launcher function 109 bool psThreadPoolInit (int nThreads , psThreadLaunchJobsFunction function) {219 bool psThreadPoolInit (int nThreads) { 110 220 111 221 if (pool) psAbort ("psThreadsInit already called"); … … 114 224 for (int i = 0; i < nThreads; i++) { 115 225 psThread *thread = psThreadAlloc(); 116 pthread_create (&thread->pt, NULL, function, thread);226 pthread_create (&thread->pt, NULL, psThreadLauncher, thread); 117 227 pool->data[i] = thread; 118 228 } … … 126 236 // an error is detected on one of the threads or until 127 237 // all threads are idle and no jobs are left on the queue 238 239 if (!pool) return true; 240 if (!pool->n) return true; 128 241 129 242 while (1) { … … 169 282 pool = NULL; 170 283 171 return true; 172 } 173 284 psFree (tasks); 285 tasks = NULL; 286 287 return true; 288 } 289 -
branches/eam_branch_20080719/psLib/src/sys/psThread.h
r18754 r18822 4 4 * 5 5 * @author EAM, IFA 6 * @version $Revision: 1.1.2. 3$ $Name: not supported by cvs2svn $7 * @date $Date: 2008-07- 27 22:42:32 $6 * @version $Revision: 1.1.2.4 $ $Name: not supported by cvs2svn $ 7 * @date $Date: 2008-07-31 23:24:22 $ 8 8 * 9 9 * Copyright 2004-2005 Insitute for Astronomy, University of Hawaii … … 27 27 } psThread; 28 28 29 typedef void *(*psThreadLaunchJobsFunction)(void *data); 29 typedef bool (*psThreadTaskFunction)(psThreadJob *job); 30 31 typedef struct { 32 psString type; 33 int nArgs; 34 psThreadTaskFunction function; 35 } psThreadTask; 36 37 // typedef void *(*psThreadLaunchJobsFunction)(void *data); 30 38 31 39 void psThreadLock (); 32 40 void psThreadUnlock (); 33 41 34 psThreadJob *psThreadJobAlloc (char *type, int nArgs); 42 psThread *psThreadAlloc (); 43 44 psThreadJob *psThreadJobAlloc (char *type); 35 45 bool psThreadJobAddPending (psThreadJob *job); 36 46 psThreadJob *psThreadJobGetPending (); 37 47 psThreadJob *psThreadJobGetDone (); 38 48 39 psThread *psThreadAlloc (); 49 psThreadTask *psThreadTaskAlloc (char *type, int nArgs); 50 bool psThreadTaskAdd (psThreadTask *task); 51 void *psThreadLauncher (void *data); 40 52 41 bool psThreadPoolInit (int nThreads , psThreadLaunchJobsFunction function);53 bool psThreadPoolInit (int nThreads); 42 54 bool psThreadPoolWait (); 43 55 bool psThreadPoolFinalize (); -
branches/eam_branch_20080719/pswarp/src/Makefile.am
r18753 r18822 16 16 pswarpPixelFraction.c \ 17 17 pswarpSetMaskBits.c \ 18 pswarp ThreadLauncher.c \18 pswarpSetThreads.c \ 19 19 pswarpTransformReadout.c \ 20 20 pswarpTransformSources.c \ -
branches/eam_branch_20080719/pswarp/src/pswarp.h
r18753 r18822 90 90 ); 91 91 92 // thread launcherfor this program93 void *pswarpThreadLauncher (void *data);92 // define threads for this program 93 bool pswarpSetThreads (); -
branches/eam_branch_20080719/pswarp/src/pswarpArguments.c
r18753 r18822 50 50 // create the thread pool with number of desired threads, supplying our thread launcher function 51 51 // XXX need to determine the number of threads from the config data 52 psThreadPoolInit (nThreads , &pswarpThreadLauncher);52 psThreadPoolInit (nThreads); 53 53 } 54 pswarpSetThreads (); 54 55 55 56 // PSF determination? -
branches/eam_branch_20080719/pswarp/src/pswarpTransformReadout.c
r18753 r18822 71 71 args->goodPixels = 0; 72 72 73 if (nThreads) { 74 // allocate a job 75 psThreadJob *job = psThreadJobAlloc ("PSWARP_TRANSFORM_TILE", 0); 73 // allocate a job 74 psThreadJob *job = psThreadJobAlloc ("PSWARP_TRANSFORM_TILE"); 76 75 77 // construct the arguments for this job 78 // job is pswarpTransformTile (gridX, gridY); 79 psArrayAdd (job->args, 1, args); 80 // fprintf (stderr, "adding job %d,%d, Nargs: %ld\n", gridX, gridY, job->args->n); 81 psThreadJobAddPending (job); 82 } else { 83 pswarpTransformTile (args); 84 goodPixels += args->goodPixels; 76 // construct the arguments for this job 77 // job is pswarpTransformTile (gridX, gridY); 78 psArrayAdd (job->args, 1, args); 79 // fprintf (stderr, "adding job %d,%d, Nargs: %ld\n", gridX, gridY, job->args->n); 80 81 // call: pswarpTransformTile (args); 82 if (!psThreadJobAddPending (job)) { 83 psError(PS_ERR_UNKNOWN, false, "Unable to warp image."); 84 return false; 85 85 } 86 86 psFree (args); … … 89 89 90 90 // wait for the threads to finish and manage results 91 if (nThreads) { 92 // wait here for the threaded jobs to finish 93 if (!psThreadPoolWait ()) { 94 psError(PS_ERR_UNKNOWN, false, "Unable to interpolate image."); 95 return false; 91 // wait here for the threaded jobs to finish 92 if (!psThreadPoolWait ()) { 93 psError(PS_ERR_UNKNOWN, false, "Unable to interpolate image."); 94 return false; 95 } 96 fprintf (stderr, "success for threaded jobs\n"); 97 98 // each job records its own goodPixel values; sum them here 99 // we have only supplied one type of job, so we can assume the types here 100 psThreadJob *job = NULL; 101 while ((job = psThreadJobGetDone()) != NULL) { 102 if (job->args->n < 1) { 103 fprintf (stderr, "error with job\n"); 104 } else { 105 pswarpTransformTileArgs *args = job->args->data[0]; 106 // fprintf (stderr, "finished job %d,%d, Nargs: %ld\n", args->gridX, args->gridY, job->args->n); 107 goodPixels += args->goodPixels; 96 108 } 97 fprintf (stderr, "success for threaded jobs\n"); 98 99 // each job records its own goodPixel values; sum them here 100 // we have only supplied one type of job, so we can assume the types here 101 psThreadJob *job = NULL; 102 while ((job = psThreadJobGetDone()) != NULL) { 103 if (job->args->n < 1) { 104 fprintf (stderr, "error with job\n"); 105 } else { 106 pswarpTransformTileArgs *args = job->args->data[0]; 107 // fprintf (stderr, "finished job %d,%d, Nargs: %ld\n", args->gridX, args->gridY, job->args->n); 108 goodPixels += args->goodPixels; 109 } 110 psFree (job); 111 } 109 psFree (job); 112 110 } 113 111 psFree(interp);
Note:
See TracChangeset
for help on using the changeset viewer.
