IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Ignore:
Timestamp:
Jul 31, 2008, 2:13:59 PM (18 years ago)
Author:
eugene
Message:

adding multithread capability

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/ppMerge/src/ppMergeReadChunk.c

    r18757 r18839  
    11# include "ppMerge.h"
    22
    3 ppMergeFileGroup *ppMergeReadChunk (psArray *fileGroups, pmConfig *config, int numChunk) {
     3ppMergeFileGroup *ppMergeReadChunk (bool *status, psArray *fileGroups, pmConfig *config, int numChunk) {
     4
     5    *status = true;
     6
     7    bool mdok;
     8    bool haveMasks = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.MASKS"); // Do we have masks?
     9    bool haveWeights = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.WEIGHTS"); // Do we have weights?
     10    int rows = psMetadataLookupS32(NULL, config->arguments, "ROWS"); // Number of rows to read per chunk
    411
    512    // select an available fileGroup
    6  
    7     bool haveMasks = psMetadataLookupBool(&mdok, arguments, "INPUTS.MASKS"); // Do we have masks?
    8     bool haveWeights = psMetadataLookupBool(&mdok, arguments, "INPUTS.WEIGHTS"); // Do we have weights?
    9 
    1013    while (1) {
    1114        // check for any fileGroups which can read data
     
    1417            if (fileGroup->read) continue;
    1518
     19            // find max last scan so far
     20            int lastScan = 0;
     21            for (int i = 0; i < fileGroups->n; i++) {
     22                ppMergeFileGroup *fileGroup = fileGroups->data[i];
     23                lastScan = PS_MAX (fileGroup->lastScan, lastScan);
     24            }
     25            fileGroup->firstScan = lastScan;
     26            fileGroup->lastScan = lastScan + rows;
     27
    1628            psArray *readouts = fileGroup->readouts;
     29
     30            psTimerStart ("ppMergeReadChunk");
    1731
    1832            psTrace("ppStack", 2, "Reading data for chunk %d into fileGroup %d....n", numChunk, j);
     
    2034                pmReadout *inRO = readouts->data[i]; // Input readout
    2135
     36                // override the recorded last scan
     37                inRO->thisImageScan  = fileGroup->firstScan;
     38                inRO->thisWeightScan = fileGroup->firstScan;
     39                inRO->thisMaskScan   = fileGroup->firstScan;
     40
    2241                // Read a chunk from a file
    2342                pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i);
    24                 if (!pmReadoutReadChunk(inRO, file->fits, 0, rows, 0, config)) {
    25                     psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT %d", numChunk, i);
    26                     return NULL;
    27                 }                                                       
    2843
    29                 if (haveMasks) {
    30                     pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.MASK", i);
    31                     if (!pmReadoutReadChunkMask(inRO, file->fits, 0, rows, 0, config)) {
    32                         psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.MASK %d", numChunk, NAME, i);
     44                bool keepReading = false;
     45                if (pmReadoutMore(inRO, file->fits, 0, rows, config)) {
     46                    keepReading = true;
     47                    if (!pmReadoutReadChunk(inRO, file->fits, 0, rows, 0, config)) {
     48                        psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT %d", numChunk, i);
     49                        *status = false;
    3350                        return NULL;
    3451                    }                                                   
    3552                }
    3653
    37                 if (haveWeights) {
    38                     pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.WEIGHT", i);
    39                     if (!pmReadoutReadChunkWeight(inRO, file->fits, 0, rows, 0, config)) {
    40                         psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.WEIGHT %d", numChunk, NAME, i);
     54                if (haveMasks && pmReadoutMoreMask(inRO, file->fits, 0, rows, config)) {
     55                    keepReading = true;
     56                    pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.MASK", i);
     57                    if (!pmReadoutReadChunkMask(inRO, file->fits, 0, rows, 0, config)) {
     58                        psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.MASK %d", numChunk, i);
     59                        *status = false;
    4160                        return NULL;
    4261                    }                                                   
    4362                }
     63
     64                if (haveWeights && pmReadoutMoreWeight(inRO, file->fits, 0, rows, config)) {
     65                    keepReading = true;
     66                    pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.WEIGHT", i);
     67                    if (!pmReadoutReadChunkWeight(inRO, file->fits, 0, rows, 0, config)) {
     68                        psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.WEIGHT %d", numChunk, i);
     69                        *status = false;
     70                        return NULL;
     71                    }                                                   
     72                }
     73                if (!keepReading) {
     74                    return NULL;
     75                }
    4476            }
     77
    4578            fileGroup->read = fileGroup->busy = true;
    4679            return fileGroup;
     
    4881
    4982        // check for any fileGroups which are done processing
    50         bool wait = true;
    51         bool more = true;
     83        bool wait = false;
    5284        for (int j = 0; j < fileGroups->n; j++) {
    5385            ppMergeFileGroup *fileGroup = fileGroups->data[j];
    5486            if (!fileGroup->read || fileGroup->busy) continue;
    55            
    56             wait = false;
    57             psArray *readouts = fileGroup->readouts;
    58             // any more data to be read?
    59             for (int i = 0; i < readouts->n && more; i++) {
    60                 pmReadout *inRO = readouts->data[i];
    61 
    62                 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i);
    63                 more &= pmReadoutMore(inRO, file->fits, 0, rows, config);
    64 
    65                 if (haveMasks) {
    66                     pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.MASK", i);
    67                     more &= pmReadoutMoreMask(inRO, file->fits, 0, rows, config);
    68                 }
    69                 if (haveWeights) {
    70                     pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.WEIGHT", i);
    71                     more &= pmReadoutMoreWeight(inRO, file->fits, 0, rows, config);
    72                 }
    73             }
    7487            fileGroup->read = false;
     88            wait = true;
    7589        }
    76         if (!more) return NULL;
    77 
    7890        if (wait) usleep (10000);
    7991    }
Note: See TracChangeset for help on using the changeset viewer.