IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Ignore:
Timestamp:
Aug 8, 2008, 8:17:12 AM (18 years ago)
Author:
Paul Price
Message:

Fixing following changes to psThread.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/ppMerge/src/ppMergeReadChunk.c

    r18839 r18967  
    11# include "ppMerge.h"
    22
    3 ppMergeFileGroup *ppMergeReadChunk (bool *status, psArray *fileGroups, pmConfig *config, int numChunk) {
     3#define THREAD_WAIT 10000               // Microseconds to wait if thread is not available
    44
     5ppMergeFileGroup *ppMergeReadChunk(bool *status, psArray *fileGroups, pmConfig *config, int numChunk)
     6{
    57    *status = true;
    68
    79    bool mdok;
    810    bool haveMasks = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.MASKS"); // Do we have masks?
    9     bool haveWeights = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.WEIGHTS"); // Do we have weights?
     11    bool haveWeights = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.WEIGHTS");// Do we have weights?
    1012    int rows = psMetadataLookupS32(NULL, config->arguments, "ROWS"); // Number of rows to read per chunk
    1113
    1214    // select an available fileGroup
    1315    while (1) {
    14         // check for any fileGroups which can read data
    15         for (int j = 0; j < fileGroups->n; j++) {
    16             ppMergeFileGroup *fileGroup = fileGroups->data[j];
    17             if (fileGroup->read) continue;
     16        // check for any fileGroups which can read data
     17        for (int j = 0; j < fileGroups->n; j++) {
     18            ppMergeFileGroup *fileGroup = fileGroups->data[j];
     19            if (fileGroup->read) continue;
    1820
    19             // find max last scan so far
    20             int lastScan = 0;
    21             for (int i = 0; i < fileGroups->n; i++) {
    22                 ppMergeFileGroup *fileGroup = fileGroups->data[i];
    23                 lastScan = PS_MAX (fileGroup->lastScan, lastScan);
    24             }
    25             fileGroup->firstScan = lastScan;
    26             fileGroup->lastScan = lastScan + rows;
     21            // find max last scan so far
     22            int lastScan = 0;
     23            for (int i = 0; i < fileGroups->n; i++) {
     24                ppMergeFileGroup *fileGroup = fileGroups->data[i];
     25                lastScan = PS_MAX(fileGroup->lastScan, lastScan);
     26            }
     27            fileGroup->firstScan = lastScan;
     28            fileGroup->lastScan = lastScan + rows;
    2729
    28             psArray *readouts = fileGroup->readouts;
     30            psArray *readouts = fileGroup->readouts;
    2931
    30             psTimerStart ("ppMergeReadChunk");
     32            psTimerStart ("ppMergeReadChunk");
    3133
    32             psTrace("ppStack", 2, "Reading data for chunk %d into fileGroup %d....n", numChunk, j);
    33             for (int i = 0; i < readouts->n; i++) {
    34                 pmReadout *inRO = readouts->data[i]; // Input readout
     34            psTrace("ppStack", 2, "Reading data for chunk %d into fileGroup %d....n", numChunk, j);
     35            for (int i = 0; i < readouts->n; i++) {
     36                pmReadout *inRO = readouts->data[i]; // Input readout
    3537
    36                 // override the recorded last scan
    37                 inRO->thisImageScan  = fileGroup->firstScan;
    38                 inRO->thisWeightScan = fileGroup->firstScan;
    39                 inRO->thisMaskScan   = fileGroup->firstScan;
     38                // override the recorded last scan
     39                inRO->thisImageScan  = fileGroup->firstScan;
     40                inRO->thisWeightScan = fileGroup->firstScan;
     41                inRO->thisMaskScan   = fileGroup->firstScan;
    4042
    41                 // Read a chunk from a file
    42                 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i);
     43                // Read a chunk from a file
     44                pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i);
    4345
    44                 bool keepReading = false;
    45                 if (pmReadoutMore(inRO, file->fits, 0, rows, config)) {
    46                     keepReading = true;
    47                     if (!pmReadoutReadChunk(inRO, file->fits, 0, rows, 0, config)) {
    48                         psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT %d", numChunk, i);
    49                         *status = false;
    50                         return NULL;
    51                     }                                                   
    52                 }
     46                bool keepReading = false;
     47                if (pmReadoutMore(inRO, file->fits, 0, rows, config)) {
     48                    keepReading = true;
     49                    if (!pmReadoutReadChunk(inRO, file->fits, 0, rows, 0, config)) {
     50                        psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT %d",
     51                                numChunk, i);
     52                        *status = false;
     53                        return NULL;
     54                    }
     55                }
    5356
    54                 if (haveMasks && pmReadoutMoreMask(inRO, file->fits, 0, rows, config)) {
    55                     keepReading = true;
    56                     pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.MASK", i);
    57                     if (!pmReadoutReadChunkMask(inRO, file->fits, 0, rows, 0, config)) {
    58                         psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.MASK %d", numChunk, i);
    59                         *status = false;
    60                         return NULL;
    61                     }                                                   
    62                 }
     57                if (haveMasks && pmReadoutMoreMask(inRO, file->fits, 0, rows, config)) {
     58                    keepReading = true;
     59                    pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.MASK", i);
     60                    if (!pmReadoutReadChunkMask(inRO, file->fits, 0, rows, 0, config)) {
     61                        psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.MASK %d",
     62                                numChunk, i);
     63                        *status = false;
     64                        return NULL;
     65                    }
     66                }
    6367
    64                 if (haveWeights && pmReadoutMoreWeight(inRO, file->fits, 0, rows, config)) {
    65                     keepReading = true;
    66                     pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.WEIGHT", i);
    67                     if (!pmReadoutReadChunkWeight(inRO, file->fits, 0, rows, 0, config)) {
    68                         psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.WEIGHT %d", numChunk, i);
    69                         *status = false;
    70                         return NULL;
    71                     }                                                   
    72                 }
    73                 if (!keepReading) {
    74                     return NULL;
    75                 }
    76             }
     68                if (haveWeights && pmReadoutMoreWeight(inRO, file->fits, 0, rows, config)) {
     69                    keepReading = true;
     70                    pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.WEIGHT", i);
     71                    if (!pmReadoutReadChunkWeight(inRO, file->fits, 0, rows, 0, config)) {
     72                        psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.WEIGHT %d",
     73                                numChunk, i);
     74                        *status = false;
     75                        return NULL;
     76                    }
     77                }
     78                if (!keepReading) {
     79                    return NULL;
     80                }
     81            }
    7782
    78             fileGroup->read = fileGroup->busy = true;
    79             return fileGroup;
    80         }
     83            fileGroup->read = fileGroup->busy = true;
     84            return fileGroup;
     85        }
    8186
    82         // check for any fileGroups which are done processing
    83         bool wait = false;
    84         for (int j = 0; j < fileGroups->n; j++) {
    85             ppMergeFileGroup *fileGroup = fileGroups->data[j];
    86             if (!fileGroup->read || fileGroup->busy) continue;
    87             fileGroup->read = false;
    88             wait = true;
    89         }
    90         if (wait) usleep (10000);
     87        // Check for threads that are ready to read
     88        bool wait = true;
     89        for (int j = 0; j < fileGroups->n; j++) {
     90            ppMergeFileGroup *fileGroup = fileGroups->data[j];
     91            if (fileGroup->busy) {
     92                continue;
     93            }
     94            fileGroup->read = false;
     95            wait = false;
     96        }
     97        if (wait) {
     98            // No threads currently available
     99            usleep(THREAD_WAIT);
     100        }
    91101    }
    92102    return NULL;
Note: See TracChangeset for help on using the changeset viewer.