IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Changeset 18953


Ignore:
Timestamp:
Aug 8, 2008, 8:05:09 AM (18 years ago)
Author:
Paul Price
Message:

Cleaning up thread functions, documenting header. Added 'harvest' option to psThreadPoolWait to save user from the trouble of harvesting. Fixed memory problem with psListGetAndRemove.

Location:
trunk/psLib/src/sys
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/psLib/src/sys/psConfigure.c

    r18827 r18953  
    1313 *  @author Robert DeSonia, MHPCC
    1414 *
    15  *  @version $Revision: 1.27 $ $Name: not supported by cvs2svn $
    16  *  @date $Date: 2008-07-31 23:56:29 $
     15 *  @version $Revision: 1.28 $ $Name: not supported by cvs2svn $
     16 *  @date $Date: 2008-08-08 18:05:08 $
    1717 *
    1818 *  Copyright 2004-2005 Maui High Performance Computing Center, University of Hawaii
     
    3434#include "psEarthOrientation.h"
    3535#include "psError.h"
     36#include "psFFT.h"
    3637#include "psConfigure.h"
    3738#include "psMemory.h"
     
    125126    psTimerStop();
    126127
     128    // Clean up FFTW threads
     129    psFFTThreads(0);
     130
     131    // Clean up threads
    127132    psThreadPoolFinalize();
    128133
  • trunk/psLib/src/sys/psThread.c

    r18827 r18953  
    11#ifdef HAVE_CONFIG_H
    2 # include "config.h"
     2#include "config.h"
    33#endif
    44
     
    1717#include "psThread.h"
    1818
    19 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    20 static psList *pending = NULL;          // queue of pending jobs
    21 static psList *done = NULL;             // queue of done jobs
    22 static psArray *pool = NULL;            // array of defined threads
    23 static psArray *tasks = NULL;           // queue of tasks
     19#define THREAD_WAIT 10000               // Microseconds to wait for threads
     20
     21static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // Mutex for locking threads
     22static psList *pending = NULL;          // queue of pending jobs
     23static psList *done = NULL;             // queue of done jobs
     24static psArray *pool = NULL;            // array of defined threads
     25static psArray *tasks = NULL;           // queue of tasks
    2426
    2527/***** basic thread functions *****/
    2628
    27 void psThreadLock () {
    28     pthread_mutex_lock (&mutex);
    29     return;
    30 }
    31 
    32 void psThreadUnlock () {
    33     pthread_mutex_unlock (&mutex);
    34     return;
    35 }
    36 
    37 void psThreadFree (psThread *thread) {
     29void psThreadLock(void)
     30{
     31    pthread_mutex_lock(&mutex);
     32    return;
     33}
     34
     35void psThreadUnlock(void)
     36{
     37    pthread_mutex_unlock(&mutex);
     38    return;
     39}
     40
     41static void threadFree(psThread *thread)
     42{
     43    // Nothing to free; this function is merely provided for identification purposes
    3844    return;
    3945}
    4046
    4147// allocate a psThread
    42 psThread *psThreadAlloc () {
    43    
    44     psThread *thread = (psThread *)psAlloc(sizeof(psThread));
    45     psMemSetDeallocator(thread, (psFreeFunc)psThreadFree);
     48psThread *psThreadAlloc(void)
     49{
     50    psThread *thread = (psThread*)psAlloc(sizeof(psThread));
     51    psMemSetDeallocator(thread, (psFreeFunc)threadFree);
    4652
    4753    thread->busy  = false;
     
    5258/***** thread job functions *****/
    5359
    54 void psThreadJobFree (psThreadJob *job) {
    55 
    56     if (!job) return;
    57 
    58     psFree (job->type);
    59     psFree (job->args);
     60static void threadJobFree(psThreadJob *job)
     61{
     62    psFree(job->type);
     63    psFree(job->args);
    6064    return;
    6165}
    6266
    6367// allocate a psThreadJob of the given type
    64 psThreadJob *psThreadJobAlloc (char *type) {
    65    
     68psThreadJob *psThreadJobAlloc(const char *type)
     69{
    6670    psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob));
    67     psMemSetDeallocator(job, (psFreeFunc)psThreadJobFree);
    68 
    69     job->type = psStringCopy (type);
    70     job->args = psArrayAlloc (0);
     71    psMemSetDeallocator(job, (psFreeFunc)threadJobFree);
     72
     73    job->type = psStringCopy(type);
     74    job->args = psArrayAlloc(0);
    7175    return job;
    7276}
    7377
    7478// add a job to the queue of pending jobs
    75 bool psThreadJobAddPending (psThreadJob *job) {
     79bool psThreadJobAddPending(psThreadJob *job)
     80{
     81    PS_ASSERT_THREAD_JOB_NON_NULL(job, false);
    7682
    7783    // if we failed to call psThreadPoolInit, or we called it with nThreads == 0,
     
    7985    if (!pool || !pool->n) {
    8086
    81         // in non-threaded operation, the job is placed on the done list and immediately run
    82         if (done == NULL) {
    83             done = psListAlloc(NULL);
    84         }
    85         psListAdd (done, PS_LIST_TAIL, job);
    86 
    87         // find the corresponding task and run it
    88         for (int i = 0; i < tasks->n; i++) {
    89             psThreadTask *task = tasks->data[i];
    90             if (strcmp (job->type, task->type)) continue;
    91 
    92             psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
    93 
    94             bool status = task->function(job);
    95             return status;
    96         }
    97         return false;
    98     }
    99 
    100     psThreadLock ();
    101     if (pending == NULL) {
    102         pending = psListAlloc(NULL);
    103     }
    104 
    105     psListAdd (pending, PS_LIST_TAIL, job);
    106     psThreadUnlock ();
     87        // in non-threaded operation, the job is placed on the done list and immediately run
     88        if (!done) {
     89            done = psListAlloc(NULL);
     90        }
     91        psListAdd(done, PS_LIST_TAIL, job);
     92
     93        // find the corresponding task and run it
     94        for (int i = 0; i < tasks->n; i++) {
     95            psThreadTask *task = tasks->data[i];
     96            if (strcmp (job->type, task->type) != 0) {
     97                continue;
     98            }
     99            psAssert(job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
     100
     101            bool status = task->function(job);
     102            return status;
     103        }
     104        // Programming error
     105        psAbort("Unable to find job %s", job->type);
     106    }
     107
     108    psThreadLock();
     109    if (!pending) {
     110        pending = psListAlloc(NULL);
     111    }
     112    psListAdd(pending, PS_LIST_TAIL, job);
     113    psThreadUnlock();
     114
    107115    return true;
    108116}
    109117
    110118// this function is not locked -- see thread launder for example
    111 psThreadJob *psThreadJobGetPending () {
    112 
    113     if (!pending) return NULL;
    114 
    115     psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD);
    116 
    117     // jobs we pull off the pending queue get placed on the done queue
    118     if (job) {
    119         if (done == NULL) {
    120             done = psListAlloc(NULL);
    121         }
    122         psListAdd (done, PS_LIST_TAIL, job);
    123     }
     119psThreadJob *psThreadJobGetPending(void)
     120{
     121    if (!pending) {
     122        return NULL;
     123    }
     124
     125    psThreadJob *job = psListGetAndRemove(pending, PS_LIST_HEAD);
    124126    return job;
    125127}
    126128
    127 // this function is not locked -- see thread launder for example
    128 psThreadJob *psThreadJobGetDone () {
    129 
    130     if (!done) return NULL;
    131 
    132     psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD);
     129// this function is not locked -- see thread launcher for example
     130psThreadJob *psThreadJobGetDone(void)
     131{
     132    if (!done) {
     133        return NULL;
     134    }
     135
     136    psThreadJob *job = psListGetAndRemove(done, PS_LIST_HEAD);
    133137    return job;
    134138}
     
    136140/***** thread task functions *****/
    137141
    138 void psThreadTaskFree (psThreadTask *task) {
    139 
    140     if (!task) return;
    141 
    142     psFree (task->type);
     142static void threadTaskFree(psThreadTask *task)
     143{
     144    psFree(task->type);
    143145    return;
    144146}
    145147
    146148// allocate a psThreadTask with nArgs arguments
    147 psThreadTask *psThreadTaskAlloc (char *type, int nArgs) {
    148    
     149psThreadTask *psThreadTaskAlloc(const char *type, int nArgs)
     150{
    149151    psThreadTask *task = (psThreadTask *)psAlloc(sizeof(psThreadTask));
    150     psMemSetDeallocator(task, (psFreeFunc)psThreadTaskFree);
    151 
    152     task->type = psStringCopy (type);
     152    psMemSetDeallocator(task, (psFreeFunc)threadTaskFree);
     153
     154    task->type = psStringCopy(type);
    153155    task->nArgs = nArgs;
    154156    task->function = NULL;
     
    157159
    158160// add a task to the collection of tasks
    159 bool psThreadTaskAdd (psThreadTask *task) {
    160 
    161     if (tasks == NULL) {
    162         tasks = psArrayAllocEmpty(8);
    163     }
    164 
    165     psArrayAdd (tasks, 1, task);
     161bool psThreadTaskAdd(psThreadTask *task)
     162{
     163    PS_ASSERT_THREAD_TASK_NON_NULL(task, false);
     164
     165    if (!tasks) {
     166        tasks = psArrayAllocEmpty(8);
     167    }
     168
     169    psArrayAdd(tasks, 1, task);
    166170    return true;
    167171}
    168172
    169173// each thread runs this function to choose the task functions
    170 void *psThreadLauncher (void *data) {
    171 
    172     psThread *self = data;
    173     psThreadJob *job = NULL;
     174void *psThreadLauncher(void *thread)
     175{
     176    psThread *self = thread;            // Thread that's running
    174177
    175178    while (1) {
    176 
    177         // if we get an error, just wait until we are cleared or killed
    178         while (self->fault) {
    179             usleep (10000);
    180         }
    181 
    182         // if no tasks are assigned, just wait until they are
    183         while (tasks == NULL) {
    184             usleep (10000);
    185         }
    186 
    187         // request a new job, if there are none available, sleep a bit
    188         // we have to lock here so the job queue cannot be empty yet no threads busy
    189         psThreadLock();
    190         while ((job = psThreadJobGetPending ()) == NULL) {
    191             psThreadUnlock();
    192             usleep (10000);
    193             psThreadLock(); // XXX ???
    194         }
    195         self->busy = true;
    196 
    197         for (int i = 0; i < tasks->n; i++) {
    198             psThreadTask *task = tasks->data[i];
    199             if (strcmp (job->type, task->type)) continue;
    200 
    201             psThreadUnlock();
    202             psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
    203 
    204             bool status = task->function(job);
    205             if (!status) {
    206                 self->fault = true;
    207             }
    208             self->busy = false; 
    209             break;
    210         }
    211         psThreadUnlock();
    212         // XXX what do we do if the job is unknown?
     179        // if we get an error, just wait until we are cleared or killed
     180        while (self->fault) {
     181            usleep(THREAD_WAIT);
     182        }
     183
     184        // if no tasks are assigned, just wait until they are
     185        while (!tasks) {
     186            usleep(THREAD_WAIT);
     187        }
     188
     189        // request a new job, if there are none available, sleep a bit
     190        // we have to lock here so the job queue cannot be empty yet no threads busy
     191        psThreadLock();
     192        psThreadJob *job = NULL;        // Job to process
     193        while ((job = psThreadJobGetPending()) == NULL) {
     194            psThreadUnlock();
     195            usleep(THREAD_WAIT);
     196            psThreadLock(); // XXX ???
     197        }
     198        self->busy = true;
     199
     200        bool found = false;             // Found the job?
     201        for (int i = 0; i < tasks->n; i++) {
     202            psThreadTask *task = tasks->data[i]; // Task to do
     203            if (strcmp(job->type, task->type) != 0) {
     204                continue;
     205            }
     206            found = true;
     207
     208            psThreadUnlock();
     209            psAssert(job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
     210
     211            bool status = task->function(job); // Status of executing task
     212
     213            // Put the completed job on the 'done' queue
     214            psThreadLock();
     215            if (!done) {
     216                done = psListAlloc(NULL);
     217            }
     218            psListAdd(done, PS_LIST_TAIL, job);
     219            psFree(job);
     220
     221            if (!status) {
     222                self->fault = true;
     223            }
     224            self->busy = false;
     225            break;
     226        }
     227        psThreadUnlock();
     228        if (!found) {
     229            // Programming error
     230            psAbort("Unable to find job %s", job->type);
     231        }
    213232    }
    214233}
     
    217236
    218237// create a pool of Nthreads, each running the user's job-launcher function
    219 bool psThreadPoolInit (int nThreads) {
    220 
    221     if (pool) psAbort ("psThreadsInit already called");
    222        
    223     pool = psArrayAlloc (nThreads);
     238bool psThreadPoolInit(int nThreads)
     239{
     240    if (pool) {
     241        psAbort("psThreadsInit already called");
     242    }
     243
     244    PS_ASSERT_INT_NONNEGATIVE(nThreads, false);
     245    if (nThreads == 0) {
     246        // No threading
     247        return true;
     248    }
     249
     250    pool = psArrayAlloc(nThreads);
    224251    for (int i = 0; i < nThreads; i++) {
    225         psThread *thread = psThreadAlloc();
    226         pthread_create (&thread->pt, NULL, psThreadLauncher, thread);
    227         pool->data[i] = thread;
     252        psThread *thread = psThreadAlloc(); // Thread for pool
     253        pthread_create(&thread->pt, NULL, psThreadLauncher, thread);
     254        pool->data[i] = thread;
    228255    }
    229256    return true;
    230257}
    231258
    232 // call this function after you have added jobs to the queue and
    233 bool psThreadPoolWait () {
    234 
    235     // this function blocks (waits in usleep) until either
    236     // an error is detected on one of the threads or until
    237     // all threads are idle and no jobs are left on the queue
    238 
    239     if (!pool) return true;
    240     if (!pool->n) return true;
     259int psThreadPoolSize(void)
     260{
     261    return pool ? pool->n : 0;
     262}
     263
     264// call this function after you have added jobs to the queue and
     265bool psThreadPoolWait(bool harvest)
     266{
     267    if (!pool || pool->n == 0) {
     268        // No threads initialised, so everything's done
     269        return true;
     270    }
    241271
    242272    while (1) {
    243 
    244         // check for an error
    245         for (int i = 0; i < pool->n; i++) {
    246             psThread *thread = pool->data[i];
    247             if (thread->fault) return false;
    248         }
    249 
    250         // are all threads idle?
    251         psThreadLock();
    252         for (int i = 0; i < pool->n; i++) {
    253             psThread *thread = pool->data[i];
    254             if (thread->busy) goto SLEEP;
    255         }
    256 
    257         if (!pending) return true;
    258 
    259         // is the queue empty?
    260         if (pending->head == NULL) {
    261             psThreadUnlock();
    262             return true;
    263         }
     273        // check for an error
     274        for (int i = 0; i < pool->n; i++) {
     275            psThread *thread = pool->data[i];
     276            if (thread->fault) {
     277                return false;
     278            }
     279        }
     280
     281        // Harvest jobs, if requested
     282        if (harvest) {
     283            psThreadLock();
     284            psThreadJob *job;           // Job from done queue
     285            while ((job = psThreadJobGetDone())) {
     286                psFree(job);
     287            }
     288            psThreadUnlock();
     289        }
     290
     291        // are all threads idle?
     292        psThreadLock();
     293        for (int i = 0; i < pool->n; i++) {
     294            psThread *thread = pool->data[i];
     295            if (thread->busy) {
     296                // At least one thread is busy: sleep.
     297                goto SLEEP;
     298            }
     299        }
     300
     301        if (!pending || !pending->head) {
     302            // Nothing in the queue
     303            psThreadUnlock();
     304            return true;
     305        }
    264306
    265307    SLEEP:
    266         psThreadUnlock();
    267         usleep (10000);
    268     }
     308        psThreadUnlock();
     309        usleep(THREAD_WAIT);
     310    }
     311
    269312    return false;
    270313}
    271314
    272 // create a pool of Nthreads, each running the user's job-launcher function
    273 bool psThreadPoolFinalize () {
    274 
    275     psFree (pending);
     315bool psThreadPoolFinalize(void)
     316{
     317    psFree(pending);
    276318    pending = NULL;
    277319
    278     psFree (done);
     320    psFree(done);
    279321    done = NULL;
    280322
    281     psFree (pool);
     323    psFree(pool);
    282324    pool = NULL;
    283325
    284     psFree (tasks);
     326    psFree(tasks);
    285327    tasks = NULL;
    286328
  • trunk/psLib/src/sys/psThread.h

    r18827 r18953  
    44 *
    55 *  @author EAM, IFA
    6  *  @version $Revision: 1.2 $ $Name: not supported by cvs2svn $
    7  *  @date $Date: 2008-07-31 23:56:29 $
     6 *  @version $Revision: 1.3 $ $Name: not supported by cvs2svn $
     7 *  @date $Date: 2008-08-08 18:05:09 $
    88 *
    99 *  Copyright 2004-2005 Insitute for Astronomy, University of Hawaii
     
    1616/// @{
    1717
     18/// Job to be executed on a thread
     19///
     20/// This job is passed to the function that executes it
    1821typedef struct {
    19     psString type;
    20     psArray *args;
     22    psString type;                      // Type of thread
     23    psArray *args;                      // Arguments to job
    2124} psThreadJob;
    2225
     26#define PS_ASSERT_THREAD_JOB_NON_NULL(JOB, RVAL) \
     27if (!(JOB) || !(JOB)->type || !(JOB)->args) { \
     28    psError(PS_ERR_UNEXPECTED_NULL, true, "Thread job %s or one of its components is NULL.", #JOB); \
     29    return RVAL; \
     30}
     31
     32/// A thread, which executes a job
     33///
     34/// Wraps pthread with a few extra conveniences
    2335typedef struct {
    24     bool busy;
    25     bool fault;
    26     pthread_t pt;
     36    bool busy;                          // Is the thread busy?
     37    bool fault;                         // Has the thread faulted?
     38    pthread_t pt;                       // The thread itself
    2739} psThread;
    2840
    29 typedef bool (*psThreadTaskFunction)(psThreadJob *job);
     41/// Function to execute a thread job
     42typedef bool (*psThreadTaskFunction)(const psThreadJob *job);
    3043
     44/// Task that is executed on a thread
    3145typedef struct {
    32     psString type;
    33     int nArgs;
    34     psThreadTaskFunction function;
     46    psString type;                      // Type of task
     47    int nArgs;                          // Number of arguments that function takes
     48    psThreadTaskFunction function;      // Function to execute
    3549} psThreadTask;
    3650
    37 // typedef void *(*psThreadLaunchJobsFunction)(void *data);
     51#define PS_ASSERT_THREAD_TASK_NON_NULL(TASK, RVAL) \
     52if (!(TASK) || !(TASK)->type || (TASK)->nArgs < 0 || !(TASK)->function) { \
     53    psError(PS_ERR_UNEXPECTED_NULL, true, "Thread task %s or one of its components is NULL.", #TASK); \
     54    return RVAL; \
     55}
    3856
    39 void psThreadLock ();
    40 void psThreadUnlock ();
    4157
    42 psThread *psThreadAlloc ();
     58/// Lock the thread mutex
     59void psThreadLock(void);
    4360
    44 psThreadJob *psThreadJobAlloc (char *type);
    45 bool psThreadJobAddPending (psThreadJob *job);
    46 psThreadJob *psThreadJobGetPending ();
    47 psThreadJob *psThreadJobGetDone ();
     61/// Unlock the thread mutex
     62void psThreadUnlock(void);
    4863
    49 psThreadTask *psThreadTaskAlloc (char *type, int nArgs);
    50 bool psThreadTaskAdd (psThreadTask *task);
    51 void *psThreadLauncher (void *data);
     64/// Allocate a thread
     65psThread *psThreadAlloc(void);
    5266
    53 bool psThreadPoolInit (int nThreads);
    54 bool psThreadPoolWait ();
    55 bool psThreadPoolFinalize ();
     67/// Allocate a thread job
     68psThreadJob *psThreadJobAlloc(const char *type);
     69
     70/// Add a pending job to the queue
     71bool psThreadJobAddPending(psThreadJob *job);
     72
     73/// Get a job off the queue of pending jobs
     74psThreadJob *psThreadJobGetPending(void);
     75
     76/// Get a job off the queue of done jobs
     77psThreadJob *psThreadJobGetDone(void);
     78
     79/// Allocate a thread task
     80psThreadTask *psThreadTaskAlloc(const char *type, // Type of task
     81                                int nArgs // Number of arguments
     82    );
     83
     84/// Add a task to the list
     85bool psThreadTaskAdd(psThreadTask *task // Task to add
     86    );
     87
     88/// Launch jobs on a thread
     89void *psThreadLauncher(void *thread     // Thread (of type psThread)
     90    );
     91
     92/// Initialise a pool of threads
     93bool psThreadPoolInit(int nThreads      // Number of threads
     94    );
     95
     96/// Return size of thread pool
     97int psThreadPoolSize(void);
     98
     99/// Wait for the thread pool to finish
     100///
     101/// This function blocks (waits in usleep) until either an error is detected on one of the threads or until ll
     102/// threads are idle and no jobs are left on the queue
     103bool psThreadPoolWait(bool harvest      // Harvest the jobs from the queue?
     104    );
     105
     106/// Clean up the thread pool
     107bool psThreadPoolFinalize(void);
    56108
    57109/// @}
Note: See TracChangeset for help on using the changeset viewer.