IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Changeset 18822


Ignore:
Timestamp:
Jul 31, 2008, 1:24:22 PM (18 years ago)
Author:
eugene
Message:

some threading simplifications: threaded and unthreaded use the same code; added concept of a thread task

Location:
branches/eam_branch_20080719
Files:
2 added
10 edited

Legend:

Unmodified
Added
Removed
  • branches/eam_branch_20080719/ppMerge/src/Makefile.am

    r18819 r18822  
    1313        ppMergeReadChunk.c      \
    1414        ppMergeLoop_Threaded.c  \
    15         ppMergeThreadLauncher.c \
     15        ppMergeSetThreads.c     \
    1616        ppMergeMask.c
    1717
  • branches/eam_branch_20080719/ppMerge/src/ppMerge.h

    r18818 r18822  
    105105void *ppMergeThreadLauncher (void *data);
    106106
     107bool ppMergeSetThreads ();
     108
    107109#endif
  • branches/eam_branch_20080719/ppMerge/src/ppMergeArguments.c

    r18759 r18822  
    171171    }
    172172
    173 # if (THREADED)
    174173    // Number of threads
    175174    if ((argnum = psArgumentGet(argc, argv, "-threads"))) {
     
    181180        // create the thread pool with number of desired threads, supplying our thread launcher function
    182181        // XXX need to determine the number of threads from the config data
    183         psThreadPoolInit (nThreads, &ppMergeThreadLauncher);
    184     }
    185 # endif
     182        psThreadPoolInit (nThreads);
     183    }
     184    ppMergeSetThreads();
    186185
    187186    if (argc == 1 || !psArgumentParse(arguments, &argc, argv) || argc != 3) {
  • branches/eam_branch_20080719/ppMerge/src/ppMergeLoop_Threaded.c

    r18815 r18822  
    197197
    198198            // Read input data by chunks
    199             psTimerStart ("ppMergeLoop");
     199            // psTimerStart ("ppMergeLoop");
    200200            for (int numChunk = 0; true; numChunk++) {
    201201
     
    205205                if (!fileGroup) break;
    206206
     207                psThreadJob *job = NULL;
     208
    207209                switch (type) {
    208210                  case PPMERGE_TYPE_SHUTTER:
    209                     if (nThreads) {
    210                         // allocate a job
    211                         psThreadJob *job = psThreadJobAlloc ("PPMERGE_SHUTTER_CORRECTION", 0);
    212 
    213                         // construct the arguments for this job
    214                         psArrayAdd (job->args, 1, outRO);
    215                         psArrayAdd (job->args, 1, fileGroup);
    216                         psArrayAdd (job->args, 1, psScalarAlloc(shutterRef, PS_TYPE_F32));
    217                         psArrayAdd (job->args, 1, shutters->data[cellNum]);
    218                         psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32));
    219                         psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32));
    220                         psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8));
    221 
    222                         psThreadJobAddPending (job);
    223                     } else {
    224                         if (!pmShutterCorrectionGenerate(outRO, NULL, fileGroup->readouts, shutterRef, shutters->data[cellNum], iter, rej, maskVal)) {
    225                             goto ERROR;
    226                         }
    227                         fileGroup->busy = false;
     211                    // allocate a job
     212                    job = psThreadJobAlloc ("PPMERGE_SHUTTER_CORRECTION");
     213
     214                    // construct the arguments for this job
     215                    psArrayAdd (job->args, 1, outRO);
     216                    psArrayAdd (job->args, 1, fileGroup);
     217                    psArrayAdd (job->args, 1, psScalarAlloc(shutterRef, PS_TYPE_F32));
     218                    psArrayAdd (job->args, 1, shutters->data[cellNum]);
     219                    psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32));
     220                    psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32));
     221                    psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8));
     222
     223                    // call: pmShutterCorrectionGenerate(outRO, NULL, fileGroup->readouts, shutterRef, shutters->data[cellNum], iter, rej, maskVal)
     224                    if (!psThreadJobAddPending (job)) {
     225                        goto ERROR;
    228226                    }
    229227                    break;
    230228                  case PPMERGE_TYPE_DARK:
    231                     if (nThreads) {
    232                         // allocate a job
    233                         psThreadJob *job = psThreadJobAlloc ("PPMERGE_DARK_COMBINE", 0);
    234 
    235                         // construct the arguments for this job
    236                         psArrayAdd (job->args, 1, outCell);
    237                         psArrayAdd (job->args, 1, fileGroup);
    238                         psArrayAdd (job->args, 1, darkOrdinates);
    239                         psArrayAdd (job->args, 1, darkNorm);
    240                         psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32));
    241                         psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32));
    242                         psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8));
    243 
    244                         psThreadJobAddPending (job);
    245                     } else {
    246                         if (!pmDarkCombine(outCell, fileGroup->readouts, darkOrdinates, darkNorm, iter, rej, maskVal)) {
    247                             goto ERROR;
    248                         }
    249                         fileGroup->busy = false;
     229                    // allocate a job
     230                    job = psThreadJobAlloc ("PPMERGE_DARK_COMBINE");
     231
     232                    // construct the arguments for this job
     233                    psArrayAdd (job->args, 1, outCell);
     234                    psArrayAdd (job->args, 1, fileGroup);
     235                    psArrayAdd (job->args, 1, darkOrdinates);
     236                    psArrayAdd (job->args, 1, darkNorm);
     237                    psArrayAdd (job->args, 1, psScalarAlloc(iter, PS_TYPE_S32));
     238                    psArrayAdd (job->args, 1, psScalarAlloc(rej, PS_TYPE_F32));
     239                    psArrayAdd (job->args, 1, psScalarAlloc(maskVal, PS_TYPE_U8));
     240
     241                    // call: pmDarkCombine(outCell, fileGroup->readouts, darkOrdinates, darkNorm, iter, rej, maskVal);
     242                    if (!psThreadJobAddPending (job)) {                 
     243                        goto ERROR;
    250244                    }
    251245                    break;
     
    253247                  case PPMERGE_TYPE_FLAT:
    254248                  case PPMERGE_TYPE_FRINGE:
    255                     if (nThreads) {
    256                         // allocate a job
    257                         psThreadJob *job = psThreadJobAlloc ("PPMERGE_READOUT_COMBINE", 0);
    258 
    259                         // construct the arguments for this job
    260                         psArrayAdd (job->args, 1, outRO);
    261                         psArrayAdd (job->args, 1, fileGroup);
    262                         psArrayAdd (job->args, 1, zeros);
    263                         psArrayAdd (job->args, 1, scales);
    264                         psArrayAdd (job->args, 1, combination);
    265 
    266                         psThreadJobAddPending (job);
    267                     } else {
    268                         if (!pmReadoutCombine(outRO, fileGroup->readouts, zeros, scales, combination)) {
    269                             goto ERROR;
    270                         }
    271                         fileGroup->busy = false;
     249                    // allocate a job
     250                    job = psThreadJobAlloc ("PPMERGE_READOUT_COMBINE");
     251
     252                    // construct the arguments for this job
     253                    psArrayAdd (job->args, 1, outRO);
     254                    psArrayAdd (job->args, 1, fileGroup);
     255                    psArrayAdd (job->args, 1, zeros);
     256                    psArrayAdd (job->args, 1, scales);
     257                    psArrayAdd (job->args, 1, combination);
     258
     259                    // call: pmReadoutCombine(outRO, fileGroup->readouts, zeros, scales, combination);
     260                    if (!psThreadJobAddPending (job)) {
     261                        goto ERROR;
    272262                    }
    273263                    break;
     
    278268
    279269            // wait for the threads to finish and manage results
    280             if (nThreads) {
    281                 // wait here for the threaded jobs to finish
    282                 if (!psThreadPoolWait ()) {
    283                     psError(PS_ERR_UNKNOWN, false, "Unable to interpolate image.");
    284                     return false;
    285                 }
    286                 fprintf (stderr, "success for threaded jobs\n");
    287 
    288                 // we don't care about the results, just dump the done queue jobs
    289                 psThreadJob *job = NULL;
    290                 while ((job = psThreadJobGetDone()) != NULL) {
    291                     psFree (job);
    292                 }
     270            if (!psThreadPoolWait ()) {
     271                psError(PS_ERR_UNKNOWN, false, "Unable to combine images.");
     272                return false;
    293273            }
     274
     275            // we don't care about the results, just dump the done queue jobs
     276            psThreadJob *job = NULL;
     277            while ((job = psThreadJobGetDone()) != NULL) {
     278                psFree (job);
     279            }
     280
    294281            psFree(fileGroups);
    295282
     
    308295            }
    309296            psFree(inCells);
    310             fprintf (stdout, "done ppMergeLoop for cell : %f\n", psTimerMark ("ppMergeLoop"));
     297            // fprintf (stdout, "done ppMergeLoop for cell : %f\n", psTimerMark ("ppMergeLoop"));
    311298
    312299            // Plug supplementary images into their own FPAs
  • branches/eam_branch_20080719/psLib/src/sys/psThread.c

    r18808 r18822  
    66#include <stdarg.h>
    77#include <unistd.h>
     8#include <string.h>
    89
    910#include "psAssert.h"
     
    1920static psList *pending = NULL;          // queue of pending jobs
    2021static psList *done = NULL;             // queue of done jobs
    21 
    2222static psArray *pool = NULL;            // array of defined threads
     23static psArray *tasks = NULL;           // queue of tasks
     24
     25/***** basic thread functions *****/
    2326
    2427void psThreadLock () {
     
    3033    pthread_mutex_unlock (&mutex);
    3134    return;
    32 }
    33 
    34 void psThreadJobFree (psThreadJob *job) {
    35 
    36     if (!job) return;
    37 
    38     psFree (job->type);
    39     psFree (job->args);
    40     return;
    41 }
    42 
    43 // allocate a psThreadJob with nArgs arguments
    44 psThreadJob *psThreadJobAlloc (char *type, int nArgs) {
    45    
    46     psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob));
    47     psMemSetDeallocator(job, (psFreeFunc)psThreadJobFree);
    48 
    49     job->type = psStringCopy (type);
    50     job->args = psArrayAllocEmpty (nArgs);
    51     return job;
    52 }
    53 
    54 // add a job to the queue of pending jobs
    55 bool psThreadJobAddPending (psThreadJob *job) {
    56 
    57     psThreadLock ();
    58     if (pending == NULL) {
    59         pending = psListAlloc(NULL);
    60     }
    61 
    62     psListAdd (pending, PS_LIST_TAIL, job);
    63     psThreadUnlock ();
    64     return true;
    65 }
    66 
    67 // this function is not locked -- see thread launder for example
    68 psThreadJob *psThreadJobGetPending () {
    69 
    70     if (!pending) return NULL;
    71 
    72     psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD);
    73 
    74     // jobs we pull off the pending queue get placed on the done queue
    75     if (job) {
    76         if (done == NULL) {
    77             done = psListAlloc(NULL);
    78         }
    79         psListAdd (done, PS_LIST_TAIL, job);
    80     }
    81     return job;
    82 }
    83 
    84 // this function is not locked -- see thread launder for example
    85 psThreadJob *psThreadJobGetDone () {
    86 
    87     if (!done) return NULL;
    88 
    89     psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD);
    90     return job;
    9135}
    9236
     
    10650}
    10751
     52/***** thread job functions *****/
     53
     54void psThreadJobFree (psThreadJob *job) {
     55
     56    if (!job) return;
     57
     58    psFree (job->type);
     59    psFree (job->args);
     60    return;
     61}
     62
     63// allocate a psThreadJob of the given type
     64psThreadJob *psThreadJobAlloc (char *type) {
     65   
     66    psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob));
     67    psMemSetDeallocator(job, (psFreeFunc)psThreadJobFree);
     68
     69    job->type = psStringCopy (type);
     70    job->args = psArrayAlloc (0);
     71    return job;
     72}
     73
     74// add a job to the queue of pending jobs
     75bool psThreadJobAddPending (psThreadJob *job) {
     76
     77    // if we failed to call psThreadPoolInit, or we called it with nThreads == 0,
     78    // find the matching function and just run it.
     79    if (!pool || !pool->n) {
     80
     81        // in non-threaded operation, the job is placed on the done list and immediately run
     82        if (done == NULL) {
     83            done = psListAlloc(NULL);
     84        }
     85        psListAdd (done, PS_LIST_TAIL, job);
     86
     87        // find the corresponding task and run it
     88        for (int i = 0; i < tasks->n; i++) {
     89            psThreadTask *task = tasks->data[i];
     90            if (strcmp (job->type, task->type)) continue;
     91
     92            psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
     93
     94            bool status = task->function(job);
     95            return status;
     96        }
     97        return false;
     98    }
     99
     100    psThreadLock ();
     101    if (pending == NULL) {
     102        pending = psListAlloc(NULL);
     103    }
     104
     105    psListAdd (pending, PS_LIST_TAIL, job);
     106    psThreadUnlock ();
     107    return true;
     108}
     109
     110// this function is not locked -- see thread launder for example
     111psThreadJob *psThreadJobGetPending () {
     112
     113    if (!pending) return NULL;
     114
     115    psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD);
     116
     117    // jobs we pull off the pending queue get placed on the done queue
     118    if (job) {
     119        if (done == NULL) {
     120            done = psListAlloc(NULL);
     121        }
     122        psListAdd (done, PS_LIST_TAIL, job);
     123    }
     124    return job;
     125}
     126
     127// this function is not locked -- see thread launder for example
     128psThreadJob *psThreadJobGetDone () {
     129
     130    if (!done) return NULL;
     131
     132    psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD);
     133    return job;
     134}
     135
     136/***** thread task functions *****/
     137
     138void psThreadTaskFree (psThreadTask *task) {
     139
     140    if (!task) return;
     141
     142    psFree (task->type);
     143    return;
     144}
     145
     146// allocate a psThreadTask with nArgs arguments
     147psThreadTask *psThreadTaskAlloc (char *type, int nArgs) {
     148   
     149    psThreadTask *task = (psThreadTask *)psAlloc(sizeof(psThreadTask));
     150    psMemSetDeallocator(task, (psFreeFunc)psThreadTaskFree);
     151
     152    task->type = psStringCopy (type);
     153    task->nArgs = nArgs;
     154    task->function = NULL;
     155    return task;
     156}
     157
     158// add a task to the collection of tasks
     159bool psThreadTaskAdd (psThreadTask *task) {
     160
     161    if (tasks == NULL) {
     162        tasks = psArrayAllocEmpty(8);
     163    }
     164
     165    psArrayAdd (tasks, 1, task);
     166    return true;
     167}
     168
     169// each thread runs this function to choose the task functions
     170void *psThreadLauncher (void *data) {
     171
     172    psThread *self = data;
     173    psThreadJob *job = NULL;
     174
     175    while (1) {
     176
     177        // if we get an error, just wait until we are cleared or killed
     178        while (self->fault) {
     179            usleep (10000);
     180        }
     181
     182        // if no tasks are assigned, just wait until they are
     183        while (tasks == NULL) {
     184            usleep (10000);
     185        }
     186
     187        // request a new job, if there are none available, sleep a bit
     188        // we have to lock here so the job queue cannot be empty yet no threads busy
     189        psThreadLock();
     190        while ((job = psThreadJobGetPending ()) == NULL) {
     191            psThreadUnlock();
     192            usleep (10000);
     193            psThreadLock(); // XXX ???
     194        }
     195        self->busy = true;
     196
     197        for (int i = 0; i < tasks->n; i++) {
     198            psThreadTask *task = tasks->data[i];
     199            if (strcmp (job->type, task->type)) continue;
     200
     201            psThreadUnlock();
     202            psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
     203
     204            bool status = task->function(job);
     205            if (!status) {
     206                self->fault = true;
     207            }
     208            self->busy = false; 
     209            break;
     210        }
     211        psThreadUnlock();
     212        // XXX what do we do if the job is unknown?
     213    }
     214}
     215
     216/***** thread pool functions *****/
     217
    108218// create a pool of Nthreads, each running the user's job-launcher function
    109 bool psThreadPoolInit (int nThreads, psThreadLaunchJobsFunction function) {
     219bool psThreadPoolInit (int nThreads) {
    110220
    111221    if (pool) psAbort ("psThreadsInit already called");
     
    114224    for (int i = 0; i < nThreads; i++) {
    115225        psThread *thread = psThreadAlloc();
    116         pthread_create (&thread->pt, NULL, function, thread);
     226        pthread_create (&thread->pt, NULL, psThreadLauncher, thread);
    117227        pool->data[i] = thread;
    118228    }
     
    126236    // an error is detected on one of the threads or until
    127237    // all threads are idle and no jobs are left on the queue
     238
     239    if (!pool) return true;
     240    if (!pool->n) return true;
    128241
    129242    while (1) {
     
    169282    pool = NULL;
    170283
    171     return true;
    172 }
    173 
     284    psFree (tasks);
     285    tasks = NULL;
     286
     287    return true;
     288}
     289
  • branches/eam_branch_20080719/psLib/src/sys/psThread.h

    r18754 r18822  
    44 *
    55 *  @author EAM, IFA
    6  *  @version $Revision: 1.1.2.3 $ $Name: not supported by cvs2svn $
    7  *  @date $Date: 2008-07-27 22:42:32 $
     6 *  @version $Revision: 1.1.2.4 $ $Name: not supported by cvs2svn $
     7 *  @date $Date: 2008-07-31 23:24:22 $
    88 *
    99 *  Copyright 2004-2005 Insitute for Astronomy, University of Hawaii
     
    2727} psThread;
    2828
    29 typedef void *(*psThreadLaunchJobsFunction)(void *data);
     29typedef bool (*psThreadTaskFunction)(psThreadJob *job);
     30
     31typedef struct {
     32    psString type;
     33    int nArgs;
     34    psThreadTaskFunction function;
     35} psThreadTask;
     36
     37// typedef void *(*psThreadLaunchJobsFunction)(void *data);
    3038
    3139void psThreadLock ();
    3240void psThreadUnlock ();
    3341
    34 psThreadJob *psThreadJobAlloc (char *type, int nArgs);
     42psThread *psThreadAlloc ();
     43
     44psThreadJob *psThreadJobAlloc (char *type);
    3545bool psThreadJobAddPending (psThreadJob *job);
    3646psThreadJob *psThreadJobGetPending ();
    3747psThreadJob *psThreadJobGetDone ();
    3848
    39 psThread *psThreadAlloc ();
     49psThreadTask *psThreadTaskAlloc (char *type, int nArgs);
     50bool psThreadTaskAdd (psThreadTask *task);
     51void *psThreadLauncher (void *data);
    4052
    41 bool psThreadPoolInit (int nThreads, psThreadLaunchJobsFunction function);
     53bool psThreadPoolInit (int nThreads);
    4254bool psThreadPoolWait ();
    4355bool psThreadPoolFinalize ();
  • branches/eam_branch_20080719/pswarp/src/Makefile.am

    r18753 r18822  
    1616        pswarpPixelFraction.c           \
    1717        pswarpSetMaskBits.c             \
    18         pswarpThreadLauncher.c          \
     18        pswarpSetThreads.c              \
    1919        pswarpTransformReadout.c        \
    2020        pswarpTransformSources.c \
  • branches/eam_branch_20080719/pswarp/src/pswarp.h

    r18753 r18822  
    9090    );
    9191
    92 // thread launcher for this program
    93 void *pswarpThreadLauncher (void *data);
     92// define threads for this program
     93bool pswarpSetThreads ();
  • branches/eam_branch_20080719/pswarp/src/pswarpArguments.c

    r18753 r18822  
    5050        // create the thread pool with number of desired threads, supplying our thread launcher function
    5151        // XXX need to determine the number of threads from the config data
    52         psThreadPoolInit (nThreads, &pswarpThreadLauncher);
     52        psThreadPoolInit (nThreads);
    5353    }
     54    pswarpSetThreads ();
    5455
    5556    // PSF determination?
  • branches/eam_branch_20080719/pswarp/src/pswarpTransformReadout.c

    r18753 r18822  
    7171            args->goodPixels = 0;
    7272
    73             if (nThreads) {
    74                 // allocate a job
    75                 psThreadJob *job = psThreadJobAlloc ("PSWARP_TRANSFORM_TILE", 0);
     73            // allocate a job
     74            psThreadJob *job = psThreadJobAlloc ("PSWARP_TRANSFORM_TILE");
    7675
    77                 // construct the arguments for this job
    78                 // job is pswarpTransformTile (gridX, gridY);
    79                 psArrayAdd (job->args, 1, args);
    80                 // fprintf (stderr, "adding job %d,%d, Nargs: %ld\n", gridX, gridY, job->args->n);
    81                 psThreadJobAddPending (job);
    82             } else {
    83                 pswarpTransformTile (args);
    84                 goodPixels += args->goodPixels;
     76            // construct the arguments for this job
     77            // job is pswarpTransformTile (gridX, gridY);
     78            psArrayAdd (job->args, 1, args);
     79            // fprintf (stderr, "adding job %d,%d, Nargs: %ld\n", gridX, gridY, job->args->n);
     80
     81            // call: pswarpTransformTile (args);
     82            if (!psThreadJobAddPending (job)) {
     83                psError(PS_ERR_UNKNOWN, false, "Unable to warp image.");
     84                return false;
    8585            }
    8686            psFree (args);
     
    8989
    9090    // wait for the threads to finish and manage results
    91     if (nThreads) {
    92         // wait here for the threaded jobs to finish
    93         if (!psThreadPoolWait ()) {
    94             psError(PS_ERR_UNKNOWN, false, "Unable to interpolate image.");
    95             return false;
     91    // wait here for the threaded jobs to finish
     92    if (!psThreadPoolWait ()) {
     93        psError(PS_ERR_UNKNOWN, false, "Unable to interpolate image.");
     94        return false;
     95    }
     96    fprintf (stderr, "success for threaded jobs\n");
     97
     98    // each job records its own goodPixel values; sum them here
     99    // we have only supplied one type of job, so we can assume the types here
     100    psThreadJob *job = NULL;
     101    while ((job = psThreadJobGetDone()) != NULL) {
     102        if (job->args->n < 1) {
     103            fprintf (stderr, "error with job\n");
     104        } else {
     105            pswarpTransformTileArgs *args = job->args->data[0];
     106            // fprintf (stderr, "finished job %d,%d, Nargs: %ld\n", args->gridX, args->gridY, job->args->n);
     107            goodPixels += args->goodPixels;
    96108        }
    97         fprintf (stderr, "success for threaded jobs\n");
    98 
    99         // each job records its own goodPixel values; sum them here
    100         // we have only supplied one type of job, so we can assume the types here
    101         psThreadJob *job = NULL;
    102         while ((job = psThreadJobGetDone()) != NULL) {
    103             if (job->args->n < 1) {
    104                 fprintf (stderr, "error with job\n");
    105             } else {
    106                 pswarpTransformTileArgs *args = job->args->data[0];
    107                 // fprintf (stderr, "finished job %d,%d, Nargs: %ld\n", args->gridX, args->gridY, job->args->n);
    108                 goodPixels += args->goodPixels;
    109             }
    110             psFree (job);
    111         }
     109        psFree (job);
    112110    }
    113111    psFree(interp);
Note: See TracChangeset for help on using the changeset viewer.