IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Ignore:
Timestamp:
Jul 31, 2008, 1:24:22 PM (18 years ago)
Author:
eugene
Message:

some threading simplifications: threaded and unthreaded use the same code; added concept of a thread task

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/eam_branch_20080719/psLib/src/sys/psThread.c

    r18808 r18822  
    66#include <stdarg.h>
    77#include <unistd.h>
     8#include <string.h>
    89
    910#include "psAssert.h"
     
    1920static psList *pending = NULL;          // queue of pending jobs
    2021static psList *done = NULL;             // queue of done jobs
    21 
    2222static psArray *pool = NULL;            // array of defined threads
     23static psArray *tasks = NULL;           // queue of tasks
     24
     25/***** basic thread functions *****/
    2326
    2427void psThreadLock () {
     
    3033    pthread_mutex_unlock (&mutex);
    3134    return;
    32 }
    33 
    34 void psThreadJobFree (psThreadJob *job) {
    35 
    36     if (!job) return;
    37 
    38     psFree (job->type);
    39     psFree (job->args);
    40     return;
    41 }
    42 
    43 // allocate a psThreadJob with nArgs arguments
    44 psThreadJob *psThreadJobAlloc (char *type, int nArgs) {
    45    
    46     psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob));
    47     psMemSetDeallocator(job, (psFreeFunc)psThreadJobFree);
    48 
    49     job->type = psStringCopy (type);
    50     job->args = psArrayAllocEmpty (nArgs);
    51     return job;
    52 }
    53 
    54 // add a job to the queue of pending jobs
    55 bool psThreadJobAddPending (psThreadJob *job) {
    56 
    57     psThreadLock ();
    58     if (pending == NULL) {
    59         pending = psListAlloc(NULL);
    60     }
    61 
    62     psListAdd (pending, PS_LIST_TAIL, job);
    63     psThreadUnlock ();
    64     return true;
    65 }
    66 
    67 // this function is not locked -- see thread launder for example
    68 psThreadJob *psThreadJobGetPending () {
    69 
    70     if (!pending) return NULL;
    71 
    72     psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD);
    73 
    74     // jobs we pull off the pending queue get placed on the done queue
    75     if (job) {
    76         if (done == NULL) {
    77             done = psListAlloc(NULL);
    78         }
    79         psListAdd (done, PS_LIST_TAIL, job);
    80     }
    81     return job;
    82 }
    83 
    84 // this function is not locked -- see thread launder for example
    85 psThreadJob *psThreadJobGetDone () {
    86 
    87     if (!done) return NULL;
    88 
    89     psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD);
    90     return job;
    9135}
    9236
     
    10650}
    10751
     52/***** thread job functions *****/
     53
     54void psThreadJobFree (psThreadJob *job) {
     55
     56    if (!job) return;
     57
     58    psFree (job->type);
     59    psFree (job->args);
     60    return;
     61}
     62
     63// allocate a psThreadJob of the given type
     64psThreadJob *psThreadJobAlloc (char *type) {
     65   
     66    psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob));
     67    psMemSetDeallocator(job, (psFreeFunc)psThreadJobFree);
     68
     69    job->type = psStringCopy (type);
     70    job->args = psArrayAlloc (0);
     71    return job;
     72}
     73
     74// add a job to the queue of pending jobs
     75bool psThreadJobAddPending (psThreadJob *job) {
     76
     77    // if we failed to call psThreadPoolInit, or we called it with nThreads == 0,
     78    // find the matching function and just run it.
     79    if (!pool || !pool->n) {
     80
     81        // in non-threaded operation, the job is placed on the done list and immediately run
     82        if (done == NULL) {
     83            done = psListAlloc(NULL);
     84        }
     85        psListAdd (done, PS_LIST_TAIL, job);
     86
     87        // find the corresponding task and run it
     88        for (int i = 0; i < tasks->n; i++) {
     89            psThreadTask *task = tasks->data[i];
     90            if (strcmp (job->type, task->type)) continue;
     91
     92            psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
     93
     94            bool status = task->function(job);
     95            return status;
     96        }
     97        return false;
     98    }
     99
     100    psThreadLock ();
     101    if (pending == NULL) {
     102        pending = psListAlloc(NULL);
     103    }
     104
     105    psListAdd (pending, PS_LIST_TAIL, job);
     106    psThreadUnlock ();
     107    return true;
     108}
     109
     110// this function is not locked -- see thread launder for example
     111psThreadJob *psThreadJobGetPending () {
     112
     113    if (!pending) return NULL;
     114
     115    psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD);
     116
     117    // jobs we pull off the pending queue get placed on the done queue
     118    if (job) {
     119        if (done == NULL) {
     120            done = psListAlloc(NULL);
     121        }
     122        psListAdd (done, PS_LIST_TAIL, job);
     123    }
     124    return job;
     125}
     126
     127// this function is not locked -- see thread launder for example
     128psThreadJob *psThreadJobGetDone () {
     129
     130    if (!done) return NULL;
     131
     132    psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD);
     133    return job;
     134}
     135
     136/***** thread task functions *****/
     137
     138void psThreadTaskFree (psThreadTask *task) {
     139
     140    if (!task) return;
     141
     142    psFree (task->type);
     143    return;
     144}
     145
     146// allocate a psThreadTask with nArgs arguments
     147psThreadTask *psThreadTaskAlloc (char *type, int nArgs) {
     148   
     149    psThreadTask *task = (psThreadTask *)psAlloc(sizeof(psThreadTask));
     150    psMemSetDeallocator(task, (psFreeFunc)psThreadTaskFree);
     151
     152    task->type = psStringCopy (type);
     153    task->nArgs = nArgs;
     154    task->function = NULL;
     155    return task;
     156}
     157
     158// add a task to the collection of tasks
     159bool psThreadTaskAdd (psThreadTask *task) {
     160
     161    if (tasks == NULL) {
     162        tasks = psArrayAllocEmpty(8);
     163    }
     164
     165    psArrayAdd (tasks, 1, task);
     166    return true;
     167}
     168
     169// each thread runs this function to choose the task functions
     170void *psThreadLauncher (void *data) {
     171
     172    psThread *self = data;
     173    psThreadJob *job = NULL;
     174
     175    while (1) {
     176
     177        // if we get an error, just wait until we are cleared or killed
     178        while (self->fault) {
     179            usleep (10000);
     180        }
     181
     182        // if no tasks are assigned, just wait until they are
     183        while (tasks == NULL) {
     184            usleep (10000);
     185        }
     186
     187        // request a new job, if there are none available, sleep a bit
     188        // we have to lock here so the job queue cannot be empty yet no threads busy
     189        psThreadLock();
     190        while ((job = psThreadJobGetPending ()) == NULL) {
     191            psThreadUnlock();
     192            usleep (10000);
     193            psThreadLock(); // XXX ???
     194        }
     195        self->busy = true;
     196
     197        for (int i = 0; i < tasks->n; i++) {
     198            psThreadTask *task = tasks->data[i];
     199            if (strcmp (job->type, task->type)) continue;
     200
     201            psThreadUnlock();
     202            psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
     203
     204            bool status = task->function(job);
     205            if (!status) {
     206                self->fault = true;
     207            }
     208            self->busy = false; 
     209            break;
     210        }
     211        psThreadUnlock();
     212        // XXX what do we do if the job is unknown?
     213    }
     214}
     215
     216/***** thread pool functions *****/
     217
    108218// create a pool of Nthreads, each running the user's job-launcher function
    109 bool psThreadPoolInit (int nThreads, psThreadLaunchJobsFunction function) {
     219bool psThreadPoolInit (int nThreads) {
    110220
    111221    if (pool) psAbort ("psThreadsInit already called");
     
    114224    for (int i = 0; i < nThreads; i++) {
    115225        psThread *thread = psThreadAlloc();
    116         pthread_create (&thread->pt, NULL, function, thread);
     226        pthread_create (&thread->pt, NULL, psThreadLauncher, thread);
    117227        pool->data[i] = thread;
    118228    }
     
    126236    // an error is detected on one of the threads or until
    127237    // all threads are idle and no jobs are left on the queue
     238
     239    if (!pool) return true;
     240    if (!pool->n) return true;
    128241
    129242    while (1) {
     
    169282    pool = NULL;
    170283
    171     return true;
    172 }
    173 
     284    psFree (tasks);
     285    tasks = NULL;
     286
     287    return true;
     288}
     289
Note: See TracChangeset for help on using the changeset viewer.