Changeset 18967 for trunk/ppMerge/src/ppMergeReadChunk.c
- Timestamp:
- Aug 8, 2008, 8:17:12 AM (18 years ago)
- File:
-
- 1 edited
-
trunk/ppMerge/src/ppMergeReadChunk.c (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
trunk/ppMerge/src/ppMergeReadChunk.c
r18839 r18967 1 1 # include "ppMerge.h" 2 2 3 ppMergeFileGroup *ppMergeReadChunk (bool *status, psArray *fileGroups, pmConfig *config, int numChunk) { 3 #define THREAD_WAIT 10000 // Microseconds to wait if thread is not available 4 4 5 ppMergeFileGroup *ppMergeReadChunk(bool *status, psArray *fileGroups, pmConfig *config, int numChunk) 6 { 5 7 *status = true; 6 8 7 9 bool mdok; 8 10 bool haveMasks = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.MASKS"); // Do we have masks? 9 bool haveWeights = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.WEIGHTS"); // Do we have weights?11 bool haveWeights = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.WEIGHTS");// Do we have weights? 10 12 int rows = psMetadataLookupS32(NULL, config->arguments, "ROWS"); // Number of rows to read per chunk 11 13 12 14 // select an available fileGroup 13 15 while (1) { 14 // check for any fileGroups which can read data15 for (int j = 0; j < fileGroups->n; j++) {16 ppMergeFileGroup *fileGroup = fileGroups->data[j];17 if (fileGroup->read) continue;16 // check for any fileGroups which can read data 17 for (int j = 0; j < fileGroups->n; j++) { 18 ppMergeFileGroup *fileGroup = fileGroups->data[j]; 19 if (fileGroup->read) continue; 18 20 19 // find max last scan so far20 int lastScan = 0;21 for (int i = 0; i < fileGroups->n; i++) {22 ppMergeFileGroup *fileGroup = fileGroups->data[i];23 lastScan = PS_MAX(fileGroup->lastScan, lastScan);24 }25 fileGroup->firstScan = lastScan;26 fileGroup->lastScan = lastScan + rows;21 // find max last scan so far 22 int lastScan = 0; 23 for (int i = 0; i < fileGroups->n; i++) { 24 ppMergeFileGroup *fileGroup = fileGroups->data[i]; 25 lastScan = PS_MAX(fileGroup->lastScan, lastScan); 26 } 27 fileGroup->firstScan = lastScan; 28 fileGroup->lastScan = lastScan + rows; 27 29 28 psArray *readouts = fileGroup->readouts;30 psArray *readouts = fileGroup->readouts; 29 31 30 psTimerStart ("ppMergeReadChunk");32 psTimerStart ("ppMergeReadChunk"); 31 33 32 psTrace("ppStack", 2, "Reading data for chunk %d into fileGroup %d....n", numChunk, j);33 for (int i = 0; i < readouts->n; i++) {34 pmReadout *inRO = readouts->data[i]; // Input readout34 psTrace("ppStack", 2, "Reading data for chunk %d into fileGroup %d....n", numChunk, j); 35 for (int i = 0; i < readouts->n; i++) { 36 pmReadout *inRO = readouts->data[i]; // Input readout 35 37 36 // override the recorded last scan37 inRO->thisImageScan = fileGroup->firstScan;38 inRO->thisWeightScan = fileGroup->firstScan;39 inRO->thisMaskScan = fileGroup->firstScan;38 // override the recorded last scan 39 inRO->thisImageScan = fileGroup->firstScan; 40 inRO->thisWeightScan = fileGroup->firstScan; 41 inRO->thisMaskScan = fileGroup->firstScan; 40 42 41 // Read a chunk from a file42 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i); 43 // Read a chunk from a file 44 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i); 43 45 44 bool keepReading = false; 45 if (pmReadoutMore(inRO, file->fits, 0, rows, config)) { 46 keepReading = true; 47 if (!pmReadoutReadChunk(inRO, file->fits, 0, rows, 0, config)) { 48 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT %d", numChunk, i); 49 *status = false; 50 return NULL; 51 } 52 } 46 bool keepReading = false; 47 if (pmReadoutMore(inRO, file->fits, 0, rows, config)) { 48 keepReading = true; 49 if (!pmReadoutReadChunk(inRO, file->fits, 0, rows, 0, config)) { 50 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT %d", 51 numChunk, i); 52 *status = false; 53 return NULL; 54 } 55 } 53 56 54 if (haveMasks && pmReadoutMoreMask(inRO, file->fits, 0, rows, config)) { 55 keepReading = true; 56 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.MASK", i); 57 if (!pmReadoutReadChunkMask(inRO, file->fits, 0, rows, 0, config)) { 58 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.MASK %d", numChunk, i); 59 *status = false; 60 return NULL; 61 } 62 } 57 if (haveMasks && pmReadoutMoreMask(inRO, file->fits, 0, rows, config)) { 58 keepReading = true; 59 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.MASK", i); 60 if (!pmReadoutReadChunkMask(inRO, file->fits, 0, rows, 0, config)) { 61 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.MASK %d", 62 numChunk, i); 63 *status = false; 64 return NULL; 65 } 66 } 63 67 64 if (haveWeights && pmReadoutMoreWeight(inRO, file->fits, 0, rows, config)) { 65 keepReading = true; 66 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.WEIGHT", i); 67 if (!pmReadoutReadChunkWeight(inRO, file->fits, 0, rows, 0, config)) { 68 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.WEIGHT %d", numChunk, i); 69 *status = false; 70 return NULL; 71 } 72 } 73 if (!keepReading) { 74 return NULL; 75 } 76 } 68 if (haveWeights && pmReadoutMoreWeight(inRO, file->fits, 0, rows, config)) { 69 keepReading = true; 70 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.WEIGHT", i); 71 if (!pmReadoutReadChunkWeight(inRO, file->fits, 0, rows, 0, config)) { 72 psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.WEIGHT %d", 73 numChunk, i); 74 *status = false; 75 return NULL; 76 } 77 } 78 if (!keepReading) { 79 return NULL; 80 } 81 } 77 82 78 fileGroup->read = fileGroup->busy = true;79 return fileGroup;80 }83 fileGroup->read = fileGroup->busy = true; 84 return fileGroup; 85 } 81 86 82 // check for any fileGroups which are done processing 83 bool wait = false; 84 for (int j = 0; j < fileGroups->n; j++) { 85 ppMergeFileGroup *fileGroup = fileGroups->data[j]; 86 if (!fileGroup->read || fileGroup->busy) continue; 87 fileGroup->read = false; 88 wait = true; 89 } 90 if (wait) usleep (10000); 87 // Check for threads that are ready to read 88 bool wait = true; 89 for (int j = 0; j < fileGroups->n; j++) { 90 ppMergeFileGroup *fileGroup = fileGroups->data[j]; 91 if (fileGroup->busy) { 92 continue; 93 } 94 fileGroup->read = false; 95 wait = false; 96 } 97 if (wait) { 98 // No threads currently available 99 usleep(THREAD_WAIT); 100 } 91 101 } 92 102 return NULL;
Note:
See TracChangeset
for help on using the changeset viewer.
