IPP Software | Navigation Tools | IPP Links | Communication | Pan-STARRS Links

Changeset 23530


Ignore:
Timestamp: Mar 25, 2009, 11:56:09 AM (17 years ago)
Author: eugene
Message: upgrade threading/locking model for pantasks: changes merged from eam_branch_20090322

Location: trunk/Ohana/src/opihi
Files: 43 edited, 3 copied

Legend:

Unmodified
Added
Removed
  • trunk/Ohana/src/opihi/include/pantasks.h

    r21153 r23530  
    244244int CheckControllerOutput (void);
    245245int PrintControllerOutput (void);
     246void PrintControllerBusyJobs ();
    246247
    247248int AddHost (char *hostname, int max_threads);
     
    260261
    261262// functions related to the server threads
     263void *CheckJobsAndTasksThread (void *data);
     264
    262265void CheckTasksSetState (int state);
    263266int CheckTasksGetState (void);
    264 void *CheckTasksThread (void *data);
    265267
    266268void CheckJobsSetState (int state);
    267269int CheckJobsGetState (void);
    268 void *CheckJobsThread (void *data);
     270
     271// void *CheckTasksThread (void *data);
     272// void *CheckJobsThread (void *data);
    269273
    270274void CheckControllerSetState (int state);
     
    283287void CheckInputs (void);
    284288
    285 void SerialThreadLock (void);
    286 void SerialThreadUnlock (void);
     289void ClientThreadLock (void);
     290void ClientThreadUnlock (void);
     291void CommandThreadLock (void);
     292void CommandThreadUnlock (void);
     293void ControlThreadLock (void);
     294void ControlThreadUnlock (void);
     295void JobTaskThreadLock (void);
     296void JobTaskThreadUnlock (void);
    287297
    288298int InitPassword (void);
  • trunk/Ohana/src/opihi/include/shell.h

    r21153 r23530  
    166166int           gprint                    PROTO((gpDest dest, char *format, ...));
    167167int           gwrite                    PROTO((char *buffer, int size, int N, gpDest dest));
     168int           gprint_syserror           PROTO((gpDest dest, int myError, char *format, ...));
     169int           gprintv                   PROTO((gpDest dest, char *format, va_list argp));
    168170
    169171/* socket functions */
  • trunk/Ohana/src/opihi/lib.shell/SocketOps.c

    r21041 r23530  
    55# define DEBUG 0
    66
     7// these three static variables are only modified in the setup command before
     8// the threads are started.  Thread-safety is not a problem for these.
    79static int NVALID;
    810static int Nvalid;
  • trunk/Ohana/src/opihi/lib.shell/gprint.c

    r22423 r23530  
    287287
    288288  int status;
    289   gpStream *stream;
    290289  va_list argp; 
    291290
     291  va_start (argp, format);
     292  status = gprintv (dest, format, argp);
     293  va_end (argp);
     294  return (status);
     295}
     296
     297int gprintv (gpDest dest, char *format, va_list argp) {
     298
     299  int status;
     300  gpStream *stream;
     301
    292302  // this thread only writes to its own stream
    293303  stream = gprintGetStream (dest);
    294 
    295   va_start (argp, format);
    296304
    297305  if (stream[0].mode == GP_FILE) {
     
    303311    vPrintIOBuffer (stream[0].buffer, format, argp);
    304312  }
    305   va_end (argp);
    306313  return (TRUE);
    307314}
     
    332339}
    333340
    334 /* I'm going to need to have different output targets for different threads
    335    we can do this with these functions:
    336 
    337    pthread_t pthread_self(void);
    338    int pthread_equal(pthread_t thread1, pthread_t thread2);
    339    // returns TRUE if equal, FALSE if not
    340    
    341 */
    342 
     341# define MAX_ERROR_LENGTH 256           // Maximum length string for error messages
     342
     343// print an error (based on errno values) to gprint destination
     344int gprint_syserror (gpDest dest, int myError, char *format, ...) {
     345
     346  char errorBuf[MAX_ERROR_LENGTH];
     347  char *errorMsg;
     348  va_list argp; 
     349
     350  // there are two strerror_r implementations; choose the right one:
     351#if ((_POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600) && ! _GNU_SOURCE)
     352  errorMsg = strerror_r (myError, errorBuf, MAX_ERROR_LENGTH);
     353#else
     354  strerror_r (myError, errorBuf, MAX_ERROR_LENGTH);
     355  errorMsg = errorBuf;
     356#endif
     357
     358  va_start (argp, format);
     359  gprintv (dest, format, argp);
     360  va_end (argp);
     361
     362  gprintv (dest, "%s\n", errorMsg);
     363  return TRUE;
     364}
  • trunk/Ohana/src/opihi/pantasks/CheckController.c

    r14590 r23530  
    11# include "pantasks.h"
    22
    3 static struct timeval start;
    4 void TimerMark ();
    5 float TimerElapsed (int reset);
     3void TimerMark (struct timeval *start);
     4float TimerElapsed (struct timeval *start, int reset);
    65
    76int CheckController () {
     
    1110  Job *job;
    1211  IOBuffer buffer;
     12  struct timeval start;
    1313
    1414  /* get the list of completed jobs (exit / crash), update the job status */
     
    1717  /*** check EXIT jobs ***/
    1818  InitIOBuffer (&buffer, 0x100);
    19   // TimerMark ();
    20   // status = ControllerCommand ("stop", CONTROLLER_PROMPT, &buffer);
    21   // if (VerboseMode()) gprint (GP_ERR, "stop controller %f\n", TimerElapsed(TRUE));
    2219
    23   TimerMark ();
     20  TimerMark (&start);
    2421  FlushIOBuffer (&buffer);
     22
    2523  status = ControllerCommand ("jobstack exit", CONTROLLER_PROMPT, &buffer);
    26   if (VerboseMode()) gprint (GP_ERR, "check exit stack %f\n", TimerElapsed(TRUE));
     24  if (VerboseMode()) gprint (GP_ERR, "check exit stack %f\n", TimerElapsed(&start, TRUE));
    2725  if (!status) goto escape;
    2826
     
    3432  status = sscanf (buffer.buffer, "%*s %d", &Njobs);
    3533  if (status != 1) goto escape;
    36   if (VerboseMode()) gprint (GP_ERR, "parse %d jobs on stack %f\n", Njobs, TimerElapsed(TRUE));
     34  if (VerboseMode()) gprint (GP_ERR, "parse %d jobs on stack %f\n", Njobs, TimerElapsed(&start, TRUE));
    3735
    3836  p = buffer.buffer;
     
    4644    status = sscanf (p, "%d", &JobID);
    4745
     46    // the operations within this locked block only interact with the controller or
     47    // modify the properties of the selected job
     48    JobTaskLock();
    4849    job = FindControllerJob (JobID);
    4950    if (job == NULL) {
    5051      gprint (GP_ERR, "misplaced job? %d not in EXIT job list\n", JobID);
     52      JobTaskUnlock();
    5153      continue;
    5254    }
    5355    /* this checks the individual job status, grabs stdout/stderr */
    5456    CheckControllerJob (job);
     57    JobTaskUnlock();
    5558  }
    56   if (VerboseMode()) gprint (GP_ERR, "clear %d exit jobs %f\n", i, TimerElapsed(TRUE));
     59  if (VerboseMode()) gprint (GP_ERR, "clear %d exit jobs %f\n", i, TimerElapsed(&start, TRUE));
    5760
    5861  /*** check CRASH jobs ***/
     
    6770  status = sscanf (buffer.buffer, "%*s %d", &Njobs);
    6871  if (status != 1) goto escape;
    69   if (VerboseMode()) gprint (GP_ERR, "check crash stack %f\n", TimerElapsed(TRUE));
     72  if (VerboseMode()) gprint (GP_ERR, "check crash stack %f\n", TimerElapsed(&start, TRUE));
    7073
    7174  p = buffer.buffer;
     
    7780    }
    7881    p = q + 1;
    79    
    8082    status = sscanf (p, "%d", &JobID);
     83
     84    // the operations within this locked block only interact with the controller or
     85    // modify the properties of the selected job
     86    JobTaskLock();
    8187    job = FindControllerJob (JobID);
    8288    if (job == NULL) {
    8389      gprint (GP_ERR, "misplaced job? %d not in CRASH job list\n", JobID);
     90      JobTaskUnlock();
    8491      continue;
    8592    }
    8693    /* this checks the individual job status, grabs stdout/stderr */
    8794    CheckControllerJob (job);
     95    JobTaskUnlock();
    8896  }
    89   if (VerboseMode()) gprint (GP_ERR, "clear %d crash jobs %f\n", i, TimerElapsed(TRUE));
     97  if (VerboseMode()) gprint (GP_ERR, "clear %d crash jobs %f\n", i, TimerElapsed(&start, TRUE));
    9098
    9199  FlushIOBuffer (&buffer);
    92   // status = ControllerCommand ("run", CONTROLLER_PROMPT, &buffer);
    93100  FreeIOBuffer (&buffer);
    94101  return (TRUE);
     
    96103 escape:
    97104  FlushIOBuffer (&buffer);
    98   // status = ControllerCommand ("run", CONTROLLER_PROMPT, &buffer);
    99105  FreeIOBuffer (&buffer);
    100106  return (FALSE);
    101107}
    102108
    103 void TimerMark () {
    104     gettimeofday (&start, (void *) NULL);
     109void TimerMark (struct timeval *start) {
     110    gettimeofday (start, (void *) NULL);
    105111}
    106112
    107 float TimerElapsed (int reset) {
     113float TimerElapsed (struct timeval *start, int reset) {
    108114
    109115  float dtime;
     
    111117
    112118  gettimeofday (&stop, (void *) NULL);
    113   dtime = DTIME (stop, start);
    114   if (reset) gettimeofday (&start, (void *) NULL);
     119  dtime = DTIME (stop, start[0]);
     120  if (reset) gettimeofday (start, (void *) NULL);
    115121  return (dtime);
    116122}
  • trunk/Ohana/src/opihi/pantasks/CheckJobs.c

    r15871 r23530  
    1818  next_timeout = 1.0;
    1919
     20  JobTaskLock();
    2021  /** test all jobs: ready to test?  finished? **/
    2122  while ((job = NextJob ()) != NULL) {
     
    7778        /* set taskarg variables */
    7879        for (i = 0; i < job[0].argc; i++) {
    79             sprintf (varname, "taskarg:%d", i);
    80             set_str_variable (varname, job[0].argv[i]);
     80          sprintf (varname, "taskarg:%d", i);
     81          set_str_variable (varname, job[0].argv[i]);
    8182        }
    8283        set_int_variable ("taskarg:n", job[0].argc);
     
    8485        /* set options variables */
    8586        for (i = 0; i < job[0].optc; i++) {
    86             sprintf (varname, "options:%d", i);
    87             set_str_variable (varname, job[0].optv[i]);
     87          sprintf (varname, "options:%d", i);
     88          set_str_variable (varname, job[0].optv[i]);
    8889        }
    8990        set_int_variable ("options:n", job[0].optc);
     
    109110          if (VerboseMode()) gprint (GP_LOG, "job %s (%d) crash\n", task[0].name, job[0].JobID);
    110111          if (task[0].crash != NULL) {
     112            // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread
     113            JobTaskUnlock();
     114            CommandLock();
    111115            exec_loop (task[0].crash);
     116            CommandUnlock();
     117            JobTaskLock();
    112118          }
    113119        }
     
    133139            }
    134140          }
    135           if (macro != NULL) exec_loop (macro);
     141          if (macro != NULL) {
     142            // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread
     143            JobTaskUnlock();
     144            CommandLock();
     145            exec_loop (macro);
     146            CommandUnlock();
     147            JobTaskLock();
     148          }
    136149        }
    137150
     
    146159        DeleteJob (job);
    147160        continue;
    148         break;
    149161
    150162      default:
     
    156168    /* check for timeout - (local jobs only)
    157169       we only check timeout after a poll (forces at least one poll)
    158      */
     170    */
    159171    if (job[0].mode == JOB_LOCAL) {
    160172      if (GetTaskTimer(job[0].start, FALSE) < task[0].timeout_period) {
     
    179191      /* set taskarg variables */
    180192      for (i = 0; i < job[0].argc; i++) {
    181           sprintf (varname, "taskarg:%d", i);
    182           set_str_variable (varname, job[0].argv[i]);
     193        sprintf (varname, "taskarg:%d", i);
     194        set_str_variable (varname, job[0].argv[i]);
    183195      }
    184196      set_int_variable ("taskarg:n", job[0].argc);
     
    193205      /* run task[0].timeout macro, if it exists */
    194206      if (task[0].timeout != NULL) {
     207        // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread
     208        JobTaskUnlock();
     209        CommandLock();
    195210        exec_loop (task[0].timeout);
     211        CommandUnlock();
     212        JobTaskLock();
    196213      }
    197214
     
    205222  }
    206223  // fprintf (stderr, "check %d jobs\n", Ncheck);
     224  JobTaskUnlock();
    207225  return (next_timeout);
    208226}
     
    210228/*
    211229
    212   job / task timeline:
    213 
    214   task:
    215   0           exec     
    216   start       create
    217   task clock  new job
    218 
    219   job:
    220   0           1xpoll     2xpoll     3xpoll
    221   start       check      check      check
    222   job clock   status     status     status
    223 
    224   .           .          .          timeout
    225                                     run
    226                                     timeout
    227 
    228   must be at least one poll before timeout
    229   (timeout >= poll)
     230   job / task timeline:
     231
     232   task:
     233   0           exec     
     234   start       create
     235   task clock  new job
     236
     237   job:
     238   0           1xpoll     2xpoll     3xpoll
     239   start       check      check      check
     240   job clock   status     status     status
     241
     242   .           .          .          timeout
     243   run
     244   timeout
     245
     246   must be at least one poll before timeout
     247   (timeout >= poll)
    230248*/
  • trunk/Ohana/src/opihi/pantasks/CheckPassword.c

    r8548 r23530  
    22# define DEBUG 0
    33
     4// this static var is only used by InitPassword and CheckPassword below.
     5// Both functions are only called by the main thread.
    46static char PASSWORD[256];
    57
  • trunk/Ohana/src/opihi/pantasks/CheckTasks.c

    r23329 r23530  
    77  int status;
    88  float time_running, next_timeout, fuzz;
    9   // struct timeval now;
    109
    1110  // actual maximum delay is controlled in job_threads.c
    1211  next_timeout = 1.0;
    1312
     13  JobTaskLock();
    1414  /** test all tasks: ready to test? ready to run? **/
    1515  while ((task = NextTask ()) != NULL) {
     
    5454    /* ready to run? : run task.exec macro */
    5555    if (task[0].exec != NULL) {
     56      // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread
     57      JobTaskUnlock();
     58      CommandLock();
    5659      status = exec_loop (task[0].exec);
     60      CommandUnlock();
     61      JobTaskLock();
    5762      if (!status) {
    5863        continue;
    5964      }
    6065    }
    61 
    62     // gettimeofday (&now, (void *) NULL);
    63     // fprintf (stderr, "t1: %d %6d  - \n", now.tv_sec, now.tv_usec);
    6466
    6567    /* check if there are errors with this task */
     
    6870    }
    6971   
    70     // gettimeofday (&now, (void *) NULL);
    71     // fprintf (stderr, "t2: %d %6d  - \n", now.tv_sec, now.tv_usec);
    72 
    7372    /* construct job from task */
    7473    job = CreateJob (task);
    7574    if (DEBUG) fprintf (stderr, "create job: (%zx) %d of %d\n", (size_t) job[0].stdout_buff.buffer, job[0].stdout_buff.Nbuffer, job[0].stdout_buff.Nalloc);
    7675
    77     // gettimeofday (&now, (void *) NULL);
    78     // fprintf (stderr, "t3: %d %6d  - \n", now.tv_sec, now.tv_usec);
    79 
    80     /* execute job - XXX add status test */
    81     SubmitJob (job);
    82 
    83     // fprintf (stderr, "nl: %d %6d  - ",
    84     // task[0].last.tv_sec, task[0].last.tv_usec);
    85 
    86     /* increment job counters */
    87     task[0].Njobs ++;
    88     task[0].Npending ++;
    89 
    90     // fprintf (stderr, "%d %6d\n",
    91     // task[0].last.tv_sec, task[0].last.tv_usec);
     76    /* execute job */
     77    if (!SubmitJob (job)) {
     78      DeleteJob (job);
     79      continue;
     80    }
     81    task[0].Njobs ++; // number of jobs successfully submitted
    9282
    9383    /* increment Nrun for inclusive ranges with Nmax */
    9484    BumpTimeRanges (task[0].ranges, task[0].Nranges);
    9585  }
     86  JobTaskUnlock();
    9687  return (next_timeout);
    9788}
  • trunk/Ohana/src/opihi/pantasks/ControllerOps.c

    r20032 r23530  
    1212
    1313/* local static variables to track the controller host properties */
     14/* these are used by AddHost and DeleteHost, and are only called by controller_host.c (clientThread) */
     15/* or by RestartController : lock between these two? */
    1416static Host *hosts = NULL;
    1517static int Nhosts = 0;
     
    3133}
    3234
     35// XXX possible race condition problem: if we delete a host while the controller
     36// is being restarted.  to fix this, we need to keep the deletion in controller_thread,
     37// but perhaps mark it in the client_thread?
    3338int DeleteHost (char *hostname) {
    3439
     
    261266  }
    262267
     268  // This function is called by the JobTaskThread via SubmitJob.  We need to unlock the
     269  // JobTaskLock to avoid a dead lock with the JobTaskLock called in CheckController
     270  JobTaskUnlock();
     271  ControlLock(__func__);
    263272  InitIOBuffer (&buffer, 0x100);
    264273  status = ControllerCommand (cmd, CONTROLLER_PROMPT, &buffer);
    265274  free (cmd);
     275  ControlUnlock(__func__);
     276  JobTaskLock();
     277
    266278
    267279  /* extract the job PID from the controller response */
     
    411423  if ((status == -1) && (errno == EPIPE)) {
    412424    StopController ();
    413     gprint (GP_ERR, "controller is down (pipe closed), restarting\n");
     425    fprintf (stderr, "controller is down (pipe closed), restarting\n");
    414426    if (!RestartController ()) {
    415427      return (FALSE);
     
    434446    if (status ==  0) {
    435447      StopController ();
    436       gprint (GP_ERR, "controller is down (EOF), restarting\n");
     448      fprintf (stderr, "controller is down (EOF), restarting\n");
    437449      if (!RestartController ()) {
    438450        return (FALSE);
     
    441453    }
    442454    if (status == -1) {
    443       gprint (GP_ERR, "controller is not responding (%d tries)\n", j);
     455      fprintf (stderr, "controller is not responding (%d tries)\n", j);
    444456      gwrite (buffer[0].buffer, 1, buffer[0].Nbuffer, GP_ERR);
    445457    }
    446458  }
    447459  if (status == -1) {
    448     gprint (GP_ERR, "controller still not responding, giving up\n");
     460    fprintf (stderr, "controller still not responding, giving up\n");
    449461    return (FALSE);
    450462  }
     
    456468    bzero (buffer[0].buffer + buffer[0].Nbuffer, buffer[0].Nalloc - buffer[0].Nbuffer);
    457469  }
    458   /* if (VerboseMode()) gprint (GP_ERR, "message received, %d cycles\n", i); */
     470  if (VerboseMode()) fprintf (stderr, "message received, %d cycles\n", i);
    459471  return (TRUE);
    460472}
     
    492504}
    493505
     506void PrintControllerBusyJobs () {
     507
     508  int status;
     509  char command[1024];
     510  IOBuffer buffer;
     511
     512  /* check if controller is running */
     513  status = CheckControllerStatus ();
     514  if (!status) {
     515    return;
     516  }
     517
     518  sprintf (command, "jobstack busy");
     519  InitIOBuffer (&buffer, 0x100);
     520
     521  status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
     522
     523  if (status) {
     524    gprint (GP_LOG, " jobs currently running remotely:\n");
     525    gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG);
     526  } else {
     527    gprint (GP_LOG, "controller is not responding\n");
     528  }
     529  FreeIOBuffer (&buffer);
     530  return;
     531}
     532
    494533int PrintControllerOutput () {
    495534
     
    581620  InitIOBuffer (&buffer, 0x100);
    582621
    583   // XXX lock the host table? SerialThreadLock ();
     622  // XXX lock the host table? no: that would risk a dead lock between client and controller threads:
    584623  gprint (GP_ERR, "pcontrol restarted, reloading hosts\n");
    585624  for (i = 0; i < Nhosts; i++) {
     
    588627    status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    589628  }
    590   // SerialThreadUnlock ();
    591629
    592630  if (status) gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG);
  • trunk/Ohana/src/opihi/pantasks/InputQueue.c

    r16449 r23530  
    11# include "pantasks.h"
     2// we use fprintf for DEBUG statements to avoid deadlocking issues
    23
    34static int Ninputs = 0;
     
    2324void AddNewInput (char *input) {
    2425
     26  // XXX define the InputMutex
    2527  SerialThreadLock ();
    26   if (DEBUG) gprint (GP_LOG, "adding a new input (%s)\n", input);
     28
     29  if (DEBUG) fprintf (stderr, "adding a new input (%s)\n", input);
    2730  inputs[Ninputs] = input;
    2831  Ninputs ++;
     
    3134    REALLOCATE (inputs, char *, NINPUTS);
    3235  }
    33   if (DEBUG) gprint (GP_LOG, "done new input (%s)\n", input);
     36  if (DEBUG) fprintf (stderr, "done new input (%s)\n", input);
    3437  SerialThreadUnlock ();
    3538}
     
    4043  int i, j;
    4144
    42   if (DEBUG) gprint (GP_LOG, "deleting an input (%s)\n", input);
     45  if (DEBUG) fprintf (stderr, "deleting an input (%s)\n", input);
     46
     47  // XXX lock here
    4348  for (i = 0; i < Ninputs; i++) {
    4449    if (inputs[i] == input) {
     
    5257        REALLOCATE (inputs, char *, NINPUTS);
    5358      }
    54       if (DEBUG) gprint (GP_LOG, "deleted an input\n");
     59      // XXX unlock here
     60      if (DEBUG) fprintf (stderr, "deleted an input\n");
    5561      return TRUE;
    5662    }
    5763  }
    5864  // did not find the input
     65  // XXX unlock here
    5966  return FALSE;
    6067}
     
    6673  char *input, *line, *outline, tmp;
    6774
    68   if (Ninputs < 1) return;
     75  // XXX lock here
     76  if (Ninputs < 1) {
     77    // XXX unlock here
     78    return;
     79  }
    6980
    7081  input = inputs[0];
    71   if (DEBUG) gprint (GP_LOG, "got an input (%s)\n", input);
     82  if (DEBUG) fprintf (stderr, "got an input (%s)\n", input);
    7283 
    7384  Nbytes = snprintf (&tmp, 0, "input %s", input);
    7485  ALLOCATE (line, char, Nbytes + 1);
    7586  snprintf (line, Nbytes + 1, "input %s", input);
     87  // XXX unlock here
    7688
    7789  status = command (line, &outline, TRUE);
  • trunk/Ohana/src/opihi/pantasks/JobOps.c

    r18160 r23530  
    8888  }
    8989
    90   /* check if controller is running */
    91   status = CheckControllerStatus ();
    92   if (!status) {
    93     return;
    94   }
    95 
    96   sprintf (command, "jobstack busy");
    97   InitIOBuffer (&buffer, 0x100);
    98 
    99   SerialThreadLock ();
    100   status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    101   SerialThreadUnlock ();
    102 
    103   if (status) {
    104     gprint (GP_LOG, " jobs currently running remotely:\n");
    105     gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG);
    106   } else {
    107     gprint (GP_LOG, "controller is not responding\n");
    108   }
    109   FreeIOBuffer (&buffer);
    11090  return;
    11191}
     
    169149    REALLOCATE (jobs, Job *, NJOBS);
    170150  }
     151
     152  /* increment job counters */
     153  task[0].Npending ++;
     154
    171155  return (jobs[Njobs-1]);
    172156}
     
    265249int SubmitJob (Job *job) {
    266250
     251  int status;
     252
    267253  if (job[0].mode == JOB_LOCAL) {
    268254    if (DEBUG) fprintf (stderr, "submit job: (%zx) %d of %d\n", (size_t) job[0].stdout_buff.buffer, job[0].stdout_buff.Nbuffer, job[0].stdout_buff.Nalloc);
    269     SubmitLocalJob (job);
     255    status = SubmitLocalJob (job);
    270256  } else {
    271     SubmitControllerJob (job);
     257    status = SubmitControllerJob (job);
     258  }
     259  if (!status) {
     260    return FALSE;
    272261  }
    273262
     
    285274  if (job[0].mode == JOB_LOCAL) {
    286275    CheckLocalJob (job);
    287   } else {
    288     /* controller jobs are now checked en masse by CheckController */
    289     /* CheckControllerJob (job); */
     276    /* controller jobs are checked en masse by CheckController */
    290277  }
    291278  return (job[0].state);
  • trunk/Ohana/src/opihi/pantasks/ListenClients.c

    r16905 r23530  
    22# define DEBUG 0
    33
     4// XXX make the calling functions thread-safe
    45static int NCLIENTS;
    56static int Nclients;
     
    910void InitClients () {
    1011
     12  ClientLock();
    1113  Nclients = 0;
    1214  NCLIENTS = 10;
    1315  ALLOCATE (clients, int, NCLIENTS);
    1416  ALLOCATE (buffers, IOBuffer *, NCLIENTS);
     17  ClientUnlock();
    1518}
    1619
     
    1821void AddNewClient (int client) {
    1922
    20   if (DEBUG) gprint (GP_LOG, "adding a new client (%d)\n", client);
     23  ClientLock();
     24  if (DEBUG) fprintf (stderr, "adding a new client (%d)\n", client);
    2125  clients[Nclients] = client;
    2226  ALLOCATE (buffers[Nclients], IOBuffer, 1);
     
    2832    REALLOCATE (buffers, IOBuffer *, NCLIENTS);
    2933  }
     34  ClientUnlock();
    3035}
    3136
     
    3540  int i, j;
    3641
    37   if (DEBUG) gprint (GP_LOG, "deleting a client (%d)\n", client);
     42  ClientLock();
     43  if (DEBUG) fprintf (stderr, "deleting a client (%d)\n", client);
    3844  for (i = 0; i < Nclients; i++) {
    3945    if (clients[i] == client) {
     
    5157        REALLOCATE (buffers, IOBuffer *, NCLIENTS);
    5258      }
     59      ClientUnlock();
    5360      return TRUE;
    5461    }
    5562  }
    5663  // did not find the client
     64  ClientUnlock();
    5765  return FALSE;
    5866}
     
    7078  gprintInit ();  // each thread needs to init the printing system
    7179
    72   // define server output log files
    73   if (VarConfig ("PANTASKS_SERVER_STDOUT", "%s", log_stdout) != NULL) {
    74     gprintSetFileThisThread (GP_LOG, log_stdout);
    75   } else {
    76     strcpy (log_stdout, "stdout");
    77   }
    78   if (VarConfig ("PANTASKS_SERVER_STDERR", "%s", log_stderr) != NULL) {
    79     gprintSetFileThisThread (GP_ERR, log_stderr);
    80   } else {
    81     strcpy (log_stderr, "stderr");
    82   }
     80  /* set buffers for the output for this client */
     81  gprintSetBuffer (GP_LOG);
     82  gprintSetBuffer (GP_ERR);
    8383
    8484  while (1) {
     
    9090
    9191    /* place all of the clients in the fdSet */
    92     Ncurrent = Nclients;
    9392    Nmax = 0;
    9493    FD_ZERO (&fdSet);
     94    ClientLock();
     95    Ncurrent = Nclients;
     96    ClientUnlock();
    9597    for (i = 0; i < Ncurrent; i++) {
    9698      Nmax = MAX (Nmax, clients[i]);
     
    100102
    101103    /* block until we have some data on the pipes (or timeout) */
    102     if (DEBUG) gprint (GP_ERR, "listening to %d clients\n", Ncurrent);
     104    if (DEBUG) fprintf (stderr, "listening to %d clients\n", Ncurrent);
    103105    status = select (Nmax, &fdSet, NULL, NULL, &timeout);
    104106    if (status == -1) {
     
    122124      if ((Nread == 0) || (Nread == -2)) {
    123125        /* error: do something */
    124         if (DEBUG && (Nread == 0)) gprint (GP_ERR, "socket is closed\n");
    125         if (DEBUG && (Nread == -2)) gprint (GP_ERR, "error reading from socket\n");
     126        if (DEBUG && (Nread == 0)) fprintf (stderr, "socket is closed\n");
     127        if (DEBUG && (Nread == -2)) fprintf (stderr, "error reading from socket\n");
    126128        DeleteClient (clients[i]);
    127         continue;
     129        break;  // the other thread could also have modified the list; restart with new Ncurrent
    128130      }
    129131
    130       if (DEBUG) gprint (GP_ERR, "read %d total bytes\n", buffers[i][0].Nbuffer);
     132      if (DEBUG) fprintf (stderr, "read %d total bytes\n", buffers[i][0].Nbuffer);
    131133
    132134      /* see if we have a complete message waiting; if not, keep waiting for messages */
     
    141143      if (*line) {
    142144
    143         /* set buffers for the output for this client */
    144         gprintSetBuffer (GP_LOG);
    145         gprintSetBuffer (GP_ERR);
    146 
    147145        /* run the command, return the exit status */
     146        CommandLock();
    148147        status = multicommand (line);
     148        CommandUnlock();
    149149        SendMessage (clients[i], "STATUS %d", status);
    150150
     
    156156          SendMessageFixed (clients[i], 0, "");
    157157        }         
     158        FlushIOBuffer (outbuffer);
    158159
    159160        // return the stdout messages first
     
    164165          SendMessageFixed (clients[i], 0, "");
    165166        }         
    166        
    167         /* clear and reset the output buffers to their last output file names */
    168         gprintSetFileAllThreads (GP_LOG, NULL);
    169         gprintSetFileAllThreads (GP_ERR, NULL);
     167        FlushIOBuffer (outbuffer);
    170168      }
    171169      free (line);
  • trunk/Ohana/src/opihi/pantasks/LocalJob.c

    r18161 r23530  
    127127
    128128  pid = fork ();
     129  if (pid == -1) {
     130    gprint_syserror (GP_ERR, errno, "error starting local job: ");
     131    goto pipe_error;
     132  }
     133
    129134  if (!pid) { /* must be child process */
    130135    if (VerboseMode()) gprint (GP_ERR, "starting local job\n");
  • trunk/Ohana/src/opihi/pantasks/Makefile

    r18085 r23530  
    3434$(SRC)/pantasks.$(ARCH).o \
    3535$(SRC)/thread_locks.$(ARCH).o \
    36 $(SRC)/job_threads.$(ARCH).o \
    37 $(SRC)/task_threads.$(ARCH).o \
     36$(SRC)/jobs_and_tasks_thread.$(ARCH).o \
    3837$(SRC)/controller_threads.$(ARCH).o
    3938
     
    4140$(SRC)/pantasks_server.$(ARCH).o \
    4241$(SRC)/server_run.$(ARCH).o \
    43 $(SRC)/server_load.$(ARCH).o \
    44 $(SRC)/InputQueue.$(ARCH).o \
    4542$(SRC)/ListenClients.$(ARCH).o \
    4643$(SRC)/server.$(ARCH).o \
     
    4946$(SRC)/CheckPassword.$(ARCH).o \
    5047$(SRC)/thread_locks.$(ARCH).o \
    51 $(SRC)/job_threads.$(ARCH).o \
    52 $(SRC)/task_threads.$(ARCH).o \
    53 $(SRC)/controller_threads.$(ARCH).o \
    54 $(SRC)/input_threads.$(ARCH).o
     48$(SRC)/jobs_and_tasks_thread.$(ARCH).o \
     49$(SRC)/controller_threads.$(ARCH).o
    5550
    5651funcs = \
     
    6661
    6762cmds = \
    68 $(SRC)/pulse.$(ARCH).o \
    6963$(SRC)/status.$(ARCH).o \
    7064$(SRC)/flush.$(ARCH).o \
  • trunk/Ohana/src/opihi/pantasks/controller.c

    r18085 r23530  
    5050  }
    5151
     52  ControlLock(__func__);
    5253  status = (*func)(argc - 1, argv + 1);
     54  ControlUnlock(__func__);
    5355  return (status);
    5456}
  • trunk/Ohana/src/opihi/pantasks/controller_host.c

    r19125 r23530  
    1515
    1616  if (argc != 3) goto usage;
     17  if (max_threads && strcasecmp (argv[1], "ADD")) goto usage;
    1718
    1819  /* start controller connection (if needed) */
     
    2728  if (!strcasecmp (argv[1], "ADD")) {
    2829    AddHost (argv[2], max_threads);
    29   } else {
    30     if (max_threads) goto usage;
    31   }
     30  }
    3231
    3332  if (!strcasecmp (argv[1], "DELETE")) {
     
    4241  InitIOBuffer (&buffer, 0x100);
    4342
    44   SerialThreadLock ();
    4543  status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    46   SerialThreadUnlock ();
    4744
    4845  if (status) gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG);
     
    5754  return (FALSE);
    5855}
    59 
    60 /* should I keep an internal host table so I can reload the
    61    hosts if the controller exits?
    62 
    63    alternatively, that could be a user-level choice
    64 */
  • trunk/Ohana/src/opihi/pantasks/controller_jobstack.c

    r18085 r23530  
    2424  InitIOBuffer (&buffer, 0x100);
    2525
    26   SerialThreadLock ();
    2726  status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    28   SerialThreadUnlock ();
    2927
    3028  if (status) {
  • trunk/Ohana/src/opihi/pantasks/controller_status.c

    r8548 r23530  
    2323  InitIOBuffer (&buffer, 0x100);
    2424
    25   SerialThreadLock ();
    2625  status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    27   SerialThreadUnlock ();
    2826
    2927  if (status) {
  • trunk/Ohana/src/opihi/pantasks/controller_threads.c

    r15791 r23530  
    3434
    3535    // one run of the task checker
    36     SerialThreadLock ();
     36    ControlLock(__func__);
    3737    CheckController ();
     38    ControlUnlock(__func__);
     39
     40    ControlLock(__func__);
    3841    CheckControllerOutput ();
    39     SerialThreadUnlock ();
     42    ControlUnlock(__func__);
     43
    4044    if (VerboseMode() == 2) fprintf (stderr, "C");
    4145    // fprintf (stderr, "**** C ****");
  • trunk/Ohana/src/opihi/pantasks/controller_verbose.c

    r18085 r23530  
    2424  InitIOBuffer (&buffer, 0x100);
    2525
    26   SerialThreadLock ();
    2726  status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    28   SerialThreadUnlock ();
    2927
    3028  if (status) {
  • trunk/Ohana/src/opihi/pantasks/delete.c

    r13542 r23530  
    1111    JobID = atoi (argv[2]);
    1212
     13    JobTaskLock();
    1314    job = FindJob (JobID);
    1415    if (job == NULL) {
    1516      gprint (GP_LOG, "job not found\n");
     17      JobTaskUnlock();
    1618      return (TRUE);
    1719    }
     
    2123        job[0].state = JOB_HUNG;
    2224        if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid);
     25        JobTaskUnlock();
    2326        return (FALSE);
    2427      }
     
    2730        job[0].state = JOB_HUNG;
    2831        if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid);
     32        JobTaskUnlock();
    2933        return (FALSE);
    3034      }
     
    3236    DeleteJob (job);
    3337    gprint (GP_LOG, "job removed\n");
     38    JobTaskUnlock();
    3439    return (TRUE);
    3540  }
     
    4146
    4247usage:
    43     gprint (GP_ERR, "USAGE: delete job (JobID)\n");
    44     gprint (GP_ERR, "USAGE: delete task (TaskName)\n");
    45     return (FALSE);
     48  gprint (GP_ERR, "USAGE: delete job (JobID)\n");
     49  gprint (GP_ERR, "USAGE: delete task (TaskName)\n");
     50  return (FALSE);
    4651}
    4752
  • trunk/Ohana/src/opihi/pantasks/flush.c

    r14590 r23530  
    66
    77  if (!strcasecmp (argv[1], "jobs")) {
     8    JobTaskLock();
    89    FlushJobs ();
     10    JobTaskUnlock();
    911    return (TRUE);
    1012  }
  • trunk/Ohana/src/opihi/pantasks/init.c

    r16453 r23530  
    1818int halt            PROTO((int, char **));
    1919int flush_jobs      PROTO((int, char **));
    20 int pulse           PROTO((int, char **));
    2120int showtask        PROTO((int, char **));
    2221int status_sys      PROTO((int, char **));
     
    4039  {1, "options",    task_options,  "define optional variables associated with the job task"},
    4140  {1, "periods",    task_periods,  "define time scales for a task"},
    42   {1, "pulse",      pulse,         "set the scheduler update period"},
    4341  {1, "run",        run,           "run the scheduler"},
    4442  {1, "flush",      flush_jobs,    "flush all jobs from the queue"},
  • trunk/Ohana/src/opihi/pantasks/init_server.c

    r16903 r23530  
    1414int task_stdout     PROTO((int, char **));
    1515int task_stderr     PROTO((int, char **));
    16 int pulse           PROTO((int, char **));
    1716int flush_jobs      PROTO((int, char **));
    1817int status_server   PROTO((int, char **));
     
    4140  {1, "npending",   task_npending, "define maximum number of outstanding jobs for a task"},
    4241  {1, "periods",    task_periods,  "define time scales for a task"},
    43   {1, "pulse",      pulse,         "set the scheduler update period"},
    4442  {1, "flush",      flush_jobs,    "flush all jobs from the queue"},
    4543  {1, "server",     server,        "server-specific commands"},
     
    6765  InitJobs ();
    6866  InitJobIDs ();
    69   InitInputs ();
    7067
    7168  for (i = 0; i < sizeof (cmds) / sizeof (Command); i++) {
     
    7875  FreeJobs ();
    7976  FreeJobIDs ();
    80   FreeInputs ();
    8177}
  • trunk/Ohana/src/opihi/pantasks/kill.c

    r13542 r23530  
    1212  JobID = atoi (argv[1]);
    1313
     14  JobTaskLock();
    1415  job = FindJob (JobID);
    1516  if (job == NULL) {
    1617    gprint (GP_LOG, "job not found\n");
     18    JobTaskUnlock();
    1719    return (TRUE);
    1820  }
     
    2224      job[0].state = JOB_HUNG;
    2325      if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid);
     26      JobTaskUnlock();
    2427      return (FALSE);
    2528    }
     
    2831      job[0].state = JOB_HUNG;
    2932      if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid);
     33      JobTaskUnlock();
    3034      return (FALSE);
    3135    }
     
    3337  DeleteJob (job);
    3438  gprint (GP_LOG, "job removed\n");
     39  JobTaskUnlock();
    3540  return (TRUE);
    3641}
  • trunk/Ohana/src/opihi/pantasks/notes.txt

    r7892 r23530  
     1
     2stdout_cntl:
     3  GetJobOutput
     4    CheckControllerJob
     5      CheckController
     6        controller_threads (controlThread)
     7  StartController
     8  ControllerCommand
     9    CheckController
     10      controller_threads (controlThread)
     11    controller_check (clientThread)
     12    controller_host (clientThread)
     13    controller_jobstack (clientThread)
     14    controller_run (clientThread)
     15    controller_status (clientThread)
     16    controller_verbose (clientThread)
     17    DeleteControllerJob
     18      CheckControllerJob
     19        CheckController
     20          controller_threads (controlThread)
     21    CheckControllerJobStatus
     22      CheckControllerJob
     23        CheckController
     24          controller_threads (controlThread)
     25    SubmitControllerJob
     26      SubmitJob
     27        CheckTasks (JobTaskThread)
     28    PrintControllerBusyJobs
     29    KillControllerJob
     30    QuitController
     31    RestartController   
     32  CheckControllerOutput
     33    controller_output (clientThread)
     34    controller_threads (controlThread)
     35
     36
     37
     38
     39
     40
     41
     4220090322
     43
     44  We've been surviving with a slightly broken threading / locking
     45  model.  I would like to clean it up.  Previously, the threads were
     46  blocking at a very coarse level, and there were certain interactions
     47  that were not thread-safe.  Here are my notes on re-working this:
     48
     49  * we have 6 threads:
     50
     51    * main (top-level parent) : after it spawns the 5 child threads,
     52      this thread waits for new clients and adds them to the list of
     53      active clients with 'AddNewClient'
     54
     55    * clientsThread (ListenClients): this thread is listening for
     56      commands from the clients; when it receives a complete command,
     57      it executes the command using 'multicommand'.
     58
     59    * tasksThread
     60      CheckTasks
     61        NextTask
     62
     63
     64
     65    * jobsThread
     66
     67    * controllerThread
     68      CheckControllerStatus : race condition is irrelevant
     69
     70    * inputsThread
     71      AddNewInput called by clientThread
     72      (not sure this is really being used:
     73           server input (filename) calls 'input'
     74           server module (filename) calls 'module'
     75
     76  * functions which need to be thread-safe:
     77    * AddNewClient
     78    * gprint and supporting functions must be thread safe (extensively
     79      used...)
     80
     81
     82* race condition is irrelevant for this variable
     83static int ControllerStatus = FALSE;
     84
     85static int stdin_cntl, stdout_cntl, stderr_cntl;
     86GetJobOutput -> CheckControllerJob -> CheckController -> controllerThread
     87ControllerCommand -> controllerThread / clientThread
     88CheckControllerOutput -> controllerThread / clientThread
     89StartController -> controllerThread / clientThread
     90StopController
     91
     92
     93
     94static IOBuffer stdout_buffer;
     95static IOBuffer stderr_buffer;
     96static int ControllerPID = 0;
     97
     98/* local static variables to track the controller host properties */
     99static Host *hosts = NULL;
     100static int Nhosts = 0;
     101static int NHOSTS = 0;
     102
     103
     104
     105
     106
     107
     108
     109
     110
     111
     112
     113
     114
     115
     116
     117
     118
     119
     120
     121
     122
     123
     124
    1125
    2126PanTasks Client / Server design
  • trunk/Ohana/src/opihi/pantasks/pantasks.c.in

    r16453 r23530  
    1010void program_init (int *argc, char **argv) {
    1111 
    12   pthread_t jobsThread;
    13   pthread_t tasksThread;
     12  pthread_t JobsAndTasksThread;
    1413  pthread_t controllerThread;
    1514
     
    5049
    5150  /* start up the background threads here */
    52   pthread_create (&tasksThread,      NULL, &CheckTasksThread,      NULL);
    53   pthread_create (&jobsThread,       NULL, &CheckJobsThread,       NULL);
    54   pthread_create (&controllerThread, NULL, &CheckControllerThread, NULL);
     51  // pthread_create (&tasksThread,      NULL, &CheckTasksThread,           NULL);
     52  // pthread_create (&jobsThread,       NULL, &CheckJobsThread,            NULL);
     53  pthread_create (&JobsAndTasksThread, NULL, &CheckJobsAndTasksThread, NULL);
     54  pthread_create (&controllerThread,   NULL, &CheckControllerThread,   NULL);
    5555  return;
    5656}
  • trunk/Ohana/src/opihi/pantasks/pantasks_server.c.in

    r16453 r23530  
    1717 
    1818  char log_stdout[128], log_stderr[128];
    19   pthread_t jobsThread;
    20   pthread_t tasksThread;
    21   pthread_t inputsThread;
     19  pthread_t JobsAndTasksThread;
    2220  pthread_t clientsThread;
    2321  pthread_t controllerThread;
     
    6361
    6462  /* start up the background threads here */
    65   pthread_create (&clientsThread,    NULL, &ListenClients,         NULL);
    66   pthread_create (&tasksThread,      NULL, &CheckTasksThread,      NULL);
    67   pthread_create (&jobsThread,       NULL, &CheckJobsThread,       NULL);
    68   pthread_create (&controllerThread, NULL, &CheckControllerThread, NULL);
    69   pthread_create (&inputsThread,     NULL, &CheckInputsThread,     NULL);
     63  pthread_create (&clientsThread,       NULL, &ListenClients,           NULL);
     64  pthread_create (&JobsAndTasksThread,  NULL, &CheckJobsAndTasksThread, NULL);
     65  pthread_create (&controllerThread,    NULL, &CheckControllerThread,   NULL);
    7066
    7167  /* in this loop, we listen for incoming connections, validate, and
  • trunk/Ohana/src/opihi/pantasks/server.c

    r16463 r23530  
    55int input           PROTO((int, char **));
    66int module          PROTO((int, char **));
    7 int server_load     PROTO((int, char **));
    87int server_run      PROTO((int, char **));
    98int server_stop     PROTO((int, char **));
     
    2019  {1, "input",  input,  "load input file on server"},
    2120  {1, "module", module, "load module file on server"},
    22   {1, "load",   server_load, "load input file on server"},
    2321  {1, "run",    server_run,  "run scheduler"},
    2422  {1, "stop",   server_stop, "stop scheduler"},
  • trunk/Ohana/src/opihi/pantasks/server_run.c

    r11084 r23530  
    1111  CheckJobsSetState (TRUE);
    1212  CheckControllerSetState (TRUE);
    13   CheckInputsSetState (TRUE);
    1413  return (TRUE);
    1514}
     
    2322
    2423  CheckTasksSetState (FALSE);
    25   // CheckJobsSetState (FALSE);
    26   // CheckControllerSetState (FALSE);
    27   CheckInputsSetState (FALSE);
    2824  return (TRUE);
    2925}
     
    3935  CheckJobsSetState (FALSE);
    4036  CheckControllerSetState (FALSE);
    41   CheckInputsSetState (FALSE);
    4237  return (TRUE);
    4338}
  • trunk/Ohana/src/opihi/pantasks/showtask.c

    r12468 r23530  
    88  }
    99
     10  JobTaskLock();
    1011  ShowTask (argv[1]);
     12  JobTaskUnlock();
     13
    1114  return (TRUE);
    1215}
  • trunk/Ohana/src/opihi/pantasks/status_server.c

    r16012 r23530  
    1111  if ((N = get_argument (argc, argv, "-tasks"))) {
    1212    remove_argument (N, &argc, argv);
     13    JobTaskLock();
    1314    ListTasks (FALSE);
     15    JobTaskUnlock();
    1416    return (TRUE);
    1517  }
     
    1719  if ((N = get_argument (argc, argv, "-taskinfo"))) {
    1820    remove_argument (N, &argc, argv);
     21    JobTaskLock();
    1922    ListTasks (TRUE);
     23    JobTaskUnlock();
    2024    return (TRUE);
    2125  }
     
    2327  if ((N = get_argument (argc, argv, "-taskstats"))) {
    2428    remove_argument (N, &argc, argv);
     29    JobTaskLock();
    2530    if (argc == 2) {
    2631      ListTaskStats (argv[N]);
     
    2833      ListTaskStats (NULL);
    2934    }     
     35    JobTaskUnlock();
    3036    return (TRUE);
    3137  }
     
    3339  if ((N = get_argument (argc, argv, "-taskstatsreset"))) {
    3440    remove_argument (N, &argc, argv);
     41    JobTaskLock();
    3542    if (argc == 2) {
    3643      ResetTaskStats (argv[N]);
     
    3845      ResetTaskStats (NULL);
    3946    }     
     47    JobTaskUnlock();
    4048    return (TRUE);
    4149  }
     
    5664    gprint (GP_LOG, " Controller is stopped\n");
    5765  }
     66
     67  JobTaskLock();
    5868  ListTasks (FALSE);
    5969  ListJobs ();
     70  JobTaskUnlock();
     71
     72  ControlLock(__func__);
     73  PrintControllerBusyJobs();
     74  ControlUnlock(__func__);
     75
    6076  return (TRUE);
    6177
  • trunk/Ohana/src/opihi/pantasks/task.c

    r14590 r23530  
    1111  if (argc != 2) goto usage;
    1212
     13  JobTaskLock();
    1314  task = FindTask (argv[1]);
    1415  if (task == NULL) { /**** new task ****/
     
    1819    SetNewTask (task);
    1920  }
     21  JobTaskUnlock();
     22
    2023  /* While a task is being defined, it is removed from the task list.  The new task is added to the task list
    2124     when the definition process is complete. 
     
    5558
    5659      case TASK_END:
    57         /* I need to add in a test here to see if all task elements
    58            have been defined.  delete the task if not */
    5960        free (input);
    6061        /* validate the new task: all mandatory elements defined? */
     
    6364          return (FALSE);
    6465        }
     66        JobTaskLock();
    6567        RegisterNewTask ();
     68        JobTaskUnlock();
    6669        return (TRUE);
    6770        break;
  • trunk/Ohana/src/opihi/pantasks/task_active.c

    r10734 r23530  
    77  if (argc != 2) goto usage;
    88
     9  JobTaskLock();
    910  task = GetNewTask ();
    1011  if (task == NULL) {
    1112    gprint (GP_ERR, "ERROR: not defining or running a task\n");
     13    JobTaskUnlock();
    1214    return (FALSE);
    1315  }
     
    1517  if (!strcasecmp (argv[1], "true")) {
    1618    task[0].active = TRUE;
     19    JobTaskUnlock();
    1720    return (TRUE);
    1821  }
    1922  if (!strcasecmp (argv[1], "false")) {
    2023    task[0].active = FALSE;
     24    JobTaskUnlock();
    2125    return (TRUE);
    2226  }
    2327
    2428  gprint (GP_ERR, "ERROR: invalid option: %s\n", argv[1]);
     29  JobTaskUnlock();
    2530  return (FALSE);
    2631
  • trunk/Ohana/src/opihi/pantasks/task_command.c

    r7917 r23530  
    1212  }
    1313
     14  JobTaskLock();
    1415  task = GetNewTask ();
    1516  if (task == NULL) {
     
    1718    if (task == NULL) {
    1819      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     20      JobTaskUnlock();
    1921      return (FALSE);
    2022    }
     
    3537    task[0].argv[i] = strcreate (argv[i+1]);
    3638  }
     39  JobTaskUnlock();
    3740  return (TRUE);
    3841}
  • trunk/Ohana/src/opihi/pantasks/task_host.c

    r7917 r23530  
    2222
    2323  task = GetNewTask ();
     24  JobTaskLock();
    2425  if (task == NULL) {
    2526    task = GetActiveTask ();
    2627    if (task == NULL) {
    2728      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     29      JobTaskUnlock();
    2830      return (FALSE);
    2931    }
     
    3436  task[0].host = NULL;
    3537
    36   if (!strcasecmp (argv[1], "LOCAL")) return (TRUE);
     38  if (!strcasecmp (argv[1], "LOCAL")) {
     39    JobTaskUnlock();
     40    return (TRUE);
     41  }
    3742
    3843  task[0].host = strcreate (argv[1]);
     44  JobTaskUnlock();
    3945  return (TRUE);
    4046}
  • trunk/Ohana/src/opihi/pantasks/task_macros.c

    r10647 r23530  
    2222  }
    2323
     24  JobTaskLock();
    2425  task = GetNewTask ();
    2526  if (task == NULL) {
    2627    gprint (GP_ERR, "ERROR: not defining or running a task\n");
     28    JobTaskUnlock();
    2729    return (FALSE);
    2830  }
     
    135137        free (input);
    136138        REALLOCATE (macro[0].line, char *, MAX (1, macro[0].Nlines));
     139        JobTaskUnlock();
    137140        return (TRUE);
    138141      }
     
    148151    }
    149152  }
     153  JobTaskUnlock();
    150154  return (TRUE);
    151155}
  • trunk/Ohana/src/opihi/pantasks/task_nmax.c

    r11055 r23530  
    77  if (argc != 2) goto usage;
    88
     9  JobTaskLock();
    910  task = GetNewTask ();
    1011  if (task == NULL) {
    1112    gprint (GP_ERR, "ERROR: not defining or running a task\n");
     13    JobTaskUnlock();
    1214    return (FALSE);
    1315  }
    1416
    1517  task[0].Nmax = atoi (argv[1]);
     18  JobTaskUnlock();
    1619  return (TRUE);
    1720
     
    2730  if (argc != 2) goto usage;
    2831
     32  JobTaskLock();
    2933  task = GetNewTask ();
    3034  if (task == NULL) {
    3135    gprint (GP_ERR, "ERROR: not defining or running a task\n");
     36    JobTaskUnlock();
    3237    return (FALSE);
    3338  }
    3439
    3540  task[0].NpendingMax = atoi (argv[1]);
     41  JobTaskUnlock();
    3642  return (TRUE);
    3743
  • trunk/Ohana/src/opihi/pantasks/task_options.c

    r8129 r23530  
    1212  }
    1313
     14  JobTaskLock();
    1415  task = GetNewTask ();
    1516  if (task == NULL) {
     
    1718    if (task == NULL) {
    1819      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     20      JobTaskUnlock();
    1921      return (FALSE);
    2022    }
     
    3537    task[0].optv[i] = strcreate (argv[i+1]);
    3638  }
     39  JobTaskUnlock();
    3740  return (TRUE);
    3841}
  • trunk/Ohana/src/opihi/pantasks/task_periods.c

    r7917 r23530  
    4040  }
    4141
     42  JobTaskLock();
    4243  task = GetNewTask ();
    4344  if (task == NULL) {
     
    4546    if (task == NULL) {
    4647      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     48      JobTaskUnlock();
    4749      return (FALSE);
    4850    }
     
    5355  if (Timeout) task[0].timeout_period = TimeoutValue;
    5456
     57  JobTaskUnlock();
    5558  return (TRUE);
    5659}
  • trunk/Ohana/src/opihi/pantasks/task_stdout.c

    r12332 r23530  
    1111  }
    1212
     13  JobTaskLock();
    1314  task = GetNewTask ();
    1415  if (task == NULL) {
     
    1617    if (task == NULL) {
    1718      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     19      JobTaskUnlock();
    1820      return (FALSE);
    1921    }
     
    2123  if (task[0].stdout_dump != NULL) free (task[0].stdout_dump);
    2224  task[0].stdout_dump = strcreate (argv[1]);
     25  JobTaskUnlock();
    2326  return (TRUE);
    2427}
     
    3437  }
    3538
     39  JobTaskLock();
    3640  task = GetNewTask ();
    3741  if (task == NULL) {
     
    3943    if (task == NULL) {
    4044      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     45      JobTaskUnlock();
    4146      return (FALSE);
    4247    }
     
    4449  if (task[0].stderr_dump != NULL) free (task[0].stderr_dump);
    4550  task[0].stderr_dump = strcreate (argv[1]);
     51  JobTaskUnlock();
    4652  return (TRUE);
    4753}
  • trunk/Ohana/src/opihi/pantasks/task_trange.c

    r15487 r23530  
    1212    if (argc != 1) goto usage;
    1313
     14    JobTaskLock();
    1415    task = GetNewTask ();
    1516    if (task == NULL) {
    1617      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     18      JobTaskUnlock();
    1719      return (FALSE);
    1820    }
     
    2022    task[0].Nranges = 0;
    2123    REALLOCATE (task[0].ranges, TimeRange, 1);
     24    JobTaskUnlock();
    2225    return (TRUE);
    2326  }
     
    4043
    4144  if (argc != 3) goto usage;
    42 
    43   task = GetNewTask ();
    44   if (task == NULL) {
    45     gprint (GP_ERR, "ERROR: not defining or running a task\n");
    46     return (FALSE);
    47   }
    4845
    4946  /* test for Mon[@HH:MM:SS] - both must match */
     
    8380
    8481valid:
     82  JobTaskLock();
     83  task = GetNewTask ();
     84  if (task == NULL) {
     85    gprint (GP_ERR, "ERROR: not defining or running a task\n");
     86    JobTaskUnlock();
     87    return (FALSE);
     88  }
     89
    8590  N = task[0].Nranges;
    8691  task[0].Nranges ++;
     
    8893 
    8994  task[0].ranges[N] = range;
     95  JobTaskUnlock();
    9096  return (TRUE);
    9197
  • trunk/Ohana/src/opihi/pantasks/thread_locks.c

    r11084 r23530  
    11# include "pantasks.h"
    22
    3 /* this mutex is used by the serialized threads */
     3/* mutex to lock Client table operations */
     4static pthread_mutex_t ClientMutex = PTHREAD_MUTEX_INITIALIZER;
    45
    5 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    6 
    7 void SerialThreadLock () {
    8     pthread_mutex_lock (&mutex);
     6void ClientLock () {
     7  pthread_mutex_lock (&ClientMutex);
    98}
    109
    11 void SerialThreadUnlock () {
    12     pthread_mutex_unlock (&mutex);
     10void ClientUnlock () {
     11  pthread_mutex_unlock (&ClientMutex);
    1312}
    1413
     14/* mutex to lock Command / Multicommand operations */
     15static pthread_mutex_t CommandMutex = PTHREAD_MUTEX_INITIALIZER;
     16
     17void CommandLock () {
     18  //  fprintf (stderr, "command lock\n");
     19  pthread_mutex_lock (&CommandMutex);
     20}
     21
     22void CommandUnlock () {
     23  //  fprintf (stderr, "command unlock\n");
     24  pthread_mutex_unlock (&CommandMutex);
     25}
     26
     27/* mutex to lock Control operations */
     28static pthread_mutex_t ControlMutex = PTHREAD_MUTEX_INITIALIZER;
     29
     30void ControlLock (char *func) {
     31  // fprintf (stderr, "control lock %s\n", func);
     32  pthread_mutex_lock (&ControlMutex);
     33}
     34
     35void ControlUnlock (char *func) {
     36  // fprintf (stderr, "control unlock %s\n", func);
     37  pthread_mutex_unlock (&ControlMutex);
     38}
     39
     40/* mutex to lock Job / Task operations */
     41static pthread_mutex_t JobTaskMutex = PTHREAD_MUTEX_INITIALIZER;
     42
     43void JobTaskLock () {
     44  //  fprintf (stderr, "jobtask lock\n");
     45  pthread_mutex_lock (&JobTaskMutex);
     46}
     47
     48void JobTaskUnlock () {
     49  //  fprintf (stderr, "jobtask unlock\n");
     50  pthread_mutex_unlock (&JobTaskMutex);
     51}
     52
Note: See TracChangeset for help on using the changeset viewer.