IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Changeset 23484


Ignore:
Timestamp:
Mar 22, 2009, 9:39:06 PM (17 years ago)
Author:
eugene
Message:

major re-work of the threading / thread-locking design:

1) drop the input thread
2) merge job and task threads
3) define locks for:

  • Command : ops that use the opihi command parser and related
  • Client : ops to manage client connections
  • Control : ops that manipulate the controller elements
  • JobTask : ops that manipulate the Job and/or Task lists

The old threading / locking effectively serialized all operations,
except for client command; however the client and other ops that used
the opihi parse could collide. the new model allows for more parallel
operations, and prevents these thread conflicts.

Location:
branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks
Files:
1 added
38 edited

Legend:

Unmodified
Added
Removed
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/CheckController.c

    r14590 r23484  
    11# include "pantasks.h"
    22
    3 static struct timeval start;
    4 void TimerMark ();
    5 float TimerElapsed (int reset);
     3void TimerMark (struct timeval *start);
     4float TimerElapsed (struct timeval *start, int reset);
    65
    76int CheckController () {
     
    1110  Job *job;
    1211  IOBuffer buffer;
     12  struct timeval start;
    1313
    1414  /* get the list of completed jobs (exit / crash), update the job status */
     
    1717  /*** check EXIT jobs ***/
    1818  InitIOBuffer (&buffer, 0x100);
    19   // TimerMark ();
    20   // status = ControllerCommand ("stop", CONTROLLER_PROMPT, &buffer);
    21   // if (VerboseMode()) gprint (GP_ERR, "stop controller %f\n", TimerElapsed(TRUE));
    2219
    23   TimerMark ();
     20  TimerMark (&start);
    2421  FlushIOBuffer (&buffer);
     22
    2523  status = ControllerCommand ("jobstack exit", CONTROLLER_PROMPT, &buffer);
    26   if (VerboseMode()) gprint (GP_ERR, "check exit stack %f\n", TimerElapsed(TRUE));
     24  if (VerboseMode()) gprint (GP_ERR, "check exit stack %f\n", TimerElapsed(&start, TRUE));
    2725  if (!status) goto escape;
    2826
     
    3432  status = sscanf (buffer.buffer, "%*s %d", &Njobs);
    3533  if (status != 1) goto escape;
    36   if (VerboseMode()) gprint (GP_ERR, "parse %d jobs on stack %f\n", Njobs, TimerElapsed(TRUE));
     34  if (VerboseMode()) gprint (GP_ERR, "parse %d jobs on stack %f\n", Njobs, TimerElapsed(&start, TRUE));
    3735
    3836  p = buffer.buffer;
     
    4644    status = sscanf (p, "%d", &JobID);
    4745
     46    // the operations within this locked block only interact with the controller or
     47    // modify the properties of the selected job
     48    JobTaskLock();
    4849    job = FindControllerJob (JobID);
    4950    if (job == NULL) {
    5051      gprint (GP_ERR, "misplaced job? %d not in EXIT job list\n", JobID);
     52      JobTaskUnlock();
    5153      continue;
    5254    }
    5355    /* this checks the individual job status, grabs stdout/stderr */
    5456    CheckControllerJob (job);
     57    JobTaskUnlock();
    5558  }
    56   if (VerboseMode()) gprint (GP_ERR, "clear %d exit jobs %f\n", i, TimerElapsed(TRUE));
     59  if (VerboseMode()) gprint (GP_ERR, "clear %d exit jobs %f\n", i, TimerElapsed(&start, TRUE));
    5760
    5861  /*** check CRASH jobs ***/
     
    6770  status = sscanf (buffer.buffer, "%*s %d", &Njobs);
    6871  if (status != 1) goto escape;
    69   if (VerboseMode()) gprint (GP_ERR, "check crash stack %f\n", TimerElapsed(TRUE));
     72  if (VerboseMode()) gprint (GP_ERR, "check crash stack %f\n", TimerElapsed(&start, TRUE));
    7073
    7174  p = buffer.buffer;
     
    7780    }
    7881    p = q + 1;
    79    
    8082    status = sscanf (p, "%d", &JobID);
     83
     84    // the operations within this locked block only interact with the controller or
     85    // modify the properties of the selected job
     86    JobTaskLock();
    8187    job = FindControllerJob (JobID);
    8288    if (job == NULL) {
    8389      gprint (GP_ERR, "misplaced job? %d not in CRASH job list\n", JobID);
     90      JobTaskUnlock();
    8491      continue;
    8592    }
    8693    /* this checks the individual job status, grabs stdout/stderr */
    8794    CheckControllerJob (job);
     95    JobTaskUnlock();
    8896  }
    89   if (VerboseMode()) gprint (GP_ERR, "clear %d crash jobs %f\n", i, TimerElapsed(TRUE));
     97  if (VerboseMode()) gprint (GP_ERR, "clear %d crash jobs %f\n", i, TimerElapsed(&start, TRUE));
    9098
    9199  FlushIOBuffer (&buffer);
    92   // status = ControllerCommand ("run", CONTROLLER_PROMPT, &buffer);
    93100  FreeIOBuffer (&buffer);
    94101  return (TRUE);
     
    96103 escape:
    97104  FlushIOBuffer (&buffer);
    98   // status = ControllerCommand ("run", CONTROLLER_PROMPT, &buffer);
    99105  FreeIOBuffer (&buffer);
    100106  return (FALSE);
    101107}
    102108
    103 void TimerMark () {
    104     gettimeofday (&start, (void *) NULL);
     109void TimerMark (struct timeval *start) {
     110    gettimeofday (start, (void *) NULL);
    105111}
    106112
    107 float TimerElapsed (int reset) {
     113float TimerElapsed (struct timeval *start, int reset) {
    108114
    109115  float dtime;
     
    111117
    112118  gettimeofday (&stop, (void *) NULL);
    113   dtime = DTIME (stop, start);
    114   if (reset) gettimeofday (&start, (void *) NULL);
     119  dtime = DTIME (stop, start[0]);
     120  if (reset) gettimeofday (start, (void *) NULL);
    115121  return (dtime);
    116122}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/CheckJobs.c

    r15871 r23484  
    1818  next_timeout = 1.0;
    1919
     20  JobTaskLock();
    2021  /** test all jobs: ready to test?  finished? **/
    2122  while ((job = NextJob ()) != NULL) {
     
    7778        /* set taskarg variables */
    7879        for (i = 0; i < job[0].argc; i++) {
    79             sprintf (varname, "taskarg:%d", i);
    80             set_str_variable (varname, job[0].argv[i]);
     80          sprintf (varname, "taskarg:%d", i);
     81          set_str_variable (varname, job[0].argv[i]);
    8182        }
    8283        set_int_variable ("taskarg:n", job[0].argc);
     
    8485        /* set options variables */
    8586        for (i = 0; i < job[0].optc; i++) {
    86             sprintf (varname, "options:%d", i);
    87             set_str_variable (varname, job[0].optv[i]);
     87          sprintf (varname, "options:%d", i);
     88          set_str_variable (varname, job[0].optv[i]);
    8889        }
    8990        set_int_variable ("options:n", job[0].optc);
     
    109110          if (VerboseMode()) gprint (GP_LOG, "job %s (%d) crash\n", task[0].name, job[0].JobID);
    110111          if (task[0].crash != NULL) {
     112            // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread
     113            JobTaskUnlock();
     114            CommandLock();
    111115            exec_loop (task[0].crash);
     116            CommandUnlock();
     117            JobTaskLock();
    112118          }
    113119        }
     
    133139            }
    134140          }
    135           if (macro != NULL) exec_loop (macro);
     141          if (macro != NULL) {
     142            // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread
     143            JobTaskUnlock();
     144            CommandLock();
     145            exec_loop (macro);
     146            CommandUnlock();
     147            JobTaskLock();
     148          }
    136149        }
    137150
     
    156169    /* check for timeout - (local jobs only)
    157170       we only check timeout after a poll (forces at least one poll)
    158      */
     171    */
    159172    if (job[0].mode == JOB_LOCAL) {
    160173      if (GetTaskTimer(job[0].start, FALSE) < task[0].timeout_period) {
     
    179192      /* set taskarg variables */
    180193      for (i = 0; i < job[0].argc; i++) {
    181           sprintf (varname, "taskarg:%d", i);
    182           set_str_variable (varname, job[0].argv[i]);
     194        sprintf (varname, "taskarg:%d", i);
     195        set_str_variable (varname, job[0].argv[i]);
    183196      }
    184197      set_int_variable ("taskarg:n", job[0].argc);
     
    193206      /* run task[0].timeout macro, if it exists */
    194207      if (task[0].timeout != NULL) {
     208        // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread
     209        JobTaskUnlock();
     210        CommandLock();
    195211        exec_loop (task[0].timeout);
     212        CommandUnlock();
     213        JobTaskLock();
    196214      }
    197215
     
    205223  }
    206224  // fprintf (stderr, "check %d jobs\n", Ncheck);
     225  JobTaskUnlock();
    207226  return (next_timeout);
    208227}
     
    210229/*
    211230
    212   job / task timeline:
    213 
    214   task:
    215   0           exec     
    216   start       create
    217   task clock  new job
    218 
    219   job:
    220   0           1xpoll     2xpoll     3xpoll
    221   start       check      check      check
    222   job clock   status     status     status
    223 
    224   .           .          .          timeout
    225                                     run
    226                                     timeout
    227 
    228   must be at least one poll before timeout
    229   (timeout >= poll)
     231   job / task timeline:
     232
     233   task:
     234   0           exec     
     235   start       create
     236   task clock  new job
     237
     238   job:
     239   0           1xpoll     2xpoll     3xpoll
     240   start       check      check      check
     241   job clock   status     status     status
     242
     243   .           .          .          timeout
     244   run
     245   timeout
     246
     247   must be at least one poll before timeout
     248   (timeout >= poll)
    230249*/
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/CheckPassword.c

    r8548 r23484  
    22# define DEBUG 0
    33
     4// this static var is only used by InitPassword and CheckPassword below.
     5// Both functions are only called by the main thread.
    46static char PASSWORD[256];
    57
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/CheckTasks.c

    r23329 r23484  
    1212  next_timeout = 1.0;
    1313
     14  JobTaskLock();
    1415  /** test all tasks: ready to test? ready to run? **/
    1516  while ((task = NextTask ()) != NULL) {
     
    5455    /* ready to run? : run task.exec macro */
    5556    if (task[0].exec != NULL) {
     57      // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread
     58      JobTaskUnlock();
     59      CommandLock();
    5660      status = exec_loop (task[0].exec);
     61      CommandUnlock();
     62      JobTaskLock();
    5763      if (!status) {
    5864        continue;
     
    94100    BumpTimeRanges (task[0].ranges, task[0].Nranges);
    95101  }
     102  JobTaskUnlock();
    96103  return (next_timeout);
    97104}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/ControllerOps.c

    r20032 r23484  
    1212
    1313/* local static variables to track the controller host properties */
     14/* these are used by AddHost and DeleteHost, and are only called by controller_host.c (clientThread) */
     15/* or by RestartController : lock between these two? */
    1416static Host *hosts = NULL;
    1517static int Nhosts = 0;
     
    3133}
    3234
     35// XXX possible race condition problem: if we delete a host while the controller
     36// is being restarted.  to fix this, we need to keep the deletion in controller_thread,
     37// but perhaps mark it in the client_thread?
    3338int DeleteHost (char *hostname) {
    3439
     
    261266  }
    262267
     268  // This function is called by the JobTaskThread via SubmitJob.  We need to unlock the
     269  // JobTaskLock to avoid a dead lock with the JobTaskLock called in CheckController
     270  JobTaskUnlock();
     271  ControlLock(__func__);
    263272  InitIOBuffer (&buffer, 0x100);
    264273  status = ControllerCommand (cmd, CONTROLLER_PROMPT, &buffer);
    265274  free (cmd);
     275  ControlUnlock(__func__);
     276  JobTaskLock();
     277
    266278
    267279  /* extract the job PID from the controller response */
     
    411423  if ((status == -1) && (errno == EPIPE)) {
    412424    StopController ();
    413     gprint (GP_ERR, "controller is down (pipe closed), restarting\n");
     425    fprintf (stderr, "controller is down (pipe closed), restarting\n");
    414426    if (!RestartController ()) {
    415427      return (FALSE);
     
    434446    if (status ==  0) {
    435447      StopController ();
    436       gprint (GP_ERR, "controller is down (EOF), restarting\n");
     448      fprintf (stderr, "controller is down (EOF), restarting\n");
    437449      if (!RestartController ()) {
    438450        return (FALSE);
     
    441453    }
    442454    if (status == -1) {
    443       gprint (GP_ERR, "controller is not responding (%d tries)\n", j);
     455      fprintf (stderr, "controller is not responding (%d tries)\n", j);
    444456      gwrite (buffer[0].buffer, 1, buffer[0].Nbuffer, GP_ERR);
    445457    }
    446458  }
    447459  if (status == -1) {
    448     gprint (GP_ERR, "controller still not responding, giving up\n");
     460    fprintf (stderr, "controller still not responding, giving up\n");
    449461    return (FALSE);
    450462  }
     
    456468    bzero (buffer[0].buffer + buffer[0].Nbuffer, buffer[0].Nalloc - buffer[0].Nbuffer);
    457469  }
    458   /* if (VerboseMode()) gprint (GP_ERR, "message received, %d cycles\n", i); */
     470  if (VerboseMode()) fprintf (stderr, "message received, %d cycles\n", i);
    459471  return (TRUE);
    460472}
     
    492504}
    493505
     506void PrintControllerBusyJobs () {
     507
     508  int status;
     509  char command[1024];
     510  IOBuffer buffer;
     511
     512  /* check if controller is running */
     513  status = CheckControllerStatus ();
     514  if (!status) {
     515    return;
     516  }
     517
     518  sprintf (command, "jobstack busy");
     519  InitIOBuffer (&buffer, 0x100);
     520
     521  status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
     522
     523  if (status) {
     524    gprint (GP_LOG, " jobs currently running remotely:\n");
     525    gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG);
     526  } else {
     527    gprint (GP_LOG, "controller is not responding\n");
     528  }
     529  FreeIOBuffer (&buffer);
     530  return;
     531}
     532
    494533int PrintControllerOutput () {
    495534
     
    581620  InitIOBuffer (&buffer, 0x100);
    582621
    583   // XXX lock the host table? SerialThreadLock ();
     622  // XXX lock the host table? no: that would risk a dead lock between client and controller threads:
    584623  gprint (GP_ERR, "pcontrol restarted, reloading hosts\n");
    585624  for (i = 0; i < Nhosts; i++) {
     
    588627    status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    589628  }
    590   // SerialThreadUnlock ();
    591629
    592630  if (status) gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG);
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/InputQueue.c

    r16449 r23484  
    11# include "pantasks.h"
     2// we use fprintf for DEBUG statements to avoid deadlocking issues
    23
    34static int Ninputs = 0;
     
    2324void AddNewInput (char *input) {
    2425
     26  // XXX define the InputMutex
    2527  SerialThreadLock ();
    26   if (DEBUG) gprint (GP_LOG, "adding a new input (%s)\n", input);
     28
     29  if (DEBUG) fprintf (stderr, "adding a new input (%s)\n", input);
    2730  inputs[Ninputs] = input;
    2831  Ninputs ++;
     
    3134    REALLOCATE (inputs, char *, NINPUTS);
    3235  }
    33   if (DEBUG) gprint (GP_LOG, "done new input (%s)\n", input);
     36  if (DEBUG) fprintf (stderr, "done new input (%s)\n", input);
    3437  SerialThreadUnlock ();
    3538}
     
    4043  int i, j;
    4144
    42   if (DEBUG) gprint (GP_LOG, "deleting an input (%s)\n", input);
     45  if (DEBUG) fprintf (stderr, "deleting an input (%s)\n", input);
     46
     47  // XXX lock here
    4348  for (i = 0; i < Ninputs; i++) {
    4449    if (inputs[i] == input) {
     
    5257        REALLOCATE (inputs, char *, NINPUTS);
    5358      }
    54       if (DEBUG) gprint (GP_LOG, "deleted an input\n");
     59      // XXX unlock here
     60      if (DEBUG) fprintf (stderr, "deleted an input\n");
    5561      return TRUE;
    5662    }
    5763  }
    5864  // did not find the input
     65  // XXX unlock here
    5966  return FALSE;
    6067}
     
    6673  char *input, *line, *outline, tmp;
    6774
    68   if (Ninputs < 1) return;
     75  // XXX lock here
     76  if (Ninputs < 1) {
     77    // XXX unlock here
     78    return;
     79  }
    6980
    7081  input = inputs[0];
    71   if (DEBUG) gprint (GP_LOG, "got an input (%s)\n", input);
     82  if (DEBUG) fprintf (stderr, "got an input (%s)\n", input);
    7283 
    7384  Nbytes = snprintf (&tmp, 0, "input %s", input);
    7485  ALLOCATE (line, char, Nbytes + 1);
    7586  snprintf (line, Nbytes + 1, "input %s", input);
     87  // XXX unlock here
    7688
    7789  status = command (line, &outline, TRUE);
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/JobOps.c

    r18160 r23484  
    8888  }
    8989
    90   /* check if controller is running */
    91   status = CheckControllerStatus ();
    92   if (!status) {
    93     return;
    94   }
    95 
    96   sprintf (command, "jobstack busy");
    97   InitIOBuffer (&buffer, 0x100);
    98 
    99   SerialThreadLock ();
    100   status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    101   SerialThreadUnlock ();
    102 
    103   if (status) {
    104     gprint (GP_LOG, " jobs currently running remotely:\n");
    105     gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG);
    106   } else {
    107     gprint (GP_LOG, "controller is not responding\n");
    108   }
    109   FreeIOBuffer (&buffer);
    11090  return;
    11191}
     
    285265  if (job[0].mode == JOB_LOCAL) {
    286266    CheckLocalJob (job);
    287   } else {
    288     /* controller jobs are now checked en masse by CheckController */
    289     /* CheckControllerJob (job); */
     267    /* controller jobs are checked en masse by CheckController */
    290268  }
    291269  return (job[0].state);
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/ListenClients.c

    r16905 r23484  
    22# define DEBUG 0
    33
     4// XXX make the calling functions thread-safe
    45static int NCLIENTS;
    56static int Nclients;
     
    910void InitClients () {
    1011
     12  ClientLock();
    1113  Nclients = 0;
    1214  NCLIENTS = 10;
    1315  ALLOCATE (clients, int, NCLIENTS);
    1416  ALLOCATE (buffers, IOBuffer *, NCLIENTS);
     17  ClientUnlock();
    1518}
    1619
     
    1821void AddNewClient (int client) {
    1922
    20   if (DEBUG) gprint (GP_LOG, "adding a new client (%d)\n", client);
     23  ClientLock();
     24  if (DEBUG) fprintf (stderr, "adding a new client (%d)\n", client);
    2125  clients[Nclients] = client;
    2226  ALLOCATE (buffers[Nclients], IOBuffer, 1);
     
    2832    REALLOCATE (buffers, IOBuffer *, NCLIENTS);
    2933  }
     34  ClientUnlock();
    3035}
    3136
     
    3540  int i, j;
    3641
    37   if (DEBUG) gprint (GP_LOG, "deleting a client (%d)\n", client);
     42  ClientLock();
     43  if (DEBUG) fprintf (stderr, "deleting a client (%d)\n", client);
    3844  for (i = 0; i < Nclients; i++) {
    3945    if (clients[i] == client) {
     
    5157        REALLOCATE (buffers, IOBuffer *, NCLIENTS);
    5258      }
     59      ClientUnlock();
    5360      return TRUE;
    5461    }
    5562  }
    5663  // did not find the client
     64  ClientUnlock();
    5765  return FALSE;
    5866}
     
    7078  gprintInit ();  // each thread needs to init the printing system
    7179
    72   // define server output log files
    73   if (VarConfig ("PANTASKS_SERVER_STDOUT", "%s", log_stdout) != NULL) {
    74     gprintSetFileThisThread (GP_LOG, log_stdout);
    75   } else {
    76     strcpy (log_stdout, "stdout");
    77   }
    78   if (VarConfig ("PANTASKS_SERVER_STDERR", "%s", log_stderr) != NULL) {
    79     gprintSetFileThisThread (GP_ERR, log_stderr);
    80   } else {
    81     strcpy (log_stderr, "stderr");
    82   }
     80  /* set buffers for the output for this client */
     81  gprintSetBuffer (GP_LOG);
     82  gprintSetBuffer (GP_ERR);
    8383
    8484  while (1) {
     
    9090
    9191    /* place all of the clients in the fdSet */
    92     Ncurrent = Nclients;
    9392    Nmax = 0;
    9493    FD_ZERO (&fdSet);
     94    ClientLock();
     95    Ncurrent = Nclients;
     96    ClientUnlock();
    9597    for (i = 0; i < Ncurrent; i++) {
    9698      Nmax = MAX (Nmax, clients[i]);
     
    100102
    101103    /* block until we have some data on the pipes (or timeout) */
    102     if (DEBUG) gprint (GP_ERR, "listening to %d clients\n", Ncurrent);
     104    if (DEBUG) fprintf (stderr, "listening to %d clients\n", Ncurrent);
    103105    status = select (Nmax, &fdSet, NULL, NULL, &timeout);
    104106    if (status == -1) {
     
    122124      if ((Nread == 0) || (Nread == -2)) {
    123125        /* error: do something */
    124         if (DEBUG && (Nread == 0)) gprint (GP_ERR, "socket is closed\n");
    125         if (DEBUG && (Nread == -2)) gprint (GP_ERR, "error reading from socket\n");
     126        if (DEBUG && (Nread == 0)) fprintf (stderr, "socket is closed\n");
     127        if (DEBUG && (Nread == -2)) fprintf (stderr, "error reading from socket\n");
    126128        DeleteClient (clients[i]);
    127         continue;
     129        break;  // the other thread could also have modified the list; restart with new Ncurrent
    128130      }
    129131
    130       if (DEBUG) gprint (GP_ERR, "read %d total bytes\n", buffers[i][0].Nbuffer);
     132      if (DEBUG) fprintf (stderr, "read %d total bytes\n", buffers[i][0].Nbuffer);
    131133
    132134      /* see if we have a complete message waiting; if not, keep waiting for messages */
     
    141143      if (*line) {
    142144
    143         /* set buffers for the output for this client */
    144         gprintSetBuffer (GP_LOG);
    145         gprintSetBuffer (GP_ERR);
    146 
    147145        /* run the command, return the exit status */
     146        CommandLock();
    148147        status = multicommand (line);
     148        CommandUnlock();
    149149        SendMessage (clients[i], "STATUS %d", status);
    150150
     
    156156          SendMessageFixed (clients[i], 0, "");
    157157        }         
     158        FlushIOBuffer (outbuffer);
    158159
    159160        // return the stdout messages first
     
    164165          SendMessageFixed (clients[i], 0, "");
    165166        }         
    166        
    167         /* clear and reset the output buffers to their last output file names */
    168         gprintSetFileAllThreads (GP_LOG, NULL);
    169         gprintSetFileAllThreads (GP_ERR, NULL);
     167        FlushIOBuffer (outbuffer);
    170168      }
    171169      free (line);
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/Makefile

    r18085 r23484  
    3434$(SRC)/pantasks.$(ARCH).o \
    3535$(SRC)/thread_locks.$(ARCH).o \
    36 $(SRC)/job_threads.$(ARCH).o \
    37 $(SRC)/task_threads.$(ARCH).o \
     36$(SRC)/jobs_and_tasks_thread.$(ARCH).o \
    3837$(SRC)/controller_threads.$(ARCH).o
    3938
     
    4140$(SRC)/pantasks_server.$(ARCH).o \
    4241$(SRC)/server_run.$(ARCH).o \
    43 $(SRC)/server_load.$(ARCH).o \
    44 $(SRC)/InputQueue.$(ARCH).o \
    4542$(SRC)/ListenClients.$(ARCH).o \
    4643$(SRC)/server.$(ARCH).o \
     
    4946$(SRC)/CheckPassword.$(ARCH).o \
    5047$(SRC)/thread_locks.$(ARCH).o \
    51 $(SRC)/job_threads.$(ARCH).o \
    52 $(SRC)/task_threads.$(ARCH).o \
    53 $(SRC)/controller_threads.$(ARCH).o \
    54 $(SRC)/input_threads.$(ARCH).o
     48$(SRC)/jobs_and_tasks_thread.$(ARCH).o \
     49$(SRC)/controller_threads.$(ARCH).o
    5550
    5651funcs = \
     
    6661
    6762cmds = \
    68 $(SRC)/pulse.$(ARCH).o \
    6963$(SRC)/status.$(ARCH).o \
    7064$(SRC)/flush.$(ARCH).o \
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/controller.c

    r18085 r23484  
    5050  }
    5151
     52  ControlLock(__func__);
    5253  status = (*func)(argc - 1, argv + 1);
     54  ControlUnlock(__func__);
    5355  return (status);
    5456}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/controller_host.c

    r19125 r23484  
    1515
    1616  if (argc != 3) goto usage;
     17  if (max_threads && strcasecmp (argv[1], "ADD")) goto usage;
    1718
    1819  /* start controller connection (if needed) */
     
    2728  if (!strcasecmp (argv[1], "ADD")) {
    2829    AddHost (argv[2], max_threads);
    29   } else {
    30     if (max_threads) goto usage;
    31   }
     30  }
    3231
    3332  if (!strcasecmp (argv[1], "DELETE")) {
     
    4241  InitIOBuffer (&buffer, 0x100);
    4342
    44   SerialThreadLock ();
    4543  status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    46   SerialThreadUnlock ();
    4744
    4845  if (status) gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG);
     
    5754  return (FALSE);
    5855}
    59 
    60 /* should I keep an internal host table so I can reload the
    61    hosts if the controller exits?
    62 
    63    alternatively, that could be a user-level choice
    64 */
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/controller_jobstack.c

    r18085 r23484  
    2424  InitIOBuffer (&buffer, 0x100);
    2525
    26   SerialThreadLock ();
    2726  status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    28   SerialThreadUnlock ();
    2927
    3028  if (status) {
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/controller_status.c

    r8548 r23484  
    2323  InitIOBuffer (&buffer, 0x100);
    2424
    25   SerialThreadLock ();
    2625  status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    27   SerialThreadUnlock ();
    2826
    2927  if (status) {
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/controller_threads.c

    r15791 r23484  
    3434
    3535    // one run of the task checker
    36     SerialThreadLock ();
     36    ControlLock(__func__);
    3737    CheckController ();
     38    ControlUnlock(__func__);
     39
     40    ControlLock(__func__);
    3841    CheckControllerOutput ();
    39     SerialThreadUnlock ();
     42    ControlUnlock(__func__);
     43
    4044    if (VerboseMode() == 2) fprintf (stderr, "C");
    4145    // fprintf (stderr, "**** C ****");
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/controller_verbose.c

    r18085 r23484  
    2424  InitIOBuffer (&buffer, 0x100);
    2525
    26   SerialThreadLock ();
    2726  status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);
    28   SerialThreadUnlock ();
    2927
    3028  if (status) {
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/delete.c

    r13542 r23484  
    1111    JobID = atoi (argv[2]);
    1212
     13    JobTaskLock();
    1314    job = FindJob (JobID);
    1415    if (job == NULL) {
    1516      gprint (GP_LOG, "job not found\n");
     17      JobTaskUnlock();
    1618      return (TRUE);
    1719    }
     
    2123        job[0].state = JOB_HUNG;
    2224        if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid);
     25        JobTaskUnlock();
    2326        return (FALSE);
    2427      }
     
    2730        job[0].state = JOB_HUNG;
    2831        if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid);
     32        JobTaskUnlock();
    2933        return (FALSE);
    3034      }
     
    3236    DeleteJob (job);
    3337    gprint (GP_LOG, "job removed\n");
     38    JobTaskUnlock();
    3439    return (TRUE);
    3540  }
     
    4146
    4247usage:
    43     gprint (GP_ERR, "USAGE: delete job (JobID)\n");
    44     gprint (GP_ERR, "USAGE: delete task (TaskName)\n");
    45     return (FALSE);
     48  gprint (GP_ERR, "USAGE: delete job (JobID)\n");
     49  gprint (GP_ERR, "USAGE: delete task (TaskName)\n");
     50  return (FALSE);
    4651}
    4752
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/flush.c

    r14590 r23484  
    66
    77  if (!strcasecmp (argv[1], "jobs")) {
     8    JobTaskLock();
    89    FlushJobs ();
     10    JobTaskUnlock();
    911    return (TRUE);
    1012  }
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/init.c

    r16453 r23484  
    1818int halt            PROTO((int, char **));
    1919int flush_jobs      PROTO((int, char **));
    20 int pulse           PROTO((int, char **));
    2120int showtask        PROTO((int, char **));
    2221int status_sys      PROTO((int, char **));
     
    4039  {1, "options",    task_options,  "define optional variables associated with the job task"},
    4140  {1, "periods",    task_periods,  "define time scales for a task"},
    42   {1, "pulse",      pulse,         "set the scheduler update period"},
    4341  {1, "run",        run,           "run the scheduler"},
    4442  {1, "flush",      flush_jobs,    "flush all jobs from the queue"},
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/init_server.c

    r16903 r23484  
    1414int task_stdout     PROTO((int, char **));
    1515int task_stderr     PROTO((int, char **));
    16 int pulse           PROTO((int, char **));
    1716int flush_jobs      PROTO((int, char **));
    1817int status_server   PROTO((int, char **));
     
    4140  {1, "npending",   task_npending, "define maximum number of outstanding jobs for a task"},
    4241  {1, "periods",    task_periods,  "define time scales for a task"},
    43   {1, "pulse",      pulse,         "set the scheduler update period"},
    4442  {1, "flush",      flush_jobs,    "flush all jobs from the queue"},
    4543  {1, "server",     server,        "server-specific commands"},
     
    6765  InitJobs ();
    6866  InitJobIDs ();
    69   InitInputs ();
    7067
    7168  for (i = 0; i < sizeof (cmds) / sizeof (Command); i++) {
     
    7875  FreeJobs ();
    7976  FreeJobIDs ();
    80   FreeInputs ();
    8177}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/kill.c

    r13542 r23484  
    1212  JobID = atoi (argv[1]);
    1313
     14  JobTaskLock();
    1415  job = FindJob (JobID);
    1516  if (job == NULL) {
    1617    gprint (GP_LOG, "job not found\n");
     18    JobTaskUnlock();
    1719    return (TRUE);
    1820  }
     
    2224      job[0].state = JOB_HUNG;
    2325      if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid);
     26      JobTaskUnlock();
    2427      return (FALSE);
    2528    }
     
    2831      job[0].state = JOB_HUNG;
    2932      if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid);
     33      JobTaskUnlock();
    3034      return (FALSE);
    3135    }
     
    3337  DeleteJob (job);
    3438  gprint (GP_LOG, "job removed\n");
     39  JobTaskUnlock();
    3540  return (TRUE);
    3641}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/notes.txt

    r7892 r23484  
     1
     2stdout_cntl:
     3  GetJobOutout
     4    CheckControllerJob
     5      CheckController
     6  StartController
     7  ControllerCommand
     8    CheckController
     9    controller_check
     10    controller_host
     11    controller_jobstack
     12    controller_run
     13    controller_status
     14    controller_verbose
     15    DeleteControllerJob
     16    CheckControllerJobStatus
     17    SubmitControllerJob
     18    PrintControllerBusyJobs
     19    KillControllerJob
     20    QuitController
     21    RestartController   
     22  CheckControllerOutput
     23    controller_output
     24    controller_threads
     25
     26
     27
     28
     29
     30
     31
     3220090322
     33
     34  We've been surviving with a slightly broken threading / locking
     35  model.  I would like to clean it up.  Previously, the threads where
     36  blocking at a very coarse level, and there were certain interactions
     37  that were not thread-safe.  Here are my notes on re-working this:
     38
     39  * we have 6 threads:
     40
     41    * main (top-level parent) : after it spawns the 5 child threads,
     42      this thread waits for new clients and adds them to the list of
     43      active clients with 'AddNewClient'
     44
     45    * clientsThread (ListenClients): this thread is listening for
     46      commands from the clients; when it receives a complete command,
     47      it executes the command using 'multicommand'.
     48
     49    * tasksThread
     50      CheckTasks
     51        NextTask
     52
     53
     54
     55    * jobsThread
     56
     57    * controllerThread
     58      CheckControllerStatus : race condition is irrelevant
     59
     60    * inputsThread
     61      AddNewInput called by clientThread
     62      (not sure this is really being used:
     63           server input (filename) calls 'input'
     64           server module (filename) calls 'module'
     65
     66  * functions which need to be thread-safe:
     67    * AddNewClient
     68    * gprint and supporting functions must be thread safe (extensively
     69      used...)
     70
     71
     72* race condition is irrelevant for this variable
     73static int ControllerStatus = FALSE;
     74
     75static int stdin_cntl, stdout_cntl, stderr_cntl;
     76GetJobOutput -> CheckControllerJob -> CheckController -> controllerThread
     77ControllerCommand -> controllerThread / clientThread
     78CheckControllerOutput -> controllerThread / clientThread
     79StartController -> controllerThread / clientThread
     80StopController
     81
     82
     83
     84static IOBuffer stdout_buffer;
     85static IOBuffer stderr_buffer;
     86static int ControllerPID = 0;
     87
     88/* local static variables to track the controller host properties */
     89static Host *hosts = NULL;
     90static int Nhosts = 0;
     91static int NHOSTS = 0;
     92
     93
     94
     95
     96
     97
     98
     99
     100
     101
     102
     103
     104
     105
     106
     107
     108
     109
     110
     111
     112
     113
     114
    1115
    2116PanTasks Client / Server design
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/pantasks.c.in

    r16453 r23484  
    1010void program_init (int *argc, char **argv) {
    1111 
    12   pthread_t jobsThread;
    13   pthread_t tasksThread;
     12  pthread_t JobsAndTasksThread;
    1413  pthread_t controllerThread;
    1514
     
    5049
    5150  /* start up the background threads here */
    52   pthread_create (&tasksThread,      NULL, &CheckTasksThread,      NULL);
    53   pthread_create (&jobsThread,       NULL, &CheckJobsThread,       NULL);
    54   pthread_create (&controllerThread, NULL, &CheckControllerThread, NULL);
     51  // pthread_create (&tasksThread,      NULL, &CheckTasksThread,           NULL);
     52  // pthread_create (&jobsThread,       NULL, &CheckJobsThread,            NULL);
     53  pthread_create (&JobsAndTasksThread, NULL, &CheckJobsAndTasksThread, NULL);
     54  pthread_create (&controllerThread,   NULL, &CheckControllerThread,   NULL);
    5555  return;
    5656}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/pantasks_server.c.in

    r16453 r23484  
    1717 
    1818  char log_stdout[128], log_stderr[128];
    19   pthread_t jobsThread;
    20   pthread_t tasksThread;
    21   pthread_t inputsThread;
     19  pthread_t JobsAndTasksThread;
    2220  pthread_t clientsThread;
    2321  pthread_t controllerThread;
     
    6361
    6462  /* start up the background threads here */
    65   pthread_create (&clientsThread,    NULL, &ListenClients,         NULL);
    66   pthread_create (&tasksThread,      NULL, &CheckTasksThread,      NULL);
    67   pthread_create (&jobsThread,       NULL, &CheckJobsThread,       NULL);
    68   pthread_create (&controllerThread, NULL, &CheckControllerThread, NULL);
    69   pthread_create (&inputsThread,     NULL, &CheckInputsThread,     NULL);
     63  pthread_create (&clientsThread,       NULL, &ListenClients,           NULL);
     64  pthread_create (&JobsAndTasksThread,  NULL, &CheckJobsAndTasksThread, NULL);
     65  pthread_create (&controllerThread,    NULL, &CheckControllerThread,   NULL);
    7066
    7167  /* in this loop, we listen for incoming connections, validate, and
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/server.c

    r16463 r23484  
    55int input           PROTO((int, char **));
    66int module          PROTO((int, char **));
    7 int server_load     PROTO((int, char **));
    87int server_run      PROTO((int, char **));
    98int server_stop     PROTO((int, char **));
     
    2019  {1, "input",  input,  "load input file on server"},
    2120  {1, "module", module, "load module file on server"},
    22   {1, "load",   server_load, "load input file on server"},
    2321  {1, "run",    server_run,  "run scheduler"},
    2422  {1, "stop",   server_stop, "stop scheduler"},
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/server_run.c

    r11084 r23484  
    1111  CheckJobsSetState (TRUE);
    1212  CheckControllerSetState (TRUE);
    13   CheckInputsSetState (TRUE);
    1413  return (TRUE);
    1514}
     
    2322
    2423  CheckTasksSetState (FALSE);
    25   // CheckJobsSetState (FALSE);
    26   // CheckControllerSetState (FALSE);
    27   CheckInputsSetState (FALSE);
    2824  return (TRUE);
    2925}
     
    3935  CheckJobsSetState (FALSE);
    4036  CheckControllerSetState (FALSE);
    41   CheckInputsSetState (FALSE);
    4237  return (TRUE);
    4338}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/showtask.c

    r12468 r23484  
    88  }
    99
     10  JobTaskLock();
    1011  ShowTask (argv[1]);
     12  JobTaskUnlock();
     13
    1114  return (TRUE);
    1215}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/status_server.c

    r16012 r23484  
    1111  if ((N = get_argument (argc, argv, "-tasks"))) {
    1212    remove_argument (N, &argc, argv);
     13    JobTaskLock();
    1314    ListTasks (FALSE);
     15    JobTaskUnlock();
    1416    return (TRUE);
    1517  }
     
    1719  if ((N = get_argument (argc, argv, "-taskinfo"))) {
    1820    remove_argument (N, &argc, argv);
     21    JobTaskLock();
    1922    ListTasks (TRUE);
     23    JobTaskUnlock();
    2024    return (TRUE);
    2125  }
     
    2327  if ((N = get_argument (argc, argv, "-taskstats"))) {
    2428    remove_argument (N, &argc, argv);
     29    JobTaskLock();
    2530    if (argc == 2) {
    2631      ListTaskStats (argv[N]);
     
    2833      ListTaskStats (NULL);
    2934    }     
     35    JobTaskUnlock();
    3036    return (TRUE);
    3137  }
     
    3339  if ((N = get_argument (argc, argv, "-taskstatsreset"))) {
    3440    remove_argument (N, &argc, argv);
     41    JobTaskLock();
    3542    if (argc == 2) {
    3643      ResetTaskStats (argv[N]);
     
    3845      ResetTaskStats (NULL);
    3946    }     
     47    JobTaskUnlock();
    4048    return (TRUE);
    4149  }
     
    5664    gprint (GP_LOG, " Controller is stopped\n");
    5765  }
     66
     67  JobTaskLock();
    5868  ListTasks (FALSE);
    5969  ListJobs ();
     70  JobTaskUnlock();
     71
     72  ControlLock(__func__);
     73  PrintControllerBusyJobs();
     74  ControlUnlock(__func__);
     75
    6076  return (TRUE);
    6177
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/task.c

    r14590 r23484  
    1111  if (argc != 2) goto usage;
    1212
     13  JobTaskLock();
    1314  task = FindTask (argv[1]);
    1415  if (task == NULL) { /**** new task ****/
     
    1819    SetNewTask (task);
    1920  }
     21  JobTaskUnlock();
     22
    2023  /* While a task is being defined, it is removed from the task list.  The new task is added to the task list
    2124     when the definition process is complete. 
     
    5558
    5659      case TASK_END:
    57         /* I need to add in a test here to see if all task elements
    58            have been defined.  delete the task if not */
    5960        free (input);
    6061        /* validate the new task: all mandatory elements defined? */
     
    6364          return (FALSE);
    6465        }
     66        JobTaskLock();
    6567        RegisterNewTask ();
     68        JobTaskUnlock();
    6669        return (TRUE);
    6770        break;
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/task_active.c

    r10734 r23484  
    77  if (argc != 2) goto usage;
    88
     9  JobTaskLock();
    910  task = GetNewTask ();
    1011  if (task == NULL) {
    1112    gprint (GP_ERR, "ERROR: not defining or running a task\n");
     13    JobTaskUnlock();
    1214    return (FALSE);
    1315  }
     
    1517  if (!strcasecmp (argv[1], "true")) {
    1618    task[0].active = TRUE;
     19    JobTaskUnlock();
    1720    return (TRUE);
    1821  }
    1922  if (!strcasecmp (argv[1], "false")) {
    2023    task[0].active = FALSE;
     24    JobTaskUnlock();
    2125    return (TRUE);
    2226  }
    2327
    2428  gprint (GP_ERR, "ERROR: invalid option: %s\n", argv[1]);
     29  JobTaskUnlock();
    2530  return (FALSE);
    2631
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/task_command.c

    r7917 r23484  
    1212  }
    1313
     14  JobTaskLock();
    1415  task = GetNewTask ();
    1516  if (task == NULL) {
     
    1718    if (task == NULL) {
    1819      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     20      JobTaskUnlock();
    1921      return (FALSE);
    2022    }
     
    3537    task[0].argv[i] = strcreate (argv[i+1]);
    3638  }
     39  JobTaskUnlock();
    3740  return (TRUE);
    3841}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/task_host.c

    r7917 r23484  
    2222
    2323  task = GetNewTask ();
     24  JobTaskLock();
    2425  if (task == NULL) {
    2526    task = GetActiveTask ();
    2627    if (task == NULL) {
    2728      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     29      JobTaskUnlock();
    2830      return (FALSE);
    2931    }
     
    3436  task[0].host = NULL;
    3537
    36   if (!strcasecmp (argv[1], "LOCAL")) return (TRUE);
     38  if (!strcasecmp (argv[1], "LOCAL")) {
     39    JobTaskUnlock();
     40    return (TRUE);
     41  }
    3742
    3843  task[0].host = strcreate (argv[1]);
     44  JobTaskUnlock();
    3945  return (TRUE);
    4046}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/task_macros.c

    r10647 r23484  
    2222  }
    2323
     24  JobTaskLock();
    2425  task = GetNewTask ();
    2526  if (task == NULL) {
    2627    gprint (GP_ERR, "ERROR: not defining or running a task\n");
     28    JobTaskUnlock();
    2729    return (FALSE);
    2830  }
     
    135137        free (input);
    136138        REALLOCATE (macro[0].line, char *, MAX (1, macro[0].Nlines));
     139        JobTaskUnlock();
    137140        return (TRUE);
    138141      }
     
    148151    }
    149152  }
     153  JobTaskUnlock();
    150154  return (TRUE);
    151155}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/task_nmax.c

    r11055 r23484  
    77  if (argc != 2) goto usage;
    88
     9  JobTaskLock();
    910  task = GetNewTask ();
    1011  if (task == NULL) {
    1112    gprint (GP_ERR, "ERROR: not defining or running a task\n");
     13    JobTaskUnlock();
    1214    return (FALSE);
    1315  }
    1416
    1517  task[0].Nmax = atoi (argv[1]);
     18  JobTaskUnlock();
    1619  return (TRUE);
    1720
     
    2730  if (argc != 2) goto usage;
    2831
     32  JobTaskLock();
    2933  task = GetNewTask ();
    3034  if (task == NULL) {
    3135    gprint (GP_ERR, "ERROR: not defining or running a task\n");
     36    JobTaskUnlock();
    3237    return (FALSE);
    3338  }
    3439
    3540  task[0].NpendingMax = atoi (argv[1]);
     41  JobTaskUnlock();
    3642  return (TRUE);
    3743
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/task_options.c

    r8129 r23484  
    1212  }
    1313
     14  JobTaskLock();
    1415  task = GetNewTask ();
    1516  if (task == NULL) {
     
    1718    if (task == NULL) {
    1819      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     20      JobTaskUnlock();
    1921      return (FALSE);
    2022    }
     
    3537    task[0].optv[i] = strcreate (argv[i+1]);
    3638  }
     39  JobTaskUnlock();
    3740  return (TRUE);
    3841}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/task_periods.c

    r7917 r23484  
    4040  }
    4141
     42  JobTaskLock();
    4243  task = GetNewTask ();
    4344  if (task == NULL) {
     
    4546    if (task == NULL) {
    4647      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     48      JobTaskUnlock();
    4749      return (FALSE);
    4850    }
     
    5355  if (Timeout) task[0].timeout_period = TimeoutValue;
    5456
     57  JobTaskUnlock();
    5558  return (TRUE);
    5659}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/task_stdout.c

    r12332 r23484  
    1111  }
    1212
     13  JobTaskLock();
    1314  task = GetNewTask ();
    1415  if (task == NULL) {
     
    1617    if (task == NULL) {
    1718      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     19      JobTaskUnlock();
    1820      return (FALSE);
    1921    }
     
    2123  if (task[0].stdout_dump != NULL) free (task[0].stdout_dump);
    2224  task[0].stdout_dump = strcreate (argv[1]);
     25  JobTaskUnlock();
    2326  return (TRUE);
    2427}
     
    3437  }
    3538
     39  JobTaskLock();
    3640  task = GetNewTask ();
    3741  if (task == NULL) {
     
    3943    if (task == NULL) {
    4044      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     45      JobTaskUnlock();
    4146      return (FALSE);
    4247    }
     
    4449  if (task[0].stderr_dump != NULL) free (task[0].stderr_dump);
    4550  task[0].stderr_dump = strcreate (argv[1]);
     51  JobTaskUnlock();
    4652  return (TRUE);
    4753}
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/task_trange.c

    r15487 r23484  
    1212    if (argc != 1) goto usage;
    1313
     14    JobTaskLock();
    1415    task = GetNewTask ();
    1516    if (task == NULL) {
    1617      gprint (GP_ERR, "ERROR: not defining or running a task\n");
     18      JobTaskUnlock();
    1719      return (FALSE);
    1820    }
     
    2022    task[0].Nranges = 0;
    2123    REALLOCATE (task[0].ranges, TimeRange, 1);
     24    JobTaskUnlock();
    2225    return (TRUE);
    2326  }
     
    4043
    4144  if (argc != 3) goto usage;
    42 
    43   task = GetNewTask ();
    44   if (task == NULL) {
    45     gprint (GP_ERR, "ERROR: not defining or running a task\n");
    46     return (FALSE);
    47   }
    4845
    4946  /* test for Mon[@HH:MM:SS] - both must match */
     
    8380
    8481valid:
     82  JobTaskLock();
     83  task = GetNewTask ();
     84  if (task == NULL) {
     85    gprint (GP_ERR, "ERROR: not defining or running a task\n");
     86    JobTaskUnlock();
     87    return (FALSE);
     88  }
     89
    8590  N = task[0].Nranges;
    8691  task[0].Nranges ++;
     
    8893 
    8994  task[0].ranges[N] = range;
     95  JobTaskUnlock();
    9096  return (TRUE);
    9197
  • branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/thread_locks.c

    r11084 r23484  
    11# include "pantasks.h"
    22
    3 /* this mutex is used by the serialized threads */
     3/* mutex to lock Client table operations */
     4static pthread_mutex_t ClientMutex = PTHREAD_MUTEX_INITIALIZER;
    45
    5 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    6 
    7 void SerialThreadLock () {
    8     pthread_mutex_lock (&mutex);
     6void ClientLock () {
     7  pthread_mutex_lock (&ClientMutex);
    98}
    109
    11 void SerialThreadUnlock () {
    12     pthread_mutex_unlock (&mutex);
     10void ClientUnlock () {
     11  pthread_mutex_unlock (&ClientMutex);
    1312}
    1413
     14/* mutex to lock Command / Multicommand operations */
     15static pthread_mutex_t CommandMutex = PTHREAD_MUTEX_INITIALIZER;
     16
     17void CommandLock () {
     18  //  fprintf (stderr, "command lock\n");
     19  pthread_mutex_lock (&CommandMutex);
     20}
     21
     22void CommandUnlock () {
     23  //  fprintf (stderr, "command unlock\n");
     24  pthread_mutex_unlock (&CommandMutex);
     25}
     26
     27/* mutex to lock Control operations */
     28static pthread_mutex_t ControlMutex = PTHREAD_MUTEX_INITIALIZER;
     29
     30void ControlLock (char *func) {
     31  // fprintf (stderr, "control lock %s\n", func);
     32  pthread_mutex_lock (&ControlMutex);
     33}
     34
     35void ControlUnlock (char *func) {
     36  // fprintf (stderr, "control unlock %s\n", func);
     37  pthread_mutex_unlock (&ControlMutex);
     38}
     39
     40/* mutex to lock Job / Task operations */
     41static pthread_mutex_t JobTaskMutex = PTHREAD_MUTEX_INITIALIZER;
     42
     43void JobTaskLock () {
     44  //  fprintf (stderr, "jobtask lock\n");
     45  pthread_mutex_lock (&JobTaskMutex);
     46}
     47
     48void JobTaskUnlock () {
     49  //  fprintf (stderr, "jobtask unlock\n");
     50  pthread_mutex_unlock (&JobTaskMutex);
     51}
     52
Note: See TracChangeset for help on using the changeset viewer.