Changeset 18967
- Timestamp:
- Aug 8, 2008, 8:17:12 AM (18 years ago)
- Location:
- trunk
- Files:
-
- 7 edited
-
ppMerge/src/ppMerge.h (modified) (3 diffs)
-
ppMerge/src/ppMergeArguments.c (modified) (2 diffs)
-
ppMerge/src/ppMergeCamera.c (modified) (5 diffs)
-
ppMerge/src/ppMergeLoop_Threaded.c (modified) (8 diffs)
-
ppMerge/src/ppMergeReadChunk.c (modified) (1 diff)
-
ppMerge/src/ppMergeSetThreads.c (modified) (5 diffs)
-
pswarp/src/pswarpTransformReadout.c (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/ppMerge/src/ppMerge.h
r18839 r18967 15 15 #define TIMERNAME "ppMerge" // Name for timer 16 16 #define PPMERGE_RECIPE "PPMERGE" // Recipe name 17 #define THREADED 1 17 #define THREADED 1 // Compile with threads? 18 18 19 19 // Type of frame to merge … … 35 35 } ppMergeFiles; 36 36 37 // Group of files to read 38 // 39 // Each file contributes a readout, into which is read a chunk from that file 37 40 typedef struct { 38 psArray *readouts; 39 bool read; 40 bool busy; 41 int firstScan; 42 int lastScan; 41 psArray *readouts; // Input readouts 42 bool read; // Has the scan been read? 43 bool busy; // Is the scan being processed? 44 int firstScan; // First row of the chunk to be read for this group 45 int lastScan; // Last row of the chunk to be read for this group 43 46 } ppMergeFileGroup; 44 47 … … 101 104 102 105 103 ppMergeFileGroup *ppMergeFileGroupAlloc(); 104 ppMergeFileGroup *ppMergeReadChunk (bool *status, psArray *fileGroups, pmConfig *config, int numChunk); 105 void *ppMergeThreadLauncher (void *data); 106 // Allocator for group of files 107 ppMergeFileGroup *ppMergeFileGroupAlloc(void); 106 108 107 bool ppMergeSetThreads (); 109 // Read chunk into the first available file group 110 ppMergeFileGroup *ppMergeReadChunk(bool *status, // Status of read 111 psArray *fileGroups, // All groups 112 pmConfig *config, // Configuration 113 int numChunk // Chunk number (only for interest) 114 ); 115 116 // Set up thread handling 117 bool ppMergeSetThreads(void); 108 118 109 119 #endif -
trunk/ppMerge/src/ppMergeArguments.c
r18839 r18967 174 174 if ((argnum = psArgumentGet(argc, argv, "-threads"))) { 175 175 psArgumentRemove(argnum, &argc, argv); 176 int nThreads = atoi(argv[argnum]);176 int nThreads = atoi(argv[argnum]); 177 177 psMetadataAddS32(config->arguments, PS_LIST_TAIL, "NTHREADS", 0, "number of warp threads", nThreads); 178 178 psArgumentRemove(argnum, &argc, argv); 179 179 180 // create the thread pool with number of desired threads, supplying our thread launcher function181 // XXX need to determine the number of threads from the config data182 psThreadPoolInit (nThreads);180 // create the thread pool with number of desired threads, supplying our thread launcher function 181 // XXX need to determine the number of threads from the config data 182 psThreadPoolInit (nThreads); 183 183 } 184 184 ppMergeSetThreads(); … … 379 379 #endif 380 380 381 psFree(arguments); 381 382 return true; 382 383 -
trunk/ppMerge/src/ppMergeCamera.c
r18756 r18967 31 31 if (! pmFPASelectChip(fpa, chipNum, false)) { 32 32 psError(PS_ERR_IO, false, "Chip number %d doesn't exist in camera.\n", chipNum); 33 psFree (chips);33 psFree (chips); 34 34 return false; 35 35 } … … 43 43 psArray *cells = psStringSplitArray (cellLine, ",", false); 44 44 if (cells->n > 0) { 45 for (int i = 0; i < fpa->chips->n; i++) {46 pmChip *chip = fpa->chips->data[i];47 pmChipSelectCell (chip, -1, true); // deselect all cells48 for (int j = 0; j < cells->n; j++) {49 int cellNum = atoi(cells->data[j]);50 if (! pmChipSelectCell(chip, cellNum, false)) {51 psError(PS_ERR_IO, false, "Cell number %d doesn't exist in camera.\n", cellNum);52 psFree (cells);53 return false;54 }55 }56 }45 for (int i = 0; i < fpa->chips->n; i++) { 46 pmChip *chip = fpa->chips->data[i]; 47 pmChipSelectCell (chip, -1, true); // deselect all cells 48 for (int j = 0; j < cells->n; j++) { 49 int cellNum = atoi(cells->data[j]); 50 if (! pmChipSelectCell(chip, cellNum, false)) { 51 psError(PS_ERR_IO, false, "Cell number %d doesn't exist in camera.\n", cellNum); 52 psFree (cells); 53 return false; 54 } 55 } 56 } 57 57 } 58 58 psFree (cells); … … 259 259 cell->file_exists = false; 260 260 culled++; 261 if (cell->concepts) {262 psFree(cell->concepts);263 cell->concepts = NULL;264 }261 if (cell->concepts) { 262 psFree(cell->concepts); 263 cell->concepts = NULL; 264 } 265 265 } 266 266 } … … 268 268 chip->data_exists = false; 269 269 chip->file_exists = false; 270 if (chip->concepts) {271 psFree(chip->concepts);272 chip->concepts = NULL;273 }270 if (chip->concepts) { 271 psFree(chip->concepts); 272 chip->concepts = NULL; 273 } 274 274 } 275 275 } … … 298 298 } 299 299 300 // Output image301 pmFPA *fpa = pmFPAConstruct(config->camera, config->cameraName); // FPA to contain the output302 if (!fpa) {303 psError(PS_ERR_UNEXPECTED_NULL, false, "Unable to construct an FPA from camera configuration.");304 psFree(phuView);305 return false;306 }307 308 300 psString outName = ppMergeOutputFile(config); // Name of output file 309 301 -
trunk/ppMerge/src/ppMergeLoop_Threaded.c
r18862 r18967 12 12 #include "ppMerge.h" 13 13 14 // XXX this function is now sufficiently different for the major types, it would make sense to just 14 // XXX this function is now sufficiently different for the major types, it would make sense to just 15 15 // split it into three: BASIC, SHUTTER, DARK 16 16 … … 57 57 psMaskType markVal; 58 58 if (!pmConfigMaskSetBits (&maskVal, &markVal, config)) { 59 psError (PS_ERR_UNKNOWN, true, "Unable to define the mask bit values");60 return false;59 psError (PS_ERR_UNKNOWN, true, "Unable to define the mask bit values"); 60 return false; 61 61 } 62 62 … … 151 151 if (type == PPMERGE_TYPE_SHUTTER) { 152 152 shutterRef = pmShutterCorrectionReference(shutters->data[cellNum]); 153 pattern = pmReadoutAlloc(NULL);153 pattern = pmReadoutAlloc(NULL); 154 154 } 155 155 156 156 pmReadout *outRO = pmReadoutAlloc(outCell); 157 157 158 // open the input files (we need to do the work ourselves)158 // open the input files (we need to do the work ourselves) 159 159 for (int i = 0; i < numFiles; i++) { 160 160 if (!ppMergeFileOpenInput(config, view, i)) { … … 162 162 goto ERROR; 163 163 } 164 } 165 166 ppMergeFileGroup *fileGroup = NULL; 167 psArray *fileGroups = psArrayAlloc (nThreads + 1); 168 169 // generate readouts for each input file in each file group 170 for (int i = 0; i < fileGroups->n; i++) { 171 psArray *readouts = psArrayAlloc(numFiles); // Input readouts 172 for (int j = 0; j < numFiles; j++) { 173 pmFPAfile *input = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i); 174 pmCell *inCell = pmFPAviewThisCell(view, input->fpa); // Input cell 175 readouts->data[j] = pmReadoutAlloc(inCell); 176 } 177 178 fileGroup = ppMergeFileGroupAlloc(); 179 fileGroup->readouts = readouts; 180 fileGroup->read = false; 181 fileGroup->busy = false; 182 fileGroup->lastScan = 0; 183 fileGroup->firstScan = 0; 184 fileGroups->data[i] = fileGroup; 185 } 186 187 // call the init functions 188 switch (type) { 189 case PPMERGE_TYPE_BIAS: 190 case PPMERGE_TYPE_FLAT: 191 case PPMERGE_TYPE_FRINGE: 192 psAssert (fileGroups->n > 0, "no valid file groups defined"); 193 fileGroup = fileGroups->data[0]; 194 if (!pmReadoutCombinePrepare(outRO, fileGroup->readouts, combination)) { 195 goto ERROR; 196 } 197 break; 198 case PPMERGE_TYPE_DARK: 199 psAssert (fileGroups->n > 0, "no valid file groups defined"); 200 fileGroup = fileGroups->data[0]; 201 if (!pmDarkCombinePrepare(outCell, fileGroup->readouts, darkOrdinates, darkNorm)) { 202 goto ERROR; 203 } 204 break; 205 case PPMERGE_TYPE_SHUTTER: 206 psAssert (fileGroups->n > 0, "no valid file groups defined"); 207 fileGroup = fileGroups->data[0]; 208 if (!pmShutterCorrectionGeneratePrepare(outRO, pattern, fileGroup->readouts, maskVal)) { 209 goto ERROR; 210 } 211 break; 212 default: 213 fprintf (stderr, "not yet ready"); 214 goto ERROR; 215 } 164 } 165 166 ppMergeFileGroup *fileGroup = NULL; 167 psArray *fileGroups = psArrayAlloc(nThreads + 1); 168 169 // Generate readouts for each input file in each file group 170 for (int i = 0; i < fileGroups->n; i++) { 171 psArray *readouts = psArrayAlloc(numFiles); // Input readouts 172 for (int j = 0; j < numFiles; j++) { 173 pmFPAfile *input = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i); 174 pmCell *inCell = pmFPAviewThisCell(view, input->fpa); // Input cell 175 readouts->data[j] = pmReadoutAlloc(inCell); 176 } 177 178 fileGroup = ppMergeFileGroupAlloc(); 179 fileGroup->readouts = readouts; 180 fileGroup->read = false; 181 fileGroup->busy = false; 182 fileGroup->lastScan = 0; 183 fileGroup->firstScan = 0; 184 fileGroups->data[i] = fileGroup; 185 } 186 187 // call the init functions 188 switch (type) { 189 case PPMERGE_TYPE_BIAS: 190 case PPMERGE_TYPE_FLAT: 191 case PPMERGE_TYPE_FRINGE: 192 psAssert (fileGroups->n > 0, "no valid file groups defined"); 193 fileGroup = fileGroups->data[0]; 194 if (!pmReadoutCombinePrepare(outRO, fileGroup->readouts, combination)) { 195 goto ERROR; 196 } 197 break; 198 case PPMERGE_TYPE_DARK: 199 psAssert (fileGroups->n > 0, "no valid file groups defined"); 200 fileGroup = fileGroups->data[0]; 201 if (!pmDarkCombinePrepare(outCell, fileGroup->readouts, darkOrdinates, darkNorm)) { 202 goto ERROR; 203 } 204 break; 205 case PPMERGE_TYPE_SHUTTER: 206 psAssert (fileGroups->n > 0, "no valid file groups defined"); 207 fileGroup = fileGroups->data[0]; 208 if (!pmShutterCorrectionGeneratePrepare(outRO, pattern, fileGroup->readouts, maskVal)) { 209 goto ERROR; 210 } 211 break; 212 default: 213 psAbort("Should never get here."); 214 } 216 215 217 216 // Read input data by chunks 218 psTimerStart("ppMergeLoop");217 psTimerStart("ppMergeLoop"); 219 218 for (int numChunk = 0; true; numChunk++) { 220 219 221 bool status = false; 222 fileGroup = ppMergeReadChunk (&status, fileGroups, config, numChunk); 223 if (!status) goto ERROR; 224 if (!fileGroup) break; 225 226 psThreadJob *job = NULL; 227 220 bool status = false; 221 fileGroup = ppMergeReadChunk(&status, fileGroups, config, numChunk); 222 if (!status) { 223 // Something went wrong 224 goto ERROR; 225 } 226 if (!fileGroup) { 227 // Nothing more to read 228 break; 229 } 230 231 // Start a job 228 232 switch (type) { 229 233 case PPMERGE_TYPE_BIAS: 230 234 case PPMERGE_TYPE_FLAT: 231 case PPMERGE_TYPE_FRINGE: 232 // allocate a job 233 job = psThreadJobAlloc ("PPMERGE_READOUT_COMBINE"); 234 235 // construct the arguments for this job 236 psArrayAdd (job->args, 1, outRO); 237 psArrayAdd (job->args, 1, fileGroup); 238 psArrayAdd (job->args, 1, zeros); 239 psArrayAdd (job->args, 1, scales); 240 psArrayAdd (job->args, 1, combination); 241 242 // call: pmReadoutCombine(outRO, fileGroup->readouts, zeros, scales, combination); 243 if (!psThreadJobAddPending (job)) { 244 goto ERROR; 245 } 246 break; 247 case PPMERGE_TYPE_DARK: 248 // allocate a job 249 job = psThreadJobAlloc ("PPMERGE_DARK_COMBINE"); 250 251 // construct the arguments for this job 252 psArrayAdd (job->args, 1, outCell); 253 psArrayAdd (job->args, 1, fileGroup); 254 psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32)); 255 psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32)); 256 psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8)); 257 258 // call: pmDarkCombine(outCell, fileGroup->readouts, iter, rej, maskVal); 259 if (!psThreadJobAddPending (job)) { 260 goto ERROR; 261 } 262 break; 263 case PPMERGE_TYPE_SHUTTER: 264 // allocate a job 265 job = psThreadJobAlloc ("PPMERGE_SHUTTER_CORRECTION"); 266 267 // construct the arguments for this job 268 psArrayAdd (job->args, 1, outRO); 269 psArrayAdd (job->args, 1, pattern); 270 psArrayAdd (job->args, 1, fileGroup); 271 psArrayAdd (job->args, 1, psScalarAlloc(shutterRef, PS_TYPE_F32)); 272 psArrayAdd (job->args, 1, shutters->data[cellNum]); 273 psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32)); 274 psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32)); 275 psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8)); 276 277 // call: pmShutterCorrectionGenerate(outRO, pattern, fileGroup->readouts, shutterRef, shutters->data[cellNum], iter, rej, maskVal) 278 if (!psThreadJobAddPending (job)) { 279 goto ERROR; 280 } 281 break; 235 case PPMERGE_TYPE_FRINGE: { 236 psThreadJob *job = psThreadJobAlloc("PPMERGE_READOUT_COMBINE"); // Job to start 237 238 // Construct the arguments for this job 239 psArrayAdd(job->args, 1, outRO); 240 psArrayAdd(job->args, 1, fileGroup); 241 psArrayAdd(job->args, 1, zeros); 242 psArrayAdd(job->args, 1, scales); 243 psArrayAdd(job->args, 1, combination); 244 245 // call: pmReadoutCombine(outRO, fileGroup->readouts, zeros, scales, combination); 246 if (!psThreadJobAddPending(job)) { 247 goto ERROR; 248 } 249 break; 250 } 251 case PPMERGE_TYPE_DARK: { 252 psThreadJob *job = psThreadJobAlloc ("PPMERGE_DARK_COMBINE"); // Job to start 253 254 // construct the arguments for this job 255 psArrayAdd(job->args, 1, outCell); 256 psArrayAdd(job->args, 1, fileGroup); 257 psArrayAdd(job->args, 1, psScalarAlloc(iter, PS_TYPE_S32)); 258 psArrayAdd(job->args, 1, psScalarAlloc(rej, PS_TYPE_F32)); 259 psArrayAdd(job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8)); 260 261 // call: pmDarkCombine(outCell, fileGroup->readouts, iter, rej, maskVal); 262 if (!psThreadJobAddPending(job)) { 263 goto ERROR; 264 } 265 break; 266 } 267 case PPMERGE_TYPE_SHUTTER: { 268 psThreadJob *job = psThreadJobAlloc ("PPMERGE_SHUTTER_CORRECTION"); 269 270 // construct the arguments for this job 271 psArrayAdd(job->args, 1, outRO); 272 psArrayAdd(job->args, 1, pattern); 273 psArrayAdd(job->args, 1, fileGroup); 274 psArrayAdd(job->args, 1, psScalarAlloc(shutterRef, PS_TYPE_F32)); 275 psArrayAdd(job->args, 1, shutters->data[cellNum]); 276 psArrayAdd(job->args, 1, psScalarAlloc(iter, PS_TYPE_S32)); 277 psArrayAdd(job->args, 1, psScalarAlloc(rej, PS_TYPE_F32)); 278 psArrayAdd(job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8)); 279 280 // call: pmShutterCorrectionGenerate(outRO, pattern, fileGroup->readouts, shutterRef, 281 // shutters->data[cellNum], iter, rej, maskVal); 282 if (!psThreadJobAddPending (job)) { 283 goto ERROR; 284 } 285 break; 286 } 282 287 default: 283 288 psAbort("Should never get here."); … … 285 290 } 286 291 287 // wait for the threads to finish and manage results288 if (!psThreadPoolWait ()) {289 psError(PS_ERR_UNKNOWN, false, "Unable to combine images.");290 return false;291 }292 293 // we don't care about the results, just dump the done queue jobs294 psThreadJob *job = NULL; 295 while ((job = psThreadJobGetDone()) != NULL) {296 psFree (job);297 }292 // Wait for the threads to finish and manage results 293 if (!psThreadPoolWait(false)) { 294 psError(PS_ERR_UNKNOWN, false, "Unable to combine images."); 295 return false; 296 } 297 298 // we don't care about the results, just dump the done queue jobs 299 psThreadJob *job = NULL; // Job to dump 300 while ((job = psThreadJobGetDone())) { 301 psFree (job); 302 } 298 303 299 304 psFree(fileGroups); 300 305 301 // XXX eventually need to keep both the shutter and the pattern, as we do with dark 306 // XXX eventually need to keep both the shutter and the pattern, as we do with dark 302 307 psFree(pattern); 303 308 … … 305 310 psList *inCells = psListAlloc(NULL); // List of cells 306 311 for (int i = 0; i < numFiles; i++) { 307 pmFPAfile *input = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i);308 pmCell *inCell = pmFPAviewThisCell(view, input->fpa); // Input cell312 pmFPAfile *input = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i); 313 pmCell *inCell = pmFPAviewThisCell(view, input->fpa); // Input cell 309 314 psListAdd(inCells, PS_LIST_TAIL, inCell); 310 315 } … … 316 321 } 317 322 psFree(inCells); 318 fprintf (stdout, "done ppMergeLoop for cell : %f\n", psTimerMark ("ppMergeLoop"));319 323 320 324 // Plug supplementary images into their own FPAs … … 364 368 } 365 369 366 if ( !ppStatsFPA(stats, outFPA, view, maskVal, config)) {370 if (stats && !ppStatsFPA(stats, outFPA, view, maskVal, config)) { 367 371 psError(PS_ERR_UNKNOWN, true, "Unable to generate stats for image."); 368 372 goto ERROR; -
trunk/ppMerge/src/ppMergeReadChunk.c
r18839 r18967 1 1 # include "ppMerge.h" 2 2 3 ppMergeFileGroup *ppMergeReadChunk (bool *status, psArray *fileGroups, pmConfig *config, int numChunk) { 3 #define THREAD_WAIT 10000 // Microseconds to wait if thread is not available 4 4 5 ppMergeFileGroup *ppMergeReadChunk(bool *status, psArray *fileGroups, pmConfig *config, int numChunk) 6 { 5 7 *status = true; 6 8 7 9 bool mdok; 8 10 bool haveMasks = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.MASKS"); // Do we have masks? 9 bool haveWeights = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.WEIGHTS"); // Do we have weights?11 bool haveWeights = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.WEIGHTS");// Do we have weights? 10 12 int rows = psMetadataLookupS32(NULL, config->arguments, "ROWS"); // Number of rows to read per chunk 11 13 12 14 // select an available fileGroup 13 15 while (1) { 14 // check for any fileGroups which can read data15 for (int j = 0; j < fileGroups->n; j++) {16 ppMergeFileGroup *fileGroup = fileGroups->data[j];17 if (fileGroup->read) continue;16 // check for any fileGroups which can read data 17 for (int j = 0; j < fileGroups->n; j++) { 18 ppMergeFileGroup *fileGroup = fileGroups->data[j]; 19 if (fileGroup->read) continue; 18 20 19 // find max last scan so far20 int lastScan = 0;21 for (int i = 0; i < fileGroups->n; i++) {22 ppMergeFileGroup *fileGroup = fileGroups->data[i];23 lastScan = PS_MAX(fileGroup->lastScan, lastScan);24 }25 fileGroup->firstScan = lastScan;26 fileGroup->lastScan = lastScan + rows;21 // find max last scan so far 22 int lastScan = 0; 23 for (int i = 0; i < fileGroups->n; i++) { 24 ppMergeFileGroup *fileGroup = fileGroups->data[i]; 25 lastScan = PS_MAX(fileGroup->lastScan, lastScan); 26 } 27 fileGroup->firstScan = lastScan; 28 fileGroup->lastScan = lastScan + rows; 27 29 28 psArray *readouts = fileGroup->readouts;30 psArray *readouts = fileGroup->readouts; 29 31 30 psTimerStart ("ppMergeReadChunk");32 psTimerStart ("ppMergeReadChunk"); 31 33 32 psTrace("ppStack", 2, "Reading data for chunk %d into fileGroup %d....n", numChunk, j);33 for (int i = 0; i < readouts->n; i++) {34 pmReadout *inRO = readouts->data[i]; // Input readout34 psTrace("ppStack", 2, "Reading data for chunk %d into fileGroup %d....n", numChunk, j); 35 for (int i = 0; i < readouts->n; i++) { 36 pmReadout *inRO = readouts->data[i]; // Input readout 35 37 36 // override the recorded last scan37 inRO->thisImageScan = fileGroup->firstScan;38 inRO->thisWeightScan = fileGroup->firstScan;39 inRO->thisMaskScan = fileGroup->firstScan;38 // override the recorded last scan 39 inRO->thisImageScan = fileGroup->firstScan; 40 inRO->thisWeightScan = fileGroup->firstScan; 41 inRO->thisMaskScan = fileGroup->firstScan; 40 42 41 // Read a chunk from a file42 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i); 43 // Read a chunk from a file 44 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i); 43 45 44 bool keepReading = false; 45 if (pmReadoutMore(inRO, file->fits, 0, rows, config)) { 46 keepReading = true; 47 if (!pmReadoutReadChunk(inRO, file->fits, 0, rows, 0, config)) { 48 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT %d", numChunk, i); 49 *status = false; 50 return NULL; 51 } 52 } 46 bool keepReading = false; 47 if (pmReadoutMore(inRO, file->fits, 0, rows, config)) { 48 keepReading = true; 49 if (!pmReadoutReadChunk(inRO, file->fits, 0, rows, 0, config)) { 50 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT %d", 51 numChunk, i); 52 *status = false; 53 return NULL; 54 } 55 } 53 56 54 if (haveMasks && pmReadoutMoreMask(inRO, file->fits, 0, rows, config)) { 55 keepReading = true; 56 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.MASK", i); 57 if (!pmReadoutReadChunkMask(inRO, file->fits, 0, rows, 0, config)) { 58 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.MASK %d", numChunk, i); 59 *status = false; 60 return NULL; 61 } 62 } 57 if (haveMasks && pmReadoutMoreMask(inRO, file->fits, 0, rows, config)) { 58 keepReading = true; 59 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.MASK", i); 60 if (!pmReadoutReadChunkMask(inRO, file->fits, 0, rows, 0, config)) { 61 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.MASK %d", 62 numChunk, i); 63 *status = false; 64 return NULL; 65 } 66 } 63 67 64 if (haveWeights && pmReadoutMoreWeight(inRO, file->fits, 0, rows, config)) { 65 keepReading = true; 66 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.WEIGHT", i); 67 if (!pmReadoutReadChunkWeight(inRO, file->fits, 0, rows, 0, config)) { 68 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.WEIGHT %d", numChunk, i); 69 *status = false; 70 return NULL; 71 } 72 } 73 if (!keepReading) { 74 return NULL; 75 } 76 } 68 if (haveWeights && pmReadoutMoreWeight(inRO, file->fits, 0, rows, config)) { 69 keepReading = true; 70 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.WEIGHT", i); 71 if (!pmReadoutReadChunkWeight(inRO, file->fits, 0, rows, 0, config)) { 72 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.WEIGHT %d", 73 numChunk, i); 74 *status = false; 75 return NULL; 76 } 77 } 78 if (!keepReading) { 79 return NULL; 80 } 81 } 77 82 78 fileGroup->read = fileGroup->busy = true;79 return fileGroup;80 }83 fileGroup->read = fileGroup->busy = true; 84 return fileGroup; 85 } 81 86 82 // check for any fileGroups which are done processing 83 bool wait = false; 84 for (int j = 0; j < fileGroups->n; j++) { 85 ppMergeFileGroup *fileGroup = fileGroups->data[j]; 86 if (!fileGroup->read || fileGroup->busy) continue; 87 fileGroup->read = false; 88 wait = true; 89 } 90 if (wait) usleep (10000); 87 // Check for threads that are ready to read 88 bool wait = true; 89 for (int j = 0; j < fileGroups->n; j++) { 90 ppMergeFileGroup *fileGroup = fileGroups->data[j]; 91 if (fileGroup->busy) { 92 continue; 93 } 94 fileGroup->read = false; 95 wait = false; 96 } 97 if (wait) { 98 // No threads currently available 99 usleep(THREAD_WAIT); 100 } 91 101 } 92 102 return NULL; -
trunk/ppMerge/src/ppMergeSetThreads.c
r18862 r18967 2 2 3 3 // "PPMERGE_READOUT_COMBINE", 5 4 bool ppMergeThread_pmReadoutCombine (psThreadJob *job) { 4 bool ppMergeThread_pmReadoutCombine(const psThreadJob *job) 5 { 6 PS_ASSERT_THREAD_JOB_NON_NULL(job, false); 5 7 6 8 pmReadout *output = job->args->data[0]; … … 13 15 14 16 // after we are done, tell the I/O system that this file group is done 15 fileGroup->busy = false; 17 fileGroup->busy = false; 16 18 return status; 17 19 } 18 20 19 bool ppMergeThread_pmDarkCombine (psThreadJob *job) { 21 bool ppMergeThread_pmDarkCombine(const psThreadJob *job) 22 { 23 PS_ASSERT_THREAD_JOB_NON_NULL(job, false); 20 24 21 25 pmCell *outCell = job->args->data[0]; 22 26 ppMergeFileGroup *fileGroup = job->args->data[1]; 23 psScalar *iter = job->args->data[2];24 psScalar *rej = job->args->data[3];25 psScalar *maskVal = job->args->data[4];27 psScalar *iter = job->args->data[2]; 28 psScalar *rej = job->args->data[3]; 29 psScalar *maskVal = job->args->data[4]; 26 30 27 31 bool status = pmDarkCombine(outCell, fileGroup->readouts, iter->data.S32, rej->data.F32, maskVal->data.U8); … … 32 36 } 33 37 34 bool ppMergeThread_pmShutterCorrectionGenerate (psThreadJob *job) { 38 bool ppMergeThread_pmShutterCorrectionGenerate(const psThreadJob *job) 39 { 40 PS_ASSERT_THREAD_JOB_NON_NULL(job, false); 35 41 36 42 pmReadout *output = job->args->data[0]; … … 39 45 psScalar *shutterRef = job->args->data[3]; 40 46 pmShutterCorrectionData *data = job->args->data[4]; 41 psScalar *iter = job->args->data[5];42 psScalar *rej = job->args->data[6];43 psScalar *maskVal = job->args->data[7];47 psScalar *iter = job->args->data[5]; 48 psScalar *rej = job->args->data[6]; 49 psScalar *maskVal = job->args->data[7]; 44 50 45 51 bool status = pmShutterCorrectionGenerate(output, pattern, fileGroup->readouts, shutterRef->data.F32, data, iter->data.S32, rej->data.F32, maskVal->data.U8); … … 50 56 } 51 57 52 bool ppMergeSetThreads () { 58 bool ppMergeSetThreads(void) 59 { 53 60 54 psThreadTask *task = NULL; 61 { 62 psThreadTask *task = psThreadTaskAlloc("PPMERGE_READOUT_COMBINE", 5); 63 task->function = &ppMergeThread_pmReadoutCombine; 64 psThreadTaskAdd(task); 65 psFree(task); 66 } 55 67 56 task = psThreadTaskAlloc ("PPMERGE_READOUT_COMBINE", 5); 57 task->function = &ppMergeThread_pmReadoutCombine; 58 psThreadTaskAdd (task); 68 { 69 psThreadTask *task = psThreadTaskAlloc("PPMERGE_DARK_COMBINE", 5); 70 task->function = &ppMergeThread_pmDarkCombine; 71 psThreadTaskAdd(task); 72 psFree(task); 73 } 59 74 60 task = psThreadTaskAlloc ("PPMERGE_DARK_COMBINE", 5); 61 task->function = &ppMergeThread_pmDarkCombine; 62 psThreadTaskAdd (task); 63 64 task = psThreadTaskAlloc ("PPMERGE_SHUTTER_CORRECTION", 8); 65 task->function = &ppMergeThread_pmShutterCorrectionGenerate; 66 psThreadTaskAdd (task); 75 { 76 psThreadTask *task = psThreadTaskAlloc("PPMERGE_SHUTTER_CORRECTION", 8); 77 task->function = &ppMergeThread_pmShutterCorrectionGenerate; 78 psThreadTaskAdd(task); 79 psFree(task); 80 } 67 81 68 82 return true; -
trunk/pswarp/src/pswarpTransformReadout.c
r18884 r18967 84 84 return false; 85 85 } 86 psFree (args); 86 psFree(job); 87 psFree(args); 87 88 } 88 89 } … … 90 91 // wait for the threads to finish and manage results 91 92 // wait here for the threaded jobs to finish 92 if (!psThreadPoolWait ( )) {93 if (!psThreadPoolWait (false)) { 93 94 psError(PS_ERR_UNKNOWN, false, "Unable to interpolate image."); 94 95 return false;
Note:
See TracChangeset
for help on using the changeset viewer.
