IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Changeset 18967


Ignore:
Timestamp:
Aug 8, 2008, 8:17:12 AM (18 years ago)
Author:
Paul Price
Message:

Fixing following changes to psThread.

Location:
trunk
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • trunk/ppMerge/src/ppMerge.h

    r18839 r18967  
    1515#define TIMERNAME "ppMerge"             // Name for timer
    1616#define PPMERGE_RECIPE "PPMERGE"        // Recipe name
    17 #define THREADED 1
     17#define THREADED 1                      // Compile with threads?
    1818
    1919// Type of frame to merge
     
    3535} ppMergeFiles;
    3636
     37// Group of files to read
     38//
     39// Each file contributes a readout, into which is read a chunk from that file
    3740typedef struct {
    38     psArray *readouts;
    39     bool read;
    40     bool busy;
    41     int firstScan;
    42     int lastScan;
     41    psArray *readouts;                  // Input readouts
     42    bool read;                          // Has the scan been read?
     43    bool busy;                          // Is the scan being processed?
     44    int firstScan;                      // First row of the chunk to be read for this group
     45    int lastScan;                       // Last row of the chunk to be read for this group
    4346} ppMergeFileGroup;
    4447
     
    101104
    102105
    103 ppMergeFileGroup *ppMergeFileGroupAlloc();
    104 ppMergeFileGroup *ppMergeReadChunk (bool *status, psArray *fileGroups, pmConfig *config, int numChunk);
    105 void *ppMergeThreadLauncher (void *data);
     106// Allocator for group of files
     107ppMergeFileGroup *ppMergeFileGroupAlloc(void);
    106108
    107 bool ppMergeSetThreads ();
     109// Read chunk into the first available file group
     110ppMergeFileGroup *ppMergeReadChunk(bool *status, // Status of read
     111                                   psArray *fileGroups, // All groups
     112                                   pmConfig *config, // Configuration
     113                                   int numChunk // Chunk number (only for interest)
     114    );
     115
     116// Set up thread handling
     117bool ppMergeSetThreads(void);
    108118
    109119#endif
  • trunk/ppMerge/src/ppMergeArguments.c

    r18839 r18967  
    174174    if ((argnum = psArgumentGet(argc, argv, "-threads"))) {
    175175        psArgumentRemove(argnum, &argc, argv);
    176         int nThreads = atoi(argv[argnum]);
     176        int nThreads = atoi(argv[argnum]);
    177177        psMetadataAddS32(config->arguments, PS_LIST_TAIL, "NTHREADS", 0, "number of warp threads", nThreads);
    178178        psArgumentRemove(argnum, &argc, argv);
    179179
    180         // create the thread pool with number of desired threads, supplying our thread launcher function
    181         // XXX need to determine the number of threads from the config data
    182         psThreadPoolInit (nThreads);
     180        // create the thread pool with number of desired threads, supplying our thread launcher function
     181        // XXX need to determine the number of threads from the config data
     182        psThreadPoolInit (nThreads);
    183183    }
    184184    ppMergeSetThreads();
     
    379379#endif
    380380
     381    psFree(arguments);
    381382    return true;
    382383
  • trunk/ppMerge/src/ppMergeCamera.c

    r18756 r18967  
    3131            if (! pmFPASelectChip(fpa, chipNum, false)) {
    3232                psError(PS_ERR_IO, false, "Chip number %d doesn't exist in camera.\n", chipNum);
    33                 psFree (chips);
     33                psFree (chips);
    3434                return false;
    3535            }
     
    4343    psArray *cells = psStringSplitArray (cellLine, ",", false);
    4444    if (cells->n > 0) {
    45         for (int i = 0; i < fpa->chips->n; i++) {
    46             pmChip *chip = fpa->chips->data[i];
    47             pmChipSelectCell (chip, -1, true); // deselect all cells
    48             for (int j = 0; j < cells->n; j++) {
    49                 int cellNum = atoi(cells->data[j]);
    50                 if (! pmChipSelectCell(chip, cellNum, false)) {
    51                     psError(PS_ERR_IO, false, "Cell number %d doesn't exist in camera.\n", cellNum);
    52                     psFree (cells);
    53                     return false;
    54                 }
    55             }
    56         }
     45        for (int i = 0; i < fpa->chips->n; i++) {
     46            pmChip *chip = fpa->chips->data[i];
     47            pmChipSelectCell (chip, -1, true); // deselect all cells
     48            for (int j = 0; j < cells->n; j++) {
     49                int cellNum = atoi(cells->data[j]);
     50                if (! pmChipSelectCell(chip, cellNum, false)) {
     51                    psError(PS_ERR_IO, false, "Cell number %d doesn't exist in camera.\n", cellNum);
     52                    psFree (cells);
     53                    return false;
     54                }
     55            }
     56        }
    5757    }
    5858    psFree (cells);
     
    259259                    cell->file_exists = false;
    260260                    culled++;
    261                     if (cell->concepts) {
    262                         psFree(cell->concepts);
    263                         cell->concepts = NULL;
    264                     }
     261                    if (cell->concepts) {
     262                        psFree(cell->concepts);
     263                        cell->concepts = NULL;
     264                    }
    265265                }
    266266            }
     
    268268                chip->data_exists = false;
    269269                chip->file_exists = false;
    270                 if (chip->concepts) {
    271                     psFree(chip->concepts);
    272                     chip->concepts = NULL;
    273                 }
     270                if (chip->concepts) {
     271                    psFree(chip->concepts);
     272                    chip->concepts = NULL;
     273                }
    274274            }
    275275        }
     
    298298    }
    299299
    300     // Output image
    301     pmFPA *fpa = pmFPAConstruct(config->camera, config->cameraName); // FPA to contain the output
    302     if (!fpa) {
    303         psError(PS_ERR_UNEXPECTED_NULL, false, "Unable to construct an FPA from camera configuration.");
    304         psFree(phuView);
    305         return false;
    306     }
    307 
    308300    psString outName = ppMergeOutputFile(config); // Name of output file
    309301
  • trunk/ppMerge/src/ppMergeLoop_Threaded.c

    r18862 r18967  
    1212#include "ppMerge.h"
    1313
    14 // XXX this function is now sufficiently different for the major types, it would make sense to just 
     14// XXX this function is now sufficiently different for the major types, it would make sense to just
    1515// split it into three: BASIC, SHUTTER, DARK
    1616
     
    5757    psMaskType markVal;
    5858    if (!pmConfigMaskSetBits (&maskVal, &markVal, config)) {
    59         psError (PS_ERR_UNKNOWN, true, "Unable to define the mask bit values");
    60         return false;
     59        psError (PS_ERR_UNKNOWN, true, "Unable to define the mask bit values");
     60        return false;
    6161    }
    6262
     
    151151            if (type == PPMERGE_TYPE_SHUTTER) {
    152152                shutterRef = pmShutterCorrectionReference(shutters->data[cellNum]);
    153                 pattern = pmReadoutAlloc(NULL);
     153                pattern = pmReadoutAlloc(NULL);
    154154            }
    155155
    156156            pmReadout *outRO = pmReadoutAlloc(outCell);
    157157
    158             // open the input files (we need to do the work ourselves)
     158            // open the input files (we need to do the work ourselves)
    159159            for (int i = 0; i < numFiles; i++) {
    160160                if (!ppMergeFileOpenInput(config, view, i)) {
     
    162162                    goto ERROR;
    163163                }
    164             }
    165 
    166             ppMergeFileGroup *fileGroup = NULL;
    167             psArray *fileGroups = psArrayAlloc (nThreads + 1);
    168 
    169             // generate readouts for each input file in each file group
    170             for (int i = 0; i < fileGroups->n; i++) {
    171                 psArray *readouts = psArrayAlloc(numFiles); // Input readouts
    172                 for (int j = 0; j < numFiles; j++) {
    173                     pmFPAfile *input = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i);
    174                     pmCell *inCell = pmFPAviewThisCell(view, input->fpa); // Input cell
    175                     readouts->data[j] = pmReadoutAlloc(inCell);
    176                 }
    177 
    178                 fileGroup = ppMergeFileGroupAlloc();
    179                 fileGroup->readouts = readouts;
    180                 fileGroup->read = false;
    181                 fileGroup->busy = false;
    182                 fileGroup->lastScan = 0;
    183                 fileGroup->firstScan = 0;
    184                 fileGroups->data[i] = fileGroup;
    185             }
    186 
    187             // call the init functions
    188             switch (type) {
    189               case PPMERGE_TYPE_BIAS:
    190               case PPMERGE_TYPE_FLAT:
    191               case PPMERGE_TYPE_FRINGE:
    192                 psAssert (fileGroups->n > 0, "no valid file groups defined");
    193                 fileGroup = fileGroups->data[0];
    194                 if (!pmReadoutCombinePrepare(outRO, fileGroup->readouts, combination)) {
    195                     goto ERROR;
    196                 }
    197                 break;
    198               case PPMERGE_TYPE_DARK:
    199                 psAssert (fileGroups->n > 0, "no valid file groups defined");
    200                 fileGroup = fileGroups->data[0];
    201                 if (!pmDarkCombinePrepare(outCell, fileGroup->readouts, darkOrdinates, darkNorm)) {
    202                     goto ERROR;
    203                 }
    204                 break;
    205               case PPMERGE_TYPE_SHUTTER:
    206                 psAssert (fileGroups->n > 0, "no valid file groups defined");
    207                 fileGroup = fileGroups->data[0];
    208                 if (!pmShutterCorrectionGeneratePrepare(outRO, pattern, fileGroup->readouts, maskVal)) {
    209                     goto ERROR;
    210                 }
    211                 break;
    212               default:
    213                 fprintf (stderr, "not yet ready");
    214                 goto ERROR;
    215             }
     164            }
     165
     166            ppMergeFileGroup *fileGroup = NULL;
     167            psArray *fileGroups = psArrayAlloc(nThreads + 1);
     168
     169            // Generate readouts for each input file in each file group
     170            for (int i = 0; i < fileGroups->n; i++) {
     171                psArray *readouts = psArrayAlloc(numFiles); // Input readouts
     172                for (int j = 0; j < numFiles; j++) {
     173                    pmFPAfile *input = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i);
     174                    pmCell *inCell = pmFPAviewThisCell(view, input->fpa); // Input cell
     175                    readouts->data[j] = pmReadoutAlloc(inCell);
     176                }
     177
     178                fileGroup = ppMergeFileGroupAlloc();
     179                fileGroup->readouts = readouts;
     180                fileGroup->read = false;
     181                fileGroup->busy = false;
     182                fileGroup->lastScan = 0;
     183                fileGroup->firstScan = 0;
     184                fileGroups->data[i] = fileGroup;
     185            }
     186
     187            // call the init functions
     188            switch (type) {
     189              case PPMERGE_TYPE_BIAS:
     190              case PPMERGE_TYPE_FLAT:
     191              case PPMERGE_TYPE_FRINGE:
     192                psAssert (fileGroups->n > 0, "no valid file groups defined");
     193                fileGroup = fileGroups->data[0];
     194                if (!pmReadoutCombinePrepare(outRO, fileGroup->readouts, combination)) {
     195                    goto ERROR;
     196                }
     197                break;
     198              case PPMERGE_TYPE_DARK:
     199                psAssert (fileGroups->n > 0, "no valid file groups defined");
     200                fileGroup = fileGroups->data[0];
     201                if (!pmDarkCombinePrepare(outCell, fileGroup->readouts, darkOrdinates, darkNorm)) {
     202                    goto ERROR;
     203                }
     204                break;
     205              case PPMERGE_TYPE_SHUTTER:
     206                psAssert (fileGroups->n > 0, "no valid file groups defined");
     207                fileGroup = fileGroups->data[0];
     208                if (!pmShutterCorrectionGeneratePrepare(outRO, pattern, fileGroup->readouts, maskVal)) {
     209                    goto ERROR;
     210                }
     211                break;
     212              default:
     213                psAbort("Should never get here.");
     214            }
    216215
    217216            // Read input data by chunks
    218             psTimerStart ("ppMergeLoop");
     217            psTimerStart("ppMergeLoop");
    219218            for (int numChunk = 0; true; numChunk++) {
    220219
    221                 bool status = false;
    222                 fileGroup = ppMergeReadChunk (&status, fileGroups, config, numChunk);
    223                 if (!status) goto ERROR;
    224                 if (!fileGroup) break;
    225 
    226                 psThreadJob *job = NULL;
    227 
     220                bool status = false;
     221                fileGroup = ppMergeReadChunk(&status, fileGroups, config, numChunk);
     222                if (!status) {
     223                    // Something went wrong
     224                    goto ERROR;
     225                }
     226                if (!fileGroup) {
     227                    // Nothing more to read
     228                    break;
     229                }
     230
     231                // Start a job
    228232                switch (type) {
    229233                  case PPMERGE_TYPE_BIAS:
    230234                  case PPMERGE_TYPE_FLAT:
    231                   case PPMERGE_TYPE_FRINGE:
    232                     // allocate a job
    233                     job = psThreadJobAlloc ("PPMERGE_READOUT_COMBINE");
    234 
    235                     // construct the arguments for this job
    236                     psArrayAdd (job->args, 1, outRO);
    237                     psArrayAdd (job->args, 1, fileGroup);
    238                     psArrayAdd (job->args, 1, zeros);
    239                     psArrayAdd (job->args, 1, scales);
    240                     psArrayAdd (job->args, 1, combination);
    241 
    242                     // call: pmReadoutCombine(outRO, fileGroup->readouts, zeros, scales, combination);
    243                     if (!psThreadJobAddPending (job)) {
    244                         goto ERROR;
    245                     }
    246                     break;
    247                   case PPMERGE_TYPE_DARK:
    248                     // allocate a job
    249                     job = psThreadJobAlloc ("PPMERGE_DARK_COMBINE");
    250 
    251                     // construct the arguments for this job
    252                     psArrayAdd (job->args, 1, outCell);
    253                     psArrayAdd (job->args, 1, fileGroup);
    254                     psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32));
    255                     psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32));
    256                     psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8));
    257 
    258                     // call: pmDarkCombine(outCell, fileGroup->readouts, iter, rej, maskVal);
    259                     if (!psThreadJobAddPending (job)) {                 
    260                         goto ERROR;
    261                     }
    262                     break;
    263                   case PPMERGE_TYPE_SHUTTER:
    264                     // allocate a job
    265                     job = psThreadJobAlloc ("PPMERGE_SHUTTER_CORRECTION");
    266 
    267                     // construct the arguments for this job
    268                     psArrayAdd (job->args, 1, outRO);
    269                     psArrayAdd (job->args, 1, pattern);
    270                     psArrayAdd (job->args, 1, fileGroup);
    271                     psArrayAdd (job->args, 1, psScalarAlloc(shutterRef, PS_TYPE_F32));
    272                     psArrayAdd (job->args, 1, shutters->data[cellNum]);
    273                     psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32));
    274                     psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32));
    275                     psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8));
    276 
    277                     // call: pmShutterCorrectionGenerate(outRO, pattern, fileGroup->readouts, shutterRef, shutters->data[cellNum], iter, rej, maskVal)
    278                     if (!psThreadJobAddPending (job)) {
    279                         goto ERROR;
    280                     }
    281                     break;
     235                  case PPMERGE_TYPE_FRINGE: {
     236                      psThreadJob *job = psThreadJobAlloc("PPMERGE_READOUT_COMBINE"); // Job to start
     237
     238                      // Construct the arguments for this job
     239                      psArrayAdd(job->args, 1, outRO);
     240                      psArrayAdd(job->args, 1, fileGroup);
     241                      psArrayAdd(job->args, 1, zeros);
     242                      psArrayAdd(job->args, 1, scales);
     243                      psArrayAdd(job->args, 1, combination);
     244
     245                      // call: pmReadoutCombine(outRO, fileGroup->readouts, zeros, scales, combination);
     246                      if (!psThreadJobAddPending(job)) {
     247                          goto ERROR;
     248                      }
     249                      break;
     250                  }
     251                  case PPMERGE_TYPE_DARK: {
     252                      psThreadJob *job = psThreadJobAlloc ("PPMERGE_DARK_COMBINE"); // Job to start
     253
     254                      // construct the arguments for this job
     255                      psArrayAdd(job->args, 1, outCell);
     256                      psArrayAdd(job->args, 1, fileGroup);
     257                      psArrayAdd(job->args, 1, psScalarAlloc(iter, PS_TYPE_S32));
     258                      psArrayAdd(job->args, 1, psScalarAlloc(rej, PS_TYPE_F32));
     259                      psArrayAdd(job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8));
     260
     261                      // call: pmDarkCombine(outCell, fileGroup->readouts, iter, rej, maskVal);
     262                      if (!psThreadJobAddPending(job)) {
     263                          goto ERROR;
     264                      }
     265                      break;
     266                  }
     267                  case PPMERGE_TYPE_SHUTTER: {
     268                      psThreadJob *job = psThreadJobAlloc ("PPMERGE_SHUTTER_CORRECTION");
     269
     270                      // construct the arguments for this job
     271                      psArrayAdd(job->args, 1, outRO);
     272                      psArrayAdd(job->args, 1, pattern);
     273                      psArrayAdd(job->args, 1, fileGroup);
     274                      psArrayAdd(job->args, 1, psScalarAlloc(shutterRef, PS_TYPE_F32));
     275                      psArrayAdd(job->args, 1, shutters->data[cellNum]);
     276                      psArrayAdd(job->args, 1, psScalarAlloc(iter, PS_TYPE_S32));
     277                      psArrayAdd(job->args, 1, psScalarAlloc(rej, PS_TYPE_F32));
     278                      psArrayAdd(job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8));
     279
     280                      // call: pmShutterCorrectionGenerate(outRO, pattern, fileGroup->readouts, shutterRef,
     281                      //                                   shutters->data[cellNum], iter, rej, maskVal);
     282                      if (!psThreadJobAddPending (job)) {
     283                          goto ERROR;
     284                      }
     285                      break;
     286                  }
    282287                  default:
    283288                    psAbort("Should never get here.");
     
    285290            }
    286291
    287             // wait for the threads to finish and manage results
    288             if (!psThreadPoolWait ()) {
    289                 psError(PS_ERR_UNKNOWN, false, "Unable to combine images.");
    290                 return false;
    291             }
    292 
    293             // we don't care about the results, just dump the done queue jobs
    294             psThreadJob *job = NULL;
    295             while ((job = psThreadJobGetDone()) != NULL) {
    296                 psFree (job);
    297             }
     292            // Wait for the threads to finish and manage results
     293            if (!psThreadPoolWait(false)) {
     294                psError(PS_ERR_UNKNOWN, false, "Unable to combine images.");
     295                return false;
     296            }
     297
     298            // we don't care about the results, just dump the done queue jobs
     299            psThreadJob *job = NULL;    // Job to dump
     300            while ((job = psThreadJobGetDone())) {
     301                psFree (job);
     302            }
    298303
    299304            psFree(fileGroups);
    300305
    301             // XXX eventually need to keep both the shutter and the pattern, as we do with dark
     306            // XXX eventually need to keep both the shutter and the pattern, as we do with dark
    302307            psFree(pattern);
    303308
     
    305310            psList *inCells = psListAlloc(NULL); // List of cells
    306311            for (int i = 0; i < numFiles; i++) {
    307                 pmFPAfile *input = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i);
    308                 pmCell *inCell = pmFPAviewThisCell(view, input->fpa); // Input cell
     312                pmFPAfile *input = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i);
     313                pmCell *inCell = pmFPAviewThisCell(view, input->fpa); // Input cell
    309314                psListAdd(inCells, PS_LIST_TAIL, inCell);
    310315            }
     
    316321            }
    317322            psFree(inCells);
    318             fprintf (stdout, "done ppMergeLoop for cell : %f\n", psTimerMark ("ppMergeLoop"));
    319323
    320324            // Plug supplementary images into their own FPAs
     
    364368            }
    365369
    366             if (!ppStatsFPA(stats, outFPA, view, maskVal, config)) {
     370            if (stats && !ppStatsFPA(stats, outFPA, view, maskVal, config)) {
    367371                psError(PS_ERR_UNKNOWN, true, "Unable to generate stats for image.");
    368372                goto ERROR;
  • trunk/ppMerge/src/ppMergeReadChunk.c

    r18839 r18967  
    11# include "ppMerge.h"
    22
    3 ppMergeFileGroup *ppMergeReadChunk (bool *status, psArray *fileGroups, pmConfig *config, int numChunk) {
     3#define THREAD_WAIT 10000               // Microseconds to wait if thread is not available
    44
     5ppMergeFileGroup *ppMergeReadChunk(bool *status, psArray *fileGroups, pmConfig *config, int numChunk)
     6{
    57    *status = true;
    68
    79    bool mdok;
    810    bool haveMasks = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.MASKS"); // Do we have masks?
    9     bool haveWeights = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.WEIGHTS"); // Do we have weights?
     11    bool haveWeights = psMetadataLookupBool(&mdok, config->arguments, "INPUTS.WEIGHTS");// Do we have weights?
    1012    int rows = psMetadataLookupS32(NULL, config->arguments, "ROWS"); // Number of rows to read per chunk
    1113
    1214    // select an available fileGroup
    1315    while (1) {
    14         // check for any fileGroups which can read data
    15         for (int j = 0; j < fileGroups->n; j++) {
    16             ppMergeFileGroup *fileGroup = fileGroups->data[j];
    17             if (fileGroup->read) continue;
     16        // check for any fileGroups which can read data
     17        for (int j = 0; j < fileGroups->n; j++) {
     18            ppMergeFileGroup *fileGroup = fileGroups->data[j];
     19            if (fileGroup->read) continue;
    1820
    19             // find max last scan so far
    20             int lastScan = 0;
    21             for (int i = 0; i < fileGroups->n; i++) {
    22                 ppMergeFileGroup *fileGroup = fileGroups->data[i];
    23                 lastScan = PS_MAX (fileGroup->lastScan, lastScan);
    24             }
    25             fileGroup->firstScan = lastScan;
    26             fileGroup->lastScan = lastScan + rows;
     21            // find max last scan so far
     22            int lastScan = 0;
     23            for (int i = 0; i < fileGroups->n; i++) {
     24                ppMergeFileGroup *fileGroup = fileGroups->data[i];
     25                lastScan = PS_MAX(fileGroup->lastScan, lastScan);
     26            }
     27            fileGroup->firstScan = lastScan;
     28            fileGroup->lastScan = lastScan + rows;
    2729
    28             psArray *readouts = fileGroup->readouts;
     30            psArray *readouts = fileGroup->readouts;
    2931
    30             psTimerStart ("ppMergeReadChunk");
     32            psTimerStart ("ppMergeReadChunk");
    3133
    32             psTrace("ppStack", 2, "Reading data for chunk %d into fileGroup %d....n", numChunk, j);
    33             for (int i = 0; i < readouts->n; i++) {
    34                 pmReadout *inRO = readouts->data[i]; // Input readout
     34            psTrace("ppStack", 2, "Reading data for chunk %d into fileGroup %d....n", numChunk, j);
     35            for (int i = 0; i < readouts->n; i++) {
     36                pmReadout *inRO = readouts->data[i]; // Input readout
    3537
    36                 // override the recorded last scan
    37                 inRO->thisImageScan  = fileGroup->firstScan;
    38                 inRO->thisWeightScan = fileGroup->firstScan;
    39                 inRO->thisMaskScan   = fileGroup->firstScan;
     38                // override the recorded last scan
     39                inRO->thisImageScan  = fileGroup->firstScan;
     40                inRO->thisWeightScan = fileGroup->firstScan;
     41                inRO->thisMaskScan   = fileGroup->firstScan;
    4042
    41                 // Read a chunk from a file
    42                 pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i);
     43                // Read a chunk from a file
     44                pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT", i);
    4345
    44                 bool keepReading = false;
    45                 if (pmReadoutMore(inRO, file->fits, 0, rows, config)) {
    46                     keepReading = true;
    47                     if (!pmReadoutReadChunk(inRO, file->fits, 0, rows, 0, config)) {
    48                         psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT %d", numChunk, i);
    49                         *status = false;
    50                         return NULL;
    51                     }                                                   
    52                 }
     46                bool keepReading = false;
     47                if (pmReadoutMore(inRO, file->fits, 0, rows, config)) {
     48                    keepReading = true;
     49                    if (!pmReadoutReadChunk(inRO, file->fits, 0, rows, 0, config)) {
     50                        psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT %d",
     51                                numChunk, i);
     52                        *status = false;
     53                        return NULL;
     54                    }
     55                }
    5356
    54                 if (haveMasks && pmReadoutMoreMask(inRO, file->fits, 0, rows, config)) {
    55                     keepReading = true;
    56                     pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.MASK", i);
    57                     if (!pmReadoutReadChunkMask(inRO, file->fits, 0, rows, 0, config)) {
    58                         psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.MASK %d", numChunk, i);
    59                         *status = false;
    60                         return NULL;
    61                     }                                                   
    62                 }
     57                if (haveMasks && pmReadoutMoreMask(inRO, file->fits, 0, rows, config)) {
     58                    keepReading = true;
     59                    pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.MASK", i);
     60                    if (!pmReadoutReadChunkMask(inRO, file->fits, 0, rows, 0, config)) {
     61                        psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.MASK %d",
     62                                numChunk, i);
     63                        *status = false;
     64                        return NULL;
     65                    }
     66                }
    6367
    64                 if (haveWeights && pmReadoutMoreWeight(inRO, file->fits, 0, rows, config)) {
    65                     keepReading = true;
    66                     pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.WEIGHT", i);
    67                     if (!pmReadoutReadChunkWeight(inRO, file->fits, 0, rows, 0, config)) {
    68                         psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.WEIGHT %d", numChunk, i);
    69                         *status = false;
    70                         return NULL;
    71                     }                                                   
    72                 }
    73                 if (!keepReading) {
    74                     return NULL;
    75                 }
    76             }
     68                if (haveWeights && pmReadoutMoreWeight(inRO, file->fits, 0, rows, config)) {
     69                    keepReading = true;
     70                    pmFPAfile *file = pmFPAfileSelectSingle(config->files, "PPMERGE.INPUT.WEIGHT", i);
     71                    if (!pmReadoutReadChunkWeight(inRO, file->fits, 0, rows, 0, config)) {
     72                        psError(PS_ERR_IO, false, "Unable to read chunk %d for file PPMERGE.INPUT.WEIGHT %d",
     73                                numChunk, i);
     74                        *status = false;
     75                        return NULL;
     76                    }
     77                }
     78                if (!keepReading) {
     79                    return NULL;
     80                }
     81            }
    7782
    78             fileGroup->read = fileGroup->busy = true;
    79             return fileGroup;
    80         }
     83            fileGroup->read = fileGroup->busy = true;
     84            return fileGroup;
     85        }
    8186
    82         // check for any fileGroups which are done processing
    83         bool wait = false;
    84         for (int j = 0; j < fileGroups->n; j++) {
    85             ppMergeFileGroup *fileGroup = fileGroups->data[j];
    86             if (!fileGroup->read || fileGroup->busy) continue;
    87             fileGroup->read = false;
    88             wait = true;
    89         }
    90         if (wait) usleep (10000);
     87        // Check for threads that are ready to read
     88        bool wait = true;
     89        for (int j = 0; j < fileGroups->n; j++) {
     90            ppMergeFileGroup *fileGroup = fileGroups->data[j];
     91            if (fileGroup->busy) {
     92                continue;
     93            }
     94            fileGroup->read = false;
     95            wait = false;
     96        }
     97        if (wait) {
     98            // No threads currently available
     99            usleep(THREAD_WAIT);
     100        }
    91101    }
    92102    return NULL;
  • trunk/ppMerge/src/ppMergeSetThreads.c

    r18862 r18967  
    22
    33// "PPMERGE_READOUT_COMBINE", 5
    4 bool ppMergeThread_pmReadoutCombine (psThreadJob *job) {
     4bool ppMergeThread_pmReadoutCombine(const psThreadJob *job)
     5{
     6    PS_ASSERT_THREAD_JOB_NON_NULL(job, false);
    57
    68    pmReadout *output           = job->args->data[0];
     
    1315
    1416    // after we are done, tell the I/O system that this file group is done
    15     fileGroup->busy = false; 
     17    fileGroup->busy = false;
    1618    return status;
    1719}
    1820
    19 bool ppMergeThread_pmDarkCombine (psThreadJob *job) {
     21bool ppMergeThread_pmDarkCombine(const psThreadJob *job)
     22{
     23    PS_ASSERT_THREAD_JOB_NON_NULL(job, false);
    2024
    2125    pmCell *outCell             = job->args->data[0];
    2226    ppMergeFileGroup *fileGroup = job->args->data[1];
    23     psScalar *iter              = job->args->data[2];
    24     psScalar *rej               = job->args->data[3];
    25     psScalar *maskVal           = job->args->data[4];
     27    psScalar *iter              = job->args->data[2];
     28    psScalar *rej               = job->args->data[3];
     29    psScalar *maskVal           = job->args->data[4];
    2630
    2731    bool status = pmDarkCombine(outCell, fileGroup->readouts, iter->data.S32, rej->data.F32, maskVal->data.U8);
     
    3236}
    3337
    34 bool ppMergeThread_pmShutterCorrectionGenerate (psThreadJob *job) {
     38bool ppMergeThread_pmShutterCorrectionGenerate(const psThreadJob *job)
     39{
     40    PS_ASSERT_THREAD_JOB_NON_NULL(job, false);
    3541
    3642    pmReadout *output             = job->args->data[0];
     
    3945    psScalar *shutterRef          = job->args->data[3];
    4046    pmShutterCorrectionData *data = job->args->data[4];
    41     psScalar *iter                = job->args->data[5];
    42     psScalar *rej                 = job->args->data[6];
    43     psScalar *maskVal             = job->args->data[7];
     47    psScalar *iter                = job->args->data[5];
     48    psScalar *rej                 = job->args->data[6];
     49    psScalar *maskVal             = job->args->data[7];
    4450
    4551    bool status = pmShutterCorrectionGenerate(output, pattern, fileGroup->readouts, shutterRef->data.F32, data, iter->data.S32, rej->data.F32, maskVal->data.U8);
     
    5056}
    5157
    52 bool ppMergeSetThreads () {
     58bool ppMergeSetThreads(void)
     59{
    5360
    54     psThreadTask *task = NULL;
     61    {
     62        psThreadTask *task = psThreadTaskAlloc("PPMERGE_READOUT_COMBINE", 5);
     63        task->function = &ppMergeThread_pmReadoutCombine;
     64        psThreadTaskAdd(task);
     65        psFree(task);
     66    }
    5567
    56     task = psThreadTaskAlloc ("PPMERGE_READOUT_COMBINE", 5);
    57     task->function = &ppMergeThread_pmReadoutCombine;
    58     psThreadTaskAdd (task);
     68    {
     69        psThreadTask *task = psThreadTaskAlloc("PPMERGE_DARK_COMBINE", 5);
     70        task->function = &ppMergeThread_pmDarkCombine;
     71        psThreadTaskAdd(task);
     72        psFree(task);
     73    }
    5974
    60     task = psThreadTaskAlloc ("PPMERGE_DARK_COMBINE", 5);
    61     task->function = &ppMergeThread_pmDarkCombine;
    62     psThreadTaskAdd (task);
    63 
    64     task = psThreadTaskAlloc ("PPMERGE_SHUTTER_CORRECTION", 8);
    65     task->function = &ppMergeThread_pmShutterCorrectionGenerate;
    66     psThreadTaskAdd (task);
     75    {
     76        psThreadTask *task = psThreadTaskAlloc("PPMERGE_SHUTTER_CORRECTION", 8);
     77        task->function = &ppMergeThread_pmShutterCorrectionGenerate;
     78        psThreadTaskAdd(task);
     79        psFree(task);
     80    }
    6781
    6882    return true;
  • trunk/pswarp/src/pswarpTransformReadout.c

    r18884 r18967  
    8484                return false;
    8585            }
    86             psFree (args);
     86            psFree(job);
     87            psFree(args);
    8788        }
    8889    }
     
    9091    // wait for the threads to finish and manage results
    9192    // wait here for the threaded jobs to finish
    92     if (!psThreadPoolWait ()) {
     93    if (!psThreadPoolWait (false)) {
    9394        psError(PS_ERR_UNKNOWN, false, "Unable to interpolate image.");
    9495        return false;
Note: See TracChangeset for help on using the changeset viewer.