Index: trunk/psLib/src/sys/psThread.c
===================================================================
--- trunk/psLib/src/sys/psThread.c	(revision 18827)
+++ trunk/psLib/src/sys/psThread.c	(revision 18953)
@@ -1,4 +1,4 @@
 #ifdef HAVE_CONFIG_H
-# include "config.h"
+#include "config.h"
 #endif
 
@@ -17,31 +17,37 @@
 #include "psThread.h"
 
-static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-static psList *pending = NULL;		// queue of pending jobs
-static psList *done = NULL;		// queue of done jobs
-static psArray *pool = NULL;		// array of defined threads
-static psArray *tasks = NULL;		// queue of tasks
+#define THREAD_WAIT 10000               // Microseconds to wait for threads
+
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // Mutex for locking threads
+static psList *pending = NULL;          // queue of pending jobs
+static psList *done = NULL;             // queue of done jobs
+static psArray *pool = NULL;            // array of defined threads
+static psArray *tasks = NULL;           // queue of tasks
 
 /***** basic thread functions *****/
 
-void psThreadLock () {
-    pthread_mutex_lock (&mutex);
-    return;
-}
-
-void psThreadUnlock () {
-    pthread_mutex_unlock (&mutex);
-    return;
-}
-
-void psThreadFree (psThread *thread) {
+void psThreadLock(void)
+{
+    pthread_mutex_lock(&mutex);
+    return;
+}
+
+void psThreadUnlock(void)
+{
+    pthread_mutex_unlock(&mutex);
+    return;
+}
+
+static void threadFree(psThread *thread)
+{
+    // Nothing to free; this function is merely provided for identification purposes
     return;
 }
 
 // allocate a psThread
-psThread *psThreadAlloc () {
-    
-    psThread *thread = (psThread *)psAlloc(sizeof(psThread));
-    psMemSetDeallocator(thread, (psFreeFunc)psThreadFree);
+psThread *psThreadAlloc(void)
+{
+    psThread *thread = (psThread*)psAlloc(sizeof(psThread));
+    psMemSetDeallocator(thread, (psFreeFunc)threadFree);
 
     thread->busy  = false;
@@ -52,26 +58,26 @@
 /***** thread job functions *****/
 
-void psThreadJobFree (psThreadJob *job) {
-
-    if (!job) return;
-
-    psFree (job->type);
-    psFree (job->args);
+static void threadJobFree(psThreadJob *job)
+{
+    psFree(job->type);
+    psFree(job->args);
     return;
 }
 
 // allocate a psThreadJob of the given type
-psThreadJob *psThreadJobAlloc (char *type) {
-    
+psThreadJob *psThreadJobAlloc(const char *type)
+{
     psThreadJob *job = (psThreadJob *)psAlloc(sizeof(psThreadJob));
-    psMemSetDeallocator(job, (psFreeFunc)psThreadJobFree);
-
-    job->type = psStringCopy (type);
-    job->args = psArrayAlloc (0);
+    psMemSetDeallocator(job, (psFreeFunc)threadJobFree);
+
+    job->type = psStringCopy(type);
+    job->args = psArrayAlloc(0);
     return job;
 }
 
 // add a job to the queue of pending jobs
-bool psThreadJobAddPending (psThreadJob *job) {
+bool psThreadJobAddPending(psThreadJob *job)
+{
+    PS_ASSERT_THREAD_JOB_NON_NULL(job, false);
 
     // if we failed to call psThreadPoolInit, or we called it with nThreads == 0,
@@ -79,56 +85,54 @@
     if (!pool || !pool->n) {
 
-	// in non-threaded operation, the job is placed on the done list and immediately run
-	if (done == NULL) {
-	    done = psListAlloc(NULL);
-	}
-	psListAdd (done, PS_LIST_TAIL, job);
-
-	// find the corresponding task and run it
-	for (int i = 0; i < tasks->n; i++) {
-	    psThreadTask *task = tasks->data[i];
-	    if (strcmp (job->type, task->type)) continue;
-
-	    psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
-
-	    bool status = task->function(job);
-	    return status;
-	}
-	return false;
-    }
-
-    psThreadLock ();
-    if (pending == NULL) {
-	pending = psListAlloc(NULL);
-    }
-
-    psListAdd (pending, PS_LIST_TAIL, job);
-    psThreadUnlock ();
+        // in non-threaded operation, the job is placed on the done list and immediately run
+        if (!done) {
+            done = psListAlloc(NULL);
+        }
+        psListAdd(done, PS_LIST_TAIL, job);
+
+        // find the corresponding task and run it
+        for (int i = 0; i < tasks->n; i++) {
+            psThreadTask *task = tasks->data[i];
+            if (strcmp (job->type, task->type) != 0) {
+                continue;
+            }
+            psAssert(job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
+
+            bool status = task->function(job);
+            return status;
+        }
+        // Programming error
+        psAbort("Unable to find job %s", job->type);
+    }
+
+    psThreadLock();
+    if (!pending) {
+        pending = psListAlloc(NULL);
+    }
+    psListAdd(pending, PS_LIST_TAIL, job);
+    psThreadUnlock();
+
     return true;
 }
 
 // this function is not locked -- see thread launder for example
-psThreadJob *psThreadJobGetPending () {
-
-    if (!pending) return NULL;
-
-    psThreadJob *job = psListGetAndRemove (pending, PS_LIST_HEAD);
-
-    // jobs we pull off the pending queue get placed on the done queue
-    if (job) {
-	if (done == NULL) {
-	    done = psListAlloc(NULL);
-	}
-	psListAdd (done, PS_LIST_TAIL, job);
-    }
+psThreadJob *psThreadJobGetPending(void)
+{
+    if (!pending) {
+        return NULL;
+    }
+
+    psThreadJob *job = psListGetAndRemove(pending, PS_LIST_HEAD);
     return job;
 }
 
-// this function is not locked -- see thread launder for example
-psThreadJob *psThreadJobGetDone () {
-
-    if (!done) return NULL;
-
-    psThreadJob *job = psListGetAndRemove (done, PS_LIST_HEAD);
+// this function is not locked -- see thread launcher for example
+psThreadJob *psThreadJobGetDone(void)
+{
+    if (!done) {
+        return NULL;
+    }
+
+    psThreadJob *job = psListGetAndRemove(done, PS_LIST_HEAD);
     return job;
 }
@@ -136,19 +140,17 @@
 /***** thread task functions *****/
 
-void psThreadTaskFree (psThreadTask *task) {
-
-    if (!task) return;
-
-    psFree (task->type);
+static void threadTaskFree(psThreadTask *task)
+{
+    psFree(task->type);
     return;
 }
 
 // allocate a psThreadTask with nArgs arguments
-psThreadTask *psThreadTaskAlloc (char *type, int nArgs) {
-    
+psThreadTask *psThreadTaskAlloc(const char *type, int nArgs)
+{
     psThreadTask *task = (psThreadTask *)psAlloc(sizeof(psThreadTask));
-    psMemSetDeallocator(task, (psFreeFunc)psThreadTaskFree);
-
-    task->type = psStringCopy (type);
+    psMemSetDeallocator(task, (psFreeFunc)threadTaskFree);
+
+    task->type = psStringCopy(type);
     task->nArgs = nArgs;
     task->function = NULL;
@@ -157,58 +159,75 @@
 
 // add a task to the collection of tasks
-bool psThreadTaskAdd (psThreadTask *task) {
-
-    if (tasks == NULL) {
-	tasks = psArrayAllocEmpty(8);
-    }
-
-    psArrayAdd (tasks, 1, task);
+bool psThreadTaskAdd(psThreadTask *task)
+{
+    PS_ASSERT_THREAD_TASK_NON_NULL(task, false);
+
+    if (!tasks) {
+        tasks = psArrayAllocEmpty(8);
+    }
+
+    psArrayAdd(tasks, 1, task);
     return true;
 }
 
 // each thread runs this function to choose the task functions
-void *psThreadLauncher (void *data) {
-
-    psThread *self = data;
-    psThreadJob *job = NULL;
+void *psThreadLauncher(void *thread)
+{
+    psThread *self = thread;            // Thread that's running
 
     while (1) {
-
-	// if we get an error, just wait until we are cleared or killed
-	while (self->fault) {
-	    usleep (10000);
-	}
-
-	// if no tasks are assigned, just wait until they are
-	while (tasks == NULL) {
-	    usleep (10000);
-	}
-
-	// request a new job, if there are none available, sleep a bit
-	// we have to lock here so the job queue cannot be empty yet no threads busy
-	psThreadLock();
-	while ((job = psThreadJobGetPending ()) == NULL) {
-	    psThreadUnlock();
-	    usleep (10000);
-	    psThreadLock(); // XXX ???
-	}
-	self->busy = true;
-
-	for (int i = 0; i < tasks->n; i++) {
-	    psThreadTask *task = tasks->data[i];
-	    if (strcmp (job->type, task->type)) continue;
-
-	    psThreadUnlock();
-	    psAssert (job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
-
-	    bool status = task->function(job);
-	    if (!status) {
-		self->fault = true;
-	    }
-	    self->busy = false;  
-	    break;
-	}
-	psThreadUnlock();
-	// XXX what do we do if the job is unknown?
+        // if we get an error, just wait until we are cleared or killed
+        while (self->fault) {
+            usleep(THREAD_WAIT);
+        }
+
+        // if no tasks are assigned, just wait until they are
+        while (!tasks) {
+            usleep(THREAD_WAIT);
+        }
+
+        // request a new job, if there are none available, sleep a bit
+        // we have to lock here so the job queue cannot be empty yet no threads busy
+        psThreadLock();
+        psThreadJob *job = NULL;        // Job to process
+        while ((job = psThreadJobGetPending()) == NULL) {
+            psThreadUnlock();
+            usleep(THREAD_WAIT);
+            psThreadLock(); // XXX ???
+        }
+        self->busy = true;
+
+        bool found = false;             // Found the job?
+        for (int i = 0; i < tasks->n; i++) {
+            psThreadTask *task = tasks->data[i]; // Task to do
+            if (strcmp(job->type, task->type) != 0) {
+                continue;
+            }
+            found = true;
+
+            psThreadUnlock();
+            psAssert(job->args->n == task->nArgs, "invalid number of arguments to %s", task->type);
+
+            bool status = task->function(job); // Status of executing task
+
+            // Put the completed job on the 'done' queue
+            psThreadLock();
+            if (!done) {
+                done = psListAlloc(NULL);
+            }
+            psListAdd(done, PS_LIST_TAIL, job);
+            psFree(job);
+
+            if (!status) {
+                self->fault = true;
+            }
+            self->busy = false;
+            break;
+        }
+        psThreadUnlock();
+        if (!found) {
+            // Programming error
+            psAbort("Unable to find job %s", job->type);
+        }
     }
 }
@@ -217,70 +236,93 @@
 
 // create a pool of Nthreads, each running the user's job-launcher function
-bool psThreadPoolInit (int nThreads) {
-
-    if (pool) psAbort ("psThreadsInit already called"); 
-	
-    pool = psArrayAlloc (nThreads);
+bool psThreadPoolInit(int nThreads)
+{
+    if (pool) {
+        psAbort("psThreadsInit already called");
+    }
+
+    PS_ASSERT_INT_NONNEGATIVE(nThreads, false);
+    if (nThreads == 0) {
+        // No threading
+        return true;
+    }
+
+    pool = psArrayAlloc(nThreads);
     for (int i = 0; i < nThreads; i++) {
-	psThread *thread = psThreadAlloc();
-	pthread_create (&thread->pt, NULL, psThreadLauncher, thread);
-	pool->data[i] = thread;
+        psThread *thread = psThreadAlloc(); // Thread for pool
+        pthread_create(&thread->pt, NULL, psThreadLauncher, thread);
+        pool->data[i] = thread;
     }
     return true;
 }
 
-// call this function after you have added jobs to the queue and 
-bool psThreadPoolWait () {
-
-    // this function blocks (waits in usleep) until either
-    // an error is detected on one of the threads or until 
-    // all threads are idle and no jobs are left on the queue
-
-    if (!pool) return true;
-    if (!pool->n) return true;
+int psThreadPoolSize(void)
+{
+    return pool ? pool->n : 0;
+}
+
+// call this function after you have added jobs to the queue and
+bool psThreadPoolWait(bool harvest)
+{
+    if (!pool || pool->n == 0) {
+        // No threads initialised, so everything's done
+        return true;
+    }
 
     while (1) {
-
-	// check for an error
-	for (int i = 0; i < pool->n; i++) {
-	    psThread *thread = pool->data[i];
-	    if (thread->fault) return false;
-	}
-
-	// are all threads idle?
-	psThreadLock();
-	for (int i = 0; i < pool->n; i++) {
-	    psThread *thread = pool->data[i];
-	    if (thread->busy) goto SLEEP;
-	}
-
-	if (!pending) return true;
-
-	// is the queue empty?
-	if (pending->head == NULL) {
-	    psThreadUnlock();
-	    return true;
-	}
+        // check for an error
+        for (int i = 0; i < pool->n; i++) {
+            psThread *thread = pool->data[i];
+            if (thread->fault) {
+                return false;
+            }
+        }
+
+        // Harvest jobs, if requested
+        if (harvest) {
+            psThreadLock();
+            psThreadJob *job;           // Job from done queue
+            while ((job = psThreadJobGetDone())) {
+                psFree(job);
+            }
+            psThreadUnlock();
+        }
+
+        // are all threads idle?
+        psThreadLock();
+        for (int i = 0; i < pool->n; i++) {
+            psThread *thread = pool->data[i];
+            if (thread->busy) {
+                // At least one thread is busy: sleep.
+                goto SLEEP;
+            }
+        }
+
+        if (!pending || !pending->head) {
+            // Nothing in the queue
+            psThreadUnlock();
+            return true;
+        }
 
     SLEEP:
-	psThreadUnlock();
-	usleep (10000);
-    }
+        psThreadUnlock();
+        usleep(THREAD_WAIT);
+    }
+
     return false;
 }
 
-// create a pool of Nthreads, each running the user's job-launcher function
-bool psThreadPoolFinalize () {
-
-    psFree (pending);
+bool psThreadPoolFinalize(void)
+{
+    psFree(pending);
     pending = NULL;
 
-    psFree (done);
+    psFree(done);
     done = NULL;
 
-    psFree (pool);
+    psFree(pool);
     pool = NULL;
 
-    psFree (tasks);
+    psFree(tasks);
     tasks = NULL;
 
