
Changeset 4450


Timestamp: Jul 4, 2005, 5:35:47 PM
Author: eugene
Message: substantial dev work on scheduler/pcontrol/pclient
Location: trunk/Ohana/src/opihi
Files: 11 added, 34 edited

  • trunk/Ohana/src/opihi/doc/pcontrol.txt

    r3335 r4450  
    8080can set the CheckTask function to this hook (& unset it).
    8181
    82 
    8382---
    8483
     
    116115 status -queues
    117116
     117pcontrol may be given a timeout for each job.  pcontrol will monitor a
     118job and kill/crash it if the timeout expires.  the timeout only
     119governs how long it is allowed to execute, not how long it can sit in
     120the queue.  (the scheduler / operator should decide if a job has been
     121on pcontrol for too long -- this probably means there are no
     122appropriate machines ).
     123
     124pcontrol currently does not distinguish between multiple instances of
     125a single host.  all have the same name.  if you want to bring down a
     126host, you need to issue N host -down commands.  perhaps this is
     127silly.  a simple alternative would be for the host [-on -off -start
     128-stop] commands to apply to all defined entries which match the
     129hostname.  In this case, a command like 'host foo -off' would find and
     130halt all connections to the machine 'foo', while 'host foo -on' would
     131restart them all (or rather, given the functionality of pcontrol,
     132would allow pcontrol to attempt to bring them on).
     133
     134It is not clear why a user should be able to execute 'start' (down ->
     135idle) or 'stop' (idle -> down).  The transition down -> idle is
     136automatically performed by pcontrol for any machines which are
     137currently down, while the transition idle -> down is immediately
     138followed by an attempt by pcontrol to move the host from down -> idle.
     139
     140does it make sense to kill all jobs on a host?  this would only have
     141the effect of clearing the host for a moment until pcontrol decided to
     142start another job on that host.  the desired effect is gained by putting
     143the host to 'off'.
    118144 
     145currently the command 'host (hostname)' puts the host in 'down'
     146state.  pcontrol then immediately tries to connect to the host, moving
     147it to 'idle' state (and then 'busy' if any jobs are available).  it
     148might be useful to be able to add a host in 'off' state as a starting
     149point. 
     150
     151it is not obvious that the user should be able to run 'CheckHost',
     152unless this gets expanded to return state information on the host.
     153
    119154---
    120155
     
    137172PENDING -> BUSY    : StartJob
    138173PENDING -> DEL     : DelJob
    139 BUSY    -> EXIT    : CheckJob
    140 BUSY    -> CRASH   : CheckJob | KillJob
     174BUSY    -> DONE    : CheckBusyJob | KillJob
     175DONE    -> EXIT    : CheckDoneJob
     176DONE    -> CRASH   : CheckDoneJob
    141177BUSY    -> PENDING : CheckJob | CheckHost
    142178EXIT    -> DEL     : DelJob
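The job state transitions listed in the new revision of pcontrol.txt can be read as a lookup table. The following is a minimal sketch, not code from this changeset: the names `SketchState`, `SketchTransition`, and `sketch_transition_via` are hypothetical, and the table holds only the transitions visible in this hunk (the full document may define more).

```c
#include <assert.h>
#include <stddef.h>

/* hypothetical state names mirroring the labels used in pcontrol.txt */
typedef enum {
  ST_PENDING, ST_BUSY, ST_DONE, ST_EXIT, ST_CRASH, ST_DEL
} SketchState;

typedef struct {
  SketchState from;
  SketchState to;
  const char *via;   /* function(s) the document names for this transition */
} SketchTransition;

/* only the transitions shown in the r4450 side of the hunk above */
static const SketchTransition sketch_table[] = {
  { ST_PENDING, ST_BUSY,    "StartJob" },
  { ST_PENDING, ST_DEL,     "DelJob" },
  { ST_BUSY,    ST_DONE,    "CheckBusyJob | KillJob" },
  { ST_DONE,    ST_EXIT,    "CheckDoneJob" },
  { ST_DONE,    ST_CRASH,   "CheckDoneJob" },
  { ST_BUSY,    ST_PENDING, "CheckJob | CheckHost" },
  { ST_EXIT,    ST_DEL,     "DelJob" },
};

/* return the name of the function that performs from -> to,
 * or NULL if the table does not list that transition */
const char *sketch_transition_via (SketchState from, SketchState to) {
  size_t i;

  assert (from <= ST_DEL && to <= ST_DEL);
  for (i = 0; i < sizeof (sketch_table) / sizeof (sketch_table[0]); i++) {
    if (sketch_table[i].from == from && sketch_table[i].to == to) {
      return (sketch_table[i].via);
    }
  }
  return (NULL);
}
```

With this table, BUSY -> DONE is found, while the direct BUSY -> EXIT transition from the r3335 side is not, which matches the intent of the change: a busy job now passes through DONE before it is classified as EXIT or CRASH.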
  • trunk/Ohana/src/opihi/doc/scheduler.txt

    r3335 r4450  
     1
     2---
     3
     4sched / pcontrol todo:
     5
     6- sched: validate task hosts with controller
     7- pcontrol: host (name) -check should return state
     8- sched: start / stop controller connection?
     9- pcontrol: add command to check status of a job
     10- pcontrol: add command to dump stdout/stderr for a job
     11
     12---
    113
    214scheduler commands:
     
    2436   (in task, command line is static; in task.macro, command line is expanded for each instance)
    2537
    26 host (machine)
     38host (machine) [-required]
    2739 - defines preferred host
    2840 - may be in task or in task.macro (exit/exec)
     
    6173---
    6274
     75scheduler / controller / local interactions
     76
     77scheduler can execute local jobs and jobs on the controller.
     78scheduler needs to initiate the connection to the controller, if the
     79controller is being used.  it also needs to fork each local command.
     80
     81we need a command to define the controller hosts to be used.  if a
     82task or job defines a host which is not known, how is this caught?
     83does sched need to keep a list of valid hosts, or should the host be
     84passed down to the controller?  Also, the scheduler should not
     85initiate a connection to the controller unless it is requested.  is
     86this implicit when defining a host? 
     87
     88- controller host foobar
     89 
     90  this should set up the controller interface (if it does not currently
     91  exist), and send the command 'host foobar'.  scheduler should not
     92  care if the connection is successful or not (we can know about hosts
     93  which are currently off). 
     94
     95
     96---
    6397controller interaction:
    6498
  • trunk/Ohana/src/opihi/include/pcontrol.h

    r3212 r4450  
    7272  int          exit_status;
    7373  int          Reset;
     74  int          stdout_size;
     75  int          stderr_size;
    7476  JobMode      mode;
    7577  JobStat      state;
     
    128130Host *GetHost (int StackID, int where);
    129131int FindHost (IDtype HostID, int StackID);
     132Host *FindHostPtr (IDtype HostID, int StackID);
     133Host *FindHostStack (IDtype HostID);
     134Host *PullHost (IDtype HostID, int StackID);
    130135int FindNamedHost (char *name, int StackID);
    131136void AddHost (char *hostname);
     
    149154int CheckDoneHost (Host *host);
    150155int CheckDoneJob (Job *job);
    151 int GetJobOutput (Job *job, char *channel);
     156int GetJobOutput (char *command, Host *host, IOBuffer *buffer, int Nbytes);
    152157int ResetJob (Job *job);
    153158int PclientCommand (Host *host, char *command, char *response, IOBuffer *buffer);
     
    159164Job *GetJob (int StackID, int where);
    160165int FindJob (IDtype JobID, int StackID);
     166Job *FindJobPtr (IDtype JobID, int StackID);
     167Job *FindJobStack (IDtype JobID);
     168Job *PullJob (IDtype JobID, int StackID);
    161169IDtype AddJob (char *hostname, JobMode mode, int timeout, int argc, char **argv);
    162 void KillJob (Job *job);
     170int KillJob (Job *job);
    163171void DelJob (Job *job);
    164172Host *UnlinkJobAndHost (Job *job);
  • trunk/Ohana/src/opihi/include/scheduler.h

    r3143 r4450  
     1# include "external.h"
     2# include "shell.h"
     3# include "dvomath.h"
     4# include "convert.h"
     5# include "display.h"
     6# include <sys/types.h>
     7# include <sys/wait.h>
    18
    2 enum {JOB_BUSY, JOB_EXIT, JOB_CRASH};
     9typedef enum {
     10  JOB_NONE,
     11  JOB_BUSY,
     12  JOB_EXIT,
     13  JOB_CRASH,
     14  JOB_PENDING,
     15} JobStat;
     16
     17typedef enum {
     18  JOB_LOCAL,
     19  JOB_CONTROLLER,
     20} JobMode;
     21
     22typedef enum {
     23  CONTROLLER_HUNG = -1,
     24  CONTROLLER_DOWN = 0,
     25  CONTROLLER_GOOD = 1,
     26} ControllerStat;
     27
     28/* socket / pipe communication buffer */
     29typedef struct {
     30  char *buffer;
     31  int   Nalloc;
     32  int   Nreset;
     33  int   Nblock;
     34  int   Nbuffer;
     35} IOBuffer;
    336
    437/* a task is a description of the wrapping of a job */
     
    1649  char  **argv;
    1750  char   *host;
     51  int     host_required;
    1852
    1953  char   *name;
     
    2862typedef struct {
    2963  int JobID;                            /* internal ID for job */
    30   int PID;                              /* external ID for job */
    31 
    32   Task task;
     64  int pid;                              /* external ID for job */
    3365
    3466  struct timeval last;
     
    3668  int state;
    3769  int exit_status;
     70
     71  int     argc;
     72  char  **argv;
     73  Task   *task;
     74
     75  IOBuffer    stdout;                   /* stdout storage buffer */
     76  IOBuffer    stderr;                   /* stderr storage buffer */
     77  JobMode     mode;                     /* local or controller? */
     78
     79  int         stdout_size;              /* stdout pipe (local only) */
     80  int         stderr_size;              /* stderr pipe (local only) */
     81  int         stdout_fd;                /* stdout pipe (local only) */
     82  int         stderr_fd;                /* stderr pipe (local only) */
    3883} Job;
    3984
     85# define CONTROLLER_PROMPT "pclient:"
    4086
    4187/* scheduler prototypes */
     
    5096int TaskHash (char *input);
    5197int ShowTask (char *name);
     98char *memstr (char *m1, char *m2, int n);
    5299
    53100Job *NextJob ();
     
    65112void SetTaskTimer (struct timeval *timer);
    66113double GetTaskTimer (struct timeval start);
     114
     115int CheckJobs ();
     116int CheckTasks ();
     117int CheckSystem ();
     118int GetJobOutput (char *channel, int pid, IOBuffer *buffer, int Nbytes);
     119int CheckControllerJob (Job *job);
     120int CheckControllerJobStatus (Job *job);
     121int SubmitControllerJob (Job *job);
     122int StartController ();
     123int ControllerCommand (char *command, char *response, IOBuffer *buffer);
     124int SubmitLocalJob (Job *job);
     125int CheckLocalJob (Job *job);
     126int CheckLocalJobStatus (Job *job);
     127void InitTaskTimers ();
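The header introduces an IOBuffer type for socket / pipe communication, and CheckLocalJob in this changeset switches on a read result of -2 (error), -1 (no data), 0 (pipe closed), or a positive byte count. The sketch below illustrates that return convention on a non-blocking descriptor. It is an assumption-laden illustration, not the ReadtoIOBuffer implementation: `SketchBuffer` and `sketch_read_to_buffer` are hypothetical names, and the struct does not reproduce the IOBuffer fields, whose exact meaning is not shown in this diff.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* hypothetical growable buffer; not the IOBuffer layout from scheduler.h */
typedef struct {
  char  *data;
  size_t used;
  size_t alloc;
} SketchBuffer;

/* read one chunk from a non-blocking fd and append it to buf.
 * returns: -2 on a read or allocation error,
 *          -1 if no data is currently available,
 *           0 if the other end has closed the pipe,
 *          otherwise the number of bytes appended */
int sketch_read_to_buffer (SketchBuffer *buf, int fd) {
  char chunk[256];
  char *grown;
  ssize_t n;

  assert (buf != NULL);

  n = read (fd, chunk, sizeof (chunk));
  if (n < 0) {
    if (errno == EAGAIN || errno == EWOULDBLOCK) return (-1);
    return (-2);
  }
  if (n == 0) return (0);

  /* grow the buffer only when the new chunk does not fit */
  if (buf->used + (size_t) n > buf->alloc) {
    grown = realloc (buf->data, buf->used + (size_t) n);
    if (grown == NULL) return (-2);
    buf->data = grown;
    buf->alloc = buf->used + (size_t) n;
  }
  memcpy (buf->data + buf->used, chunk, (size_t) n);
  buf->used += (size_t) n;
  return ((int) n);
}
```

The descriptor must already be in non-blocking mode (for example via `fcntl (fd, F_SETFL, O_NONBLOCK)`, as SubmitLocalJob does for its pipes); otherwise the read simply blocks and the -1 case never occurs.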
  • trunk/Ohana/src/opihi/pantasks/JobOps.c

    r3140 r4450  
    1 # include "opihi.h"
    21# include "scheduler.h"
    32
     
    98
    109static Job *jobs;
    11 static int   Njobs;
    12 static int   NJOBS;
     10static int  Njobs;
     11static int  NJOBS;
    1312
    1413static char *JobName = dot;
     
    6564
    6665  for (i = 0; i < Njobs; i++) {
    67     fprintf (stderr, "%d: %-15s %5d %20s (%x)\n", Njobs, jobs[i].task.name, jobs[i].JobID, jobs[i].task.argv[0], jobs[i].task.argv);
     66    fprintf (stderr, "%d: %-15s %5d %20s (%x)\n", Njobs, jobs[i].task[0].name, jobs[i].JobID, jobs[i].argv[0], jobs[i].argv);
    6867  }
    6968  return;
     
    7574  int i;
    7675
    77   /* try for an exact match first */
     76  /* return job with matching JobID */
    7877  for (i = 0; i < Njobs; i++) {
    7978    if (jobs[i].JobID == JobID) {
     
    9089
    9190  jobs[Njobs].JobID = NextJobID ();
    92   jobs[Njobs].PID = 0;
    93   jobs[Njobs].task = task[0];
    94 
    95   /* we need our own copy of task[0].argv */
    96   ALLOCATE (jobs[Njobs].task.argv, char *, MAX (jobs[Njobs].task.argc, 1));
    97   for (i = 0; i < jobs[Njobs].task.argc; i++) {
    98     jobs[Njobs].task.argv[i] = strcreate (task[0].argv[i]);
    99   }
    100   jobs[Njobs].task.host = strcreate (task[0].host);
     91  jobs[Njobs].pid = 0;
     92  jobs[Njobs].mode = JOB_LOCAL;
     93  if (task[0].host != NULL) {
     94    jobs[Njobs].mode = JOB_CONTROLLER;
     95  }
     96
     97  /* we need our own copy of task[0].argv
     98   *  argc is the number of valid args, like the usual command line.
     99   *  we allocate one extra element, with value 0 to be passed to execvp
     100   */
     101  jobs[Njobs].argc = task[0].argc;
     102  ALLOCATE (jobs[Njobs].argv, char *, MAX (task[0].argc + 1, 1));
     103  for (i = 0; i < task[0].argc; i++) {
     104    jobs[Njobs].argv[i] = strcreate (task[0].argv[i]);
     105  }
     106  jobs[Njobs].argv[i] = 0;
     107
     108  /* other data from the task is needed by the job
     109     we carry a pointer back to the task.  this means we
     110     cannot modify the task once a job is created, or the changes will
     111     be applied to the existing jobs */
     112
     113  jobs[Njobs].task = task;
     114 
     115  /* if we decide we need to be able to dynamically set task qualities
     116     (like host, timeouts, etc), then we will need to have matched
     117     entries to these quantities in the job structure */
    101118
    102119  Njobs ++;
     
    108125}
    109126
     127void FreeJob (Job *job) {
     128 
     129  int i;
     130
     131  if (job == NULL) return;
     132
     133  if ((job[0].JobID >= 0) || (job[0].JobID < MAX_N_JOBS)) {
     134    JobIDList[job[0].JobID] = FALSE;
     135  }
     136
     137  for (i = 0; i < job[0].argc; i++) {
     138    free (job[0].argv[i]);
     139  }
     140  free (job[0].argv);
     141  return;
     142}
     143
    110144void SetCurrentJob (char *name) {
    111145  JobName = name;
     
    114148char *GetCurrentJob () {
    115149  return (JobName);
    116 }
    117 
    118 void FreeJob (Job *job) {
    119  
    120   int i;
    121 
    122   if (job == NULL) return;
    123 
    124   if ((job[0].JobID >= 0) || (job[0].JobID < MAX_N_JOBS)) {
    125     JobIDList[job[0].JobID] = FALSE;
    126   }
    127 
    128   for (i = 0; i < job[0].task.argc; i++) {
    129     free (job[0].task.argv[i]);
    130   }
    131   free (job[0].task.argv);
    132   free (job[0].task.host);
    133   return;
    134150}
    135151
     
    161177}
    162178
    163 /* this needs to:
    164    1) distinguish local from controller jobs
    165    2) fork the local jobs in the background
    166    3) send the controller jobs to the controller */
    167 
    168179int SubmitJob (Job *job) {
    169180
    170   int i, Nchar;
    171   char *string;
    172 
    173   Nchar = 0;
    174   for (i = 0; i < job[0].task.argc; i++) {
    175     Nchar += strlen (job[0].task.argv[i]) + 1;
    176   }
    177   ALLOCATE (string, char, Nchar);
    178   bzero (string, Nchar);
    179 
    180   strcat (string, job[0].task.argv[0]);
    181   for (i = 1; i < job[0].task.argc; i++) {
    182     strcat (string, " ");
    183     strcat (string, job[0].task.argv[i]);
    184   }
    185  
    186   fprintf (stderr, "executing job %d ...%s...\n", job[0].JobID, string);
    187   free (string);
     181  if (job[0].mode == JOB_LOCAL) {
     182    SubmitLocalJob (job);
     183  } else {
     184    SubmitControllerJob (job);
     185  }
    188186
    189187  /* reset clock for start and poll-test */
     
    196194int CheckJob (Job *job) {
    197195
    198   float f;
    199   f = drand48 ();
    200 
    201   if ((0.00 < f) && (f < 0.25)) {
    202     job[0].state = JOB_BUSY;
    203     job[0].exit_status = -1;
    204     return (JOB_BUSY);
    205   }
    206   if ((0.25 < f) && (f < 0.50)) {
    207     job[0].state = JOB_BUSY;
    208     job[0].exit_status = -1;
    209     return (JOB_CRASH);
    210   }
    211   if ((0.50 < f) && (f < 0.75)) {
    212     job[0].state = JOB_BUSY;
    213     job[0].exit_status = 0;
    214     return (JOB_EXIT);
    215   }
    216   if ((0.75 < f) && (f < 1.00)) {
    217     job[0].state = JOB_BUSY;
    218     job[0].exit_status = 1;
    219     return (JOB_EXIT);
    220   }
    221 }
     196  /* add checks for timeouts */
     197
     198  if (job[0].mode == JOB_LOCAL) {
     199    CheckLocalJob (job);
     200  } else {
     201    CheckControllerJob (job);
     202  }
     203  return (job[0].state);
     204}
  • trunk/Ohana/src/opihi/pantasks/LocalJob.c

    r3392 r4450  
     1# include "scheduler.h"
    12
    2 /* local jobs are forked in the background */
     3/* this could be written as just a one-way pipe */
     4int SubmitLocalJob (Job *job) {
    35
    4 SubmitLocalJob (Job *job) {
     6  int status, pid;
     7  int stdout_fd[2], stderr_fd[2];
    58
    6   /*
    7      construct the command line
    8      fork the command, get back the PID
    9      increment local job counter
    10   */
     9  bzero (stdout_fd, 2*sizeof(int));
     10  bzero (stderr_fd, 2*sizeof(int));
    1111
     12  if (pipe (stdout_fd) < 0) goto pipe_error;
     13  if (pipe (stderr_fd) < 0) goto pipe_error;
     14
     15  pid = fork ();
     16  if (!pid) { /* must be child process */
     17    fprintf (stderr, "starting controller connection\n");
     18
     19    /* close the other ends of the pipes */
     20    close (stdout_fd[0]);
     21    close (stderr_fd[0]);
     22
     23    /* tie our ends of the pipes to stdout, stderr */
     24    dup2 (stdout_fd[1], STDOUT_FILENO);
     25    dup2 (stderr_fd[1], STDERR_FILENO);
     26
     27    /* set both streams unbuffered */
     28    setvbuf (stdout, (char *) NULL, _IONBF, BUFSIZ);
     29    setvbuf (stderr, (char *) NULL, _IONBF, BUFSIZ);
     30
     31    status = execvp (job[0].argv[0], job[0].argv);
     32    exit (1);
     33  }
     34
     35  /* close the other ends of the pipes */
     36  close (stdout_fd[1]); stdout_fd[1] = 0;
     37  close (stderr_fd[1]); stderr_fd[1] = 0;
     38
     39  /* make the pipes non-blocking */
     40  fcntl (stdout_fd[0], F_SETFL, O_NONBLOCK);
     41  fcntl (stderr_fd[0], F_SETFL, O_NONBLOCK);
     42
     43  job[0].stdout_fd = stdout_fd[0];
     44  job[0].stderr_fd = stderr_fd[0];
     45  job[0].pid = pid;
     46
     47  return (TRUE);
     48
     49pipe_error:
     50  perror ("pipe error:");
     51  if (stdout_fd[0] != 0) close (stdout_fd[0]);
     52  if (stdout_fd[1] != 0) close (stdout_fd[1]);
     53  if (stderr_fd[0] != 0) close (stderr_fd[0]);
     54  if (stderr_fd[1] != 0) close (stderr_fd[1]);
     55  return (FALSE);
    1256}
    1357
    14 CheckLocalJob (Job *job) {
     58/* update current state, drain stdout/stderr buffers */
     59int CheckLocalJob (Job *job) {
    1560
    16   /*
    17      
     61  int Nread;
     62
     63  // XXX do something useful with exit status?
     64  CheckLocalJobStatus (job);
     65
     66  /* read stdout buffer */
     67  Nread = ReadtoIOBuffer (&job[0].stdout, job[0].stdout_fd);
     68  switch (Nread) {
     69    case -2:  /* error in read (programming error?  system level error?) */
     70      fprintf (stderr, "serious IO error\n");
     71      exit (2);
     72    case -1:  /* no data in pipe */
     73      break;
     74    case 0:   /* pipe is closed */
     75      /** change child state? **/
     76      break;
     77    default:  /* data in pipe */
     78      break;
     79  }
     80 
     81  /* read stderr buffer */
     82  Nread = ReadtoIOBuffer (&job[0].stderr, job[0].stderr_fd);
     83  switch (Nread) {
     84    case -2:  /* error in read (programming error?  system level error?) */
     85      fprintf (stderr, "serious IO error\n");
     86      exit (2);
     87    case -1:  /* no data in pipe */
     88      break;
     89    case 0:   /* pipe is closed */
     90      /** change child state? **/
     91      break;
     92    default:  /* data in pipe */
     93      break;
     94  }
     95  return (TRUE);
     96}
     97
     98int CheckLocalJobStatus (Job *job) {
     99
     100  int result, waitstatus;
     101
     102  /* check local job status */
     103  result = waitpid (job[0].pid, &waitstatus, WNOHANG);
     104  switch (result) {
     105    case -1:  /* error with waitpid */
     106      switch (errno) {
     107        case ECHILD:
     108          fprintf (stderr, "unknown PID, not a child proc\n");
     109          fprintf (stderr, "did process already exit?  programming error?\n");
     110          job[0].state = JOB_NONE;
     111          job[0].exit_status = 0;
     112          return (FALSE);
     113        case EINVAL:
     114          fprintf (stderr, "error EINVAL (waitpid): programming error\n");
     115          exit (1);
     116        case EINTR:
     117          fprintf (stderr, "error EINTR (waitpid): programming error\n");
     118          exit (1);
     119        default:
     120          fprintf (stderr, "unknown error for waitpid (%d): programming error\n", errno);
     121          exit (1);
     122      }
     123      break;
     124     
     125    case 0:  /* process not exited */
     126      job[0].state = JOB_BUSY;
     127      job[0].exit_status = 0;
     128      return (TRUE);
     129
     130    default:
     131      if (result != job[0].pid) {
     132        fprintf (stderr, "waitpid error: mis-matched PID (%d vs %d).  programming error\n", result, job[0].pid);
     133        exit (1);
     134      }
     135     
     136      if (WIFEXITED(waitstatus)) {
     137        job[0].state = JOB_EXIT;
     138        job[0].exit_status = WEXITSTATUS(waitstatus);
     139      }
     140      if (WIFSIGNALED(waitstatus)) {
     141        job[0].state = JOB_CRASH;
     142        job[0].exit_status = WTERMSIG(waitstatus);
     143      }
     144      if (WIFSTOPPED(waitstatus)) {
     145        fprintf (stderr, "waitpid returns 'stopped': programming error\n");
     146        exit (1);
     147      }
     148  }
     149  return;
     150}
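CheckLocalJobStatus maps the waitpid status word onto job states: a normal exit becomes JOB_EXIT with the exit code, death by signal becomes JOB_CRASH with the signal number. The following is a minimal, self-contained sketch of that decoding. The names `SketchJobStat`, `sketch_decode_status`, and `sketch_run_child` are hypothetical, and two details differ from the changeset by assumption: the sketch waits in blocking mode rather than with WNOHANG so that the result is deterministic, and it reports any other status as still busy instead of exiting.

```c
#include <assert.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* hypothetical stand-ins for the JobStat values in scheduler.h */
typedef enum { SK_JOB_BUSY, SK_JOB_EXIT, SK_JOB_CRASH } SketchJobStat;

/* translate a waitpid() status word into a job state and exit status */
SketchJobStat sketch_decode_status (int waitstatus, int *exit_status) {
  assert (exit_status != NULL);

  if (WIFEXITED (waitstatus)) {
    *exit_status = WEXITSTATUS (waitstatus);   /* value passed to exit() */
    return (SK_JOB_EXIT);
  }
  if (WIFSIGNALED (waitstatus)) {
    *exit_status = WTERMSIG (waitstatus);      /* signal that killed it */
    return (SK_JOB_CRASH);
  }
  *exit_status = 0;
  return (SK_JOB_BUSY);   /* assumption: anything else counts as running */
}

/* fork a child that exits with 'code', or kills itself with 'sig' when
 * sig > 0; wait for it (blocking) and decode the result */
SketchJobStat sketch_run_child (int code, int sig, int *exit_status) {
  int waitstatus = 0;
  pid_t pid;

  assert (exit_status != NULL);

  pid = fork ();
  if (pid < 0) {            /* fork failed: report as a crash */
    *exit_status = -1;
    return (SK_JOB_CRASH);
  }
  if (pid == 0) {           /* child process */
    if (sig > 0) kill (getpid (), sig);
    _exit (code);
  }
  if (waitpid (pid, &waitstatus, 0) != pid) {
    *exit_status = -1;
    return (SK_JOB_CRASH);
  }
  return (sketch_decode_status (waitstatus, exit_status));
}
```

A polling caller, as in the changeset, would pass WNOHANG and treat a return value of 0 from waitpid as "process not exited" before calling the decode step.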
  • trunk/Ohana/src/opihi/pantasks/Makefile

    r3525 r4450  
    2525# sched user commands and support functions ########################
    2626
    27 sched = \
     27funcs = \
     28$(SDIR)/CheckJobs.$(ARCH).o \
     29$(SDIR)/CheckSystem.$(ARCH).o \
     30$(SDIR)/CheckTasks.$(ARCH).o \
     31$(SDIR)/ControllerOps.$(ARCH).o \
     32$(SDIR)/LocalJob.$(ARCH).o \
     33$(SDIR)/JobOps.$(ARCH).o \
     34$(SDIR)/TaskOps.$(ARCH).o \
     35$(SDIR)/IOBufferOps.$(ARCH).o \
     36$(SDIR)/memstr.$(ARCH).o \
     37$(SDIR)/init.$(ARCH).o
     38
     39cmds = \
     40$(SDIR)/controller.$(ARCH).o \
    2841$(SDIR)/run.$(ARCH).o \
     42$(SDIR)/scheduler.$(ARCH).o \
    2943$(SDIR)/task.$(ARCH).o \
    3044$(SDIR)/task_command.$(ARCH).o \
    3145$(SDIR)/task_host.$(ARCH).o \
    3246$(SDIR)/task_macros.$(ARCH).o \
    33 $(SDIR)/task_periods.$(ARCH).o \
    34 $(SDIR)/JobOps.$(ARCH).o \
    35 $(SDIR)/TaskOps.$(ARCH).o \
    36 $(SDIR)/init.$(ARCH).o \
    37 $(SDIR)/scheduler.$(ARCH).o
     47$(SDIR)/task_periods.$(ARCH).o
    3848
    3949libs = \
     
    4959        @echo done
    5060
    51 $(BIN)/scheduler.$(ARCH) : $(sched) $(libs)
     61$(BIN)/scheduler.$(ARCH) : $(funcs) $(cmds) $(libs)
    5262
    5363install: $(DESTBIN)/scheduler
  • trunk/Ohana/src/opihi/pantasks/TaskOps.c

    r3140 r4450  
    108108
    109109  tasks[Ntasks].host = NULL;
     110  tasks[Ntasks].host_required = FALSE;
    110111
    111112  tasks[Ntasks].argc = 0;
     
    124125  tasks[Ntasks].poll_period = 1.0;
    125126  tasks[Ntasks].timeout_period = 1.0;
     127
     128  /* init task timer (is reset by 'run') */ 
     129  gettimeofday (&tasks[Ntasks].last, (void *) NULL);
    126130
    127131  Ntasks ++;
     
    155159  gettimeofday (timer, (void *) NULL);
    156160}
     161
     162/* start the clock for all tasks */
     163void InitTaskTimers () {
     164
     165  Task *task;
     166
     167  while ((task = NextTask ()) != NULL) {
     168    gettimeofday (&task[0].last, (void *) NULL);
     169 }
     170}
  • trunk/Ohana/src/opihi/pantasks/init.c

    r3140 r4450  
    22# include "scheduler.h"
    33
     4int controller      PROTO((int, char **));
    45int task            PROTO((int, char **));
    56int task_host       PROTO((int, char **));
     
    89int task_periods    PROTO((int, char **));
    910int run             PROTO((int, char **));
     11int stop            PROTO((int, char **));
    1012
    1113static Command cmds[] = { 
    12   {"task",      task,         "define a schedulable task"},
    13   {"host",      task_host,    "define host machine for a task"},
    14   {"task.exit", task_macros,  "define exit macros for a task"},
    15   {"task.exec", task_macros,  "define pre-exec macro for a task"},
    16   {"command",   task_command, "define executed command for a task"},
    17   {"periods",   task_periods, "define time scales for a task"},
    18   {"run",       run,          "run the scheduler"},
     14  {"controller", controller,   "controller commands"},
     15  {"task",       task,         "define a schedulable task"},
     16  {"host",       task_host,    "define host machine for a task"},
     17  {"task.exit",  task_macros,  "define exit macros for a task"},
     18  {"task.exec",  task_macros,  "define pre-exec macro for a task"},
     19  {"command",    task_command, "define executed command for a task"},
     20  {"periods",    task_periods, "define time scales for a task"},
     21  {"run",        run,          "run the scheduler"},
     22  {"stop",       stop,         "stop the scheduler"},
    1923};
    2024
  • trunk/Ohana/src/opihi/pantasks/run.c

    r3140 r4450  
    1 # include "basic.h"
    21# include "scheduler.h"
    32
    43int run (int argc, char **argv) {
    5 
    6   Job *job;
    7   Task *task;
    8   Macro *macro;
    9   int i, found, status;
    10   int Ntest;
    114
    125  if (argc != 1) {
     
    158  }
    169
    17   /* start the clock for all tasks */
    18   while ((task = NextTask ()) != NULL) {
    19     gettimeofday (&task[0].last, (void *) NULL);
     10  InitTaskTimers ();
     11  rl_event_hook = CheckSystem;
     12  rl_set_keyboard_input_timeout (1000000);
     13
     14  return (TRUE);
     15}
     16
     17int stop (int argc, char **argv) {
     18
     19  if (argc != 1) {
     20    fprintf (stderr, "USAGE: stop\n");
     21    return (FALSE);
    2022  }
    2123
    22   Ntest = 0;
     24  rl_event_hook = NULL;
     25  rl_set_keyboard_input_timeout (1000000);
    2326
    24   /* loop forever, checking for completed jobs and ready tasks */
    25   while (1) {
    26     if (Ntest > 5) {
    27       ListJobs ();
    28       Ntest = 0;
    29     }
    30     usleep (10000);
    31     Ntest ++;
    32 
    33     /** test all tasks: ready to test? ready to run? **/
    34     while ((task = NextTask ()) != NULL) {
    35 
    36       /* ready to test? : check exec period */
    37       if (GetTaskTimer(task[0].last) < task[0].exec_period) continue;
    38 
    39       SetCurrentTask (task[0].name);
    40       fprintf (stderr, "trying task %s\n", task[0].name);
    41 
    42       /* ready to run? : run task.exec macro */
    43       if (task[0].exec != NULL) {
    44         status = exec_loop (task[0].exec);
    45         if (!status) continue;
    46       }
    47 
    48       /* is task valid?  check state of task.(argc, argv) */
    49       /*** ADD CODE HERE ***/
    50 
    51       /* construct job from task */
    52       job = CreateJob (task);
    53 
    54       /* execute job - XXX add status test */
    55       SubmitJob (job);
    56 
    57       /* reset timer on task (don't do this if Create/Submit fails)*/
    58       gettimeofday (&task[0].last, (void *) NULL);
    59     }
    60 
    61     /** test all jobs: ready to test?  finished? **/
    62     while ((job = NextJob ()) != NULL) {
    63 
    64       /* check for timeout */
    65       if (GetTaskTimer(job[0].start) >= job[0].task.timeout_period) {
    66         fprintf (stderr, "timeout on %s\n", job[0].task.name);
    67         /* run task.timeout macro, if it exists */
    68         if (job[0].task.timeout != NULL) {
    69           exec_loop (job[0].task.timeout);
    70         }
    71         DeleteJob (job);
    72         continue;
    73       }
    74 
    75       /* check poll period (ready to run again?) */
    76       if (GetTaskTimer(job[0].last) < job[0].task.poll_period) continue;
    77 
    78       /* check current status */
    79       status = CheckJob (job);
    80       switch (status) {
    81         case JOB_BUSY:
    82           fprintf (stderr, "job %s (%d) busy\n", job[0].task.name, job[0].JobID);
    83           break;
    84 
    85         case JOB_CRASH:
    86           fprintf (stderr, "job %s (%d) crash\n", job[0].task.name, job[0].JobID);
    87           /* run task.crash macro, if it exists */
    88           if (job[0].task.crash != NULL) {
    89             exec_loop (job[0].task.crash);
    90           }
    91           DeleteJob (job);
    92           continue;
    93           break;
    94 
    95         case JOB_EXIT:
    96           fprintf (stderr, "job %s (%d) exit\n", job[0].task.name, job[0].JobID);
    97           /* run corresponding task.exit macro, if it exists */
    98           macro = job[0].task.def;
    99           for (i = 0; i < job[0].task.Nexit; i++) {
    100             if (job[0].exit_status == atoi(job[0].task.exit[i][0].name)) {
    101               macro = job[0].task.exit[i];
    102               break;
    103             }
    104           }
    105           if (macro != NULL) exec_loop (macro);
    106           DeleteJob (job);
    107           continue;
    108           break;
    109 
    110         default:
    111           fprintf (stderr, "unknown exit status\n");
    112           /** do something more useful here ?? **/
    113           break;
    114       }
    115 
    116       /* reset polling clock */
    117       SetTaskTimer (&job[0].last);
    118     }
    119   }
    12027  return (TRUE);
    12128}
  • trunk/Ohana/src/opihi/pantasks/scheduler.c

    r2598 r4450  
    4949  rl_readline_name = opihi_name;
    5050  rl_attempted_completion_function = command_completer;
     51  rl_event_hook = NULL;
     52  rl_set_keyboard_input_timeout (1000000);
    5153
    5254  set_str_variable ("HISTORY", opihi_history);
  • trunk/Ohana/src/opihi/pantasks/task_command.c

    r2598 r4450  
    4141  return (TRUE);
    4242}
    43 
    44 
    45 /**
    46     careful with this: the command is supposed to be realized
    47     for the Job, not the Task (at execution)
    48     the code is right, but who calls it when needs to be clarified.
    49  **/
    50 
  • trunk/Ohana/src/opihi/pantasks/task_host.c

    r2598 r4450  
    44int task_host (int argc, char **argv) {
    55
     6  int N, RequiredHost;
    67  Task *task;
    78  char *taskname;
    89
     10  RequiredHost = FALSE;
     11  if (N = get_argument (argc, argv, "-required")) {
     12    remove_argument (N, &argc, argv);
     13    RequiredHost = TRUE;
     14  }
     15
    916  if (argc != 2) {
    10     fprintf (stderr, "USAGE: host <name>\n");
    11     fprintf (stderr, "  (define host machine for this task (or 'none'))\n");
     17    fprintf (stderr, "USAGE: host <name> [-required]\n");
     18    fprintf (stderr, "  define host machine for this task\n");
     19    fprintf (stderr, "  -required flag indicates controller must use this host\n");
     20    fprintf (stderr, "  value of 'local' for host indicates process not using controller\n");
     21    fprintf (stderr, "  value of 'none' for host indicates controller may assign at will\n");
    1222    return (FALSE);
    1323  }
     
    2333    return (FALSE);
    2434  }
     35  task[0].host_required = RequiredHost;
    2536
    2637  if (task[0].host != NULL) free (task[0].host);
    2738  task[0].host = NULL;
    2839
    29   if (!strcasecmp (argv[1], "NONE")) return (TRUE);
     40  if (!strcasecmp (argv[1], "LOCAL")) return (TRUE);
    3041
    3142  task[0].host = strcreate (argv[1]);
  • trunk/Ohana/src/opihi/pclient/ChildOps.c

    r3212 r4450  
    5757      break;
    5858  }
    59 
    6059 
    6160  /* read stderr buffer */
  • trunk/Ohana/src/opihi/pclient/job.c

    r3187 r4450  
    3939    setvbuf (stdout, (char *) NULL, _IONBF, BUFSIZ);
    4040    setvbuf (stderr, (char *) NULL, _IONBF, BUFSIZ);
    41     fprintf (stderr, "child is spawned, executing command\n");
     41    /* fprintf (stderr, "child is spawned, executing command\n"); */
    4242    /* note that this message comes out on the other side of the stderr pipe */
    4343
  • trunk/Ohana/src/opihi/pclient/reset.c

    r3203 r4450  
    6060  return (TRUE);
    6161}
     62
     63/* exit conditions:
     64   -1 - usage message / syntax error
     65    2 - unknown job
     66    0 - process hung
     67    1 - successful reset
     68*/
     69
  • trunk/Ohana/src/opihi/pclient/stdout.c

    r3187 r4450  
    99  }
    1010 
    11   fprintf (stdout, "NBYTES %d\n", child_stdout.Nbuffer);
    1211  fwrite (child_stdout.buffer, 1, child_stdout.Nbuffer, stdout);
    1312  fprintf (stdout, "STATUS %d\n", 0);
     
    2423  }
    2524 
    26   fprintf (stdout, "NBYTES %d\n", child_stderr.Nbuffer);
    2725  fwrite (child_stderr.buffer, 1, child_stderr.Nbuffer, stdout);
    2826  fprintf (stdout, "STATUS %d\n", 0);
  • trunk/Ohana/src/opihi/pcontrol/CheckBusyJob.c

    r3212 r4450  
    55  int      i;
    66  int      status;
     7  int      outstate;
    78  char    *p;
    89  char     string[64];
     
    1112
    1213  /** must have a valid host : if not? **/
    13   host = job[0].host;
    14   /*** why is this a type error? ***/
     14  host = (Host *) job[0].host;
    1515
    1616  InitIOBuffer (&buffer, 0x100);
     
    3434
    3535    case PCLIENT_GOOD:
    36       fprintf (stderr, "message received (CheckBusyJob)\n"); 
     36      /* fprintf (stderr, "message received (CheckBusyJob)\n");   */
    3737      break;
    3838
     
    6060  }
    6161
    62   /** job has exited : move to DONE stack (host still BUSY) **/
    63   if (!strcmp(string, "EXIT")) {
    64     p = memstr (buffer.buffer, "EXITST", buffer.Nbuffer);
    65     sscanf (p, "%*s %d", &job[0].exit_status);
    66     PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM);
    67     job[0].state = PCONTROL_JOB_EXIT;
    68     FreeIOBuffer (&buffer);
    69     return (TRUE);
    70   }
    71   /** job has crashed : move to DONE stack (host still BUSY) */
    72   if (!strcmp(string, "CRASH")) {
    73     p = memstr (buffer.buffer, "EXITST", buffer.Nbuffer);
    74     sscanf (p, "%*s %d", &job[0].exit_status);
    75     PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM);
    76     job[0].state = PCONTROL_JOB_CRASH;
    77     FreeIOBuffer (&buffer);
    78     return (TRUE);
     62  /* exit status better be either EXIT or CRASH */
     63  outstate = PCONTROL_JOB_BUSY;
     64  if (!strcmp(string, "EXIT")) outstate = PCONTROL_JOB_EXIT;
     65  if (!strcmp(string, "CRASH")) outstate = PCONTROL_JOB_CRASH;
     66  if (outstate == PCONTROL_JOB_BUSY) {
     67    fprintf (stderr, "programming error : should not reach here (CheckJob)\n");
     68    exit (1);
    7969  }
    8070
    81   fprintf (stderr, "programming error : should not reach here (CheckJob)\n");
    82   exit (1);
     71  /* parse the exit status and sizes of output buffers */
     72  p = memstr (buffer.buffer, "EXITST", buffer.Nbuffer);
     73  sscanf (p, "%*s %d", &job[0].exit_status);
     74  p = memstr (buffer.buffer, "STDOUT", buffer.Nbuffer);
     75  sscanf (p, "%*s %d", &job[0].stdout_size);
     76  p = memstr (buffer.buffer, "STDERR", buffer.Nbuffer);
     77  sscanf (p, "%*s %d", &job[0].stderr_size);
     78
     79  /** job has exited : move to DONE stack (host still BUSY) **/
     80  PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM);
     81  job[0].state = outstate;
     82  FreeIOBuffer (&buffer);
     83  return (TRUE);
    8384}
    8485
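The new parsing code in CheckBusyJob.c passes the result of memstr straight to sscanf, so a reply that lacks one of the EXITST / STDOUT / STDERR keys would hand sscanf a NULL pointer. A minimal sketch of a guarded version follows. The names find_key and parse_field are hypothetical (they are not part of pcontrol), find_key only stands in for memstr, and the sketch assumes the buffer is NUL-terminated so that sscanf can be used on it.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for memstr: locate key within the first n bytes of buf. */
static const char *find_key (const char *buf, size_t n, const char *key) {
  size_t klen = strlen (key);
  size_t i;

  if (klen == 0 || klen > n) return NULL;
  for (i = 0; i + klen <= n; i++) {
    if (memcmp (buf + i, key, klen) == 0) return buf + i;
  }
  return NULL;
}

/* Parse "<key> <int>" out of buf (assumed NUL-terminated).
   Returns 1 on success, 0 if the key or its number is missing. */
int parse_field (const char *buf, size_t n, const char *key, int *value) {
  const char *p = find_key (buf, n, key);

  if (p == NULL) return 0;
  /* %*s skips the key itself, %d reads the number that follows */
  return (sscanf (p, "%*s %d", value) == 1);
}
```

A caller could then treat a zero return for any of the three keys as a malformed reply instead of dereferencing a missing match.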
  • trunk/Ohana/src/opihi/pcontrol/CheckDoneJob.c

    r3203 r4450  
    55  Host *host;
    66
    7   if (!GetJobOutput (job, "stdout")) {
     7  if (!GetJobOutput ("stdout", (Host *) job[0].host, &job[0].stdout, job[0].stdout_size)) {
     8    /* strip off first and last lines */
    89    PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM);
    910    return (FALSE);
    1011  }
    1112
    12   if (!GetJobOutput (job, "stderr")) {
     13  if (!GetJobOutput ("stderr", (Host *) job[0].host, &job[0].stderr, job[0].stderr_size)) {
    1314    PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM);
    1415    return (FALSE);
  • trunk/Ohana/src/opihi/pcontrol/CheckHost.c

    r3212 r4450  
    1616      /* if host has a job, job is dead, push to Pending */
    1717      if (host[0].stack == PCONTROL_HOST_BUSY) {
    18         job = host[0].job;
    19         N = FindJob (job[0].JobID, PCONTROL_JOB_BUSY);
    20         if (N < 0) {
    21           fprintf (stderr, "error: job is not found in BUSY list\n");
    22           exit (2);
     18        job = (Job *) host[0].job;
     19        if (job != NULL) {
     20          N = FindJob (job[0].JobID, PCONTROL_JOB_BUSY);
     21          if (N < 0) {
     22            fprintf (stderr, "programming error: job is not found in BUSY list\n");
     23            exit (2);
     24          }
     25          job[0].host = NULL; /* unlink host & job */
     26          PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM);
    2327        }
    24         job[0].host = NULL; /* unlink host & job */
    25         job = GetJob (PCONTROL_JOB_BUSY, N);
    26         PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM);
    2728      }
    2829      host[0].job = NULL;
  • trunk/Ohana/src/opihi/pcontrol/CheckIdleHost.c

    r3211 r4450  
    1313    job = (Job *) stack[0].object[i];
    1414    if (job[0].mode != PCONTROL_JOB_NEEDHOST) continue;
    15     if (job[0].hostname == NULL) continue;
     15    if (job[0].hostname == NULL) {
     16      fprintf (stderr, "programming error: NEEDHOST hostname missing\n");
     17      exit (2);
     18    }
    1619    if (strcasecmp (job[0].hostname, host[0].hostname)) continue;
    1720    LinkJobAndHost (job, host);
     
    2427    job = (Job *) stack[0].object[i];
    2528    if (job[0].mode != PCONTROL_JOB_WANTHOST) continue;
    26     if (job[0].hostname == NULL) continue;
     29    if (job[0].hostname == NULL) {
     30      fprintf (stderr, "programming error: WANTHOST hostname missing\n");
     31      exit (2);
     32    }
    2733    if (strcasecmp (job[0].hostname, host[0].hostname)) continue;
    2834    LinkJobAndHost (job, host);
  • trunk/Ohana/src/opihi/pcontrol/GetJobOutput.c

    r3212 r4450  
    11# include "pcontrol.h"
     2# define PCLIENT_TIMEOUT 20
    23
    3 int GetJobOutput (Job *job, char *channel) {
     4/* we read Nbytes from the host, then watch for the prompt */
     5int GetJobOutput (char *command, Host *host, IOBuffer *buffer, int Nbytes) {
    46 
    5   int      status;
    6   IOBuffer *buffer;
    7   Host     *host;
     7  int i, status, Nstart;
     8  char *line;
    89
    9   buffer = NULL;
    10   if (!strcasecmp (channel, "stdout")) buffer = &job[0].stdout;
    11   if (!strcasecmp (channel, "stderr")) buffer = &job[0].stderr;
    12   if (buffer == NULL) {
    13     fprintf (stderr, "invalid output channel : programming error\n");
    14     exit (1);
     10  /* flush any earlier messages */
     11  ReadtoIOBuffer (buffer, host[0].stdout);
     12  FlushIOBuffer (buffer);
     13  Nstart = buffer[0].Nbuffer;
     14
     15  /* send command (stdout / stderr) */
     16  ALLOCATE (line, char, MAX (1, strlen(command) + 1));
     17  sprintf (line, "%s\n", command);
     18  status = write (host[0].stdin, line, strlen(line));
     19  free (line);
     20
     21  /* is pipe still open? */
     22  if ((status == -1) && (errno == EPIPE)) return (PCLIENT_DOWN);
     23
     24  /* read at least Nbytes, then watch for PCLIENT_PROMPT */
     25  line = NULL;
     26  status = -1;
     27  for (i = 0; (i < PCLIENT_TIMEOUT) && (status == -1) && (line == NULL); i++) {
     28    status = ReadtoIOBuffer (buffer, host[0].stdout);
     29    if ((buffer[0].Nbuffer - Nstart) >= Nbytes) {
     30      line = memstr (buffer[0].buffer, PCLIENT_PROMPT, buffer[0].Nbuffer);
     31    }
     32    if (status == -1) usleep (10000);
    1533  }
    16 
    17   /* If we already have the output, just return, don't retry */
    18   if (buffer[0].Nbuffer) return (TRUE);
    19 
    20   /** must have a valid host : if not, move to pending? **/
    21   host = job[0].host;
    22   status = PclientCommand (host, channel, PCLIENT_PROMPT, buffer);
     34  if (status ==  0) return (PCLIENT_DOWN);
     35  if (status == -1) return (PCLIENT_HUNG);
    2336
    2437  /* check on success of pclient command */
    2538  switch (status) {
    26     case PCLIENT_DOWN:
    27       /*** different behavior for ANYHOST, WANTHOST, NEEDHOST ***/
    28       buffer[0].Nbuffer = 0;
     39    case -1:
     40      fprintf (stderr, "host %s is not responding\n", host[0].hostname);
     41      return (FALSE);
     42
     43    case 0:
    2944      fprintf (stderr, "host %s is down\n", host[0].hostname);
    3045      return (FALSE);
    3146
    32     case PCLIENT_HUNG:
    33       /*** should we consider a HUNG host DOWN? ***/
    34       buffer[0].Nbuffer = 0;
    35       fprintf (stderr, "host %s is not responding\n", host[0].hostname);
    36       return (FALSE);
    37 
    38     case PCLIENT_GOOD:
    39       fprintf (stderr, "message received (GetJobOutput : %s)\n", channel); 
     47    default:
     48      fprintf (stderr, "message received (GetJobOutput : %s)\n", command); 
     49      /* drop extra bytes from pclient (not pclient:job) */
     50      buffer[0].Nbuffer = Nstart + Nbytes;
     51      if (buffer[0].Nalloc > buffer[0].Nbuffer) {
     52        bzero (buffer[0].buffer + buffer[0].Nbuffer, buffer[0].Nalloc - buffer[0].Nbuffer);
     53      }
    4054      return (TRUE);
    41 
    42     default:
    43       fprintf (stderr, "unknown status for pclient command: programming error\n"); 
    44       exit (1);
    4555  }
    4656
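The read loop in the new GetJobOutput waits until at least Nbytes have arrived past Nstart and only then looks for the prompt. The completion test can be sketched on its own as below. The function name output_complete is hypothetical, and the sketch differs from the changeset in one labeled respect: it searches for the prompt only after the payload, so prompt-like bytes inside the job output are ignored, whereas the code above searches the whole buffer.

```c
#include <stddef.h>
#include <string.h>

/* Returns 1 once nbytes of payload have arrived after offset nstart
   and the prompt string appears somewhere after that payload. */
int output_complete (const char *buf, size_t nbuf, size_t nstart,
                     size_t nbytes, const char *prompt) {
  size_t plen = strlen (prompt);
  size_t i;

  /* not enough payload yet */
  if (nbuf < nstart || nbuf - nstart < nbytes) return 0;

  /* assumption: search only past the payload (the changeset searches from 0) */
  for (i = nstart + nbytes; i + plen <= nbuf; i++) {
    if (memcmp (buf + i, prompt, plen) == 0) return 1;
  }
  return 0;
}
```

A polling loop would call this after each read and stop when it returns 1 or when the retry count runs out.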
  • trunk/Ohana/src/opihi/pcontrol/HostOps.c

    r3212 r4450  
    3333  fprintf (stderr, "error: unknown host stack : programming error\n");
    3434  exit (1);
     35}
     36
     37Host *FindHostStack (IDtype HostID) {
     38
     39  Host *host;
     40
     41  host = FindHostPtr (HostID, PCONTROL_HOST_IDLE);
     42  if (host != NULL) return (host);
     43
     44  host = FindHostPtr (HostID, PCONTROL_HOST_DOWN);
     45  if (host != NULL) return (host);
     46
     47  host = FindHostPtr (HostID, PCONTROL_HOST_DONE);
     48  if (host != NULL) return (host);
     49
     50  host = FindHostPtr (HostID, PCONTROL_HOST_BUSY);
     51  if (host != NULL) return (host);
     52
     53  host = FindHostPtr (HostID, PCONTROL_HOST_OFF);
     54  if (host != NULL) return (host);
     55
     56  return (NULL);
    3557}
    3658
     
    79101}
    80102
     103Host *FindHostPtr (IDtype HostID, int StackID) {
     104
     105  int i;
     106  Host *host;
     107  Stack *stack;
     108
     109  stack = GetHostStack (StackID);
     110  if (stack == NULL) return (NULL);
     111
     112  for (i = 0; i < stack[0].Nobject; i++) {
     113    host = (Host *) stack[0].object[i];
     114    if (host[0].HostID == HostID) {
     115      return (host);
     116    }
     117  }
     118  return (NULL);
     119}
     120
     121Host *PullHost (IDtype HostID, int StackID) {
     122 
     123  int N;
     124  Host *host;
     125
     126  N = FindHost (HostID, StackID);
     127  if (N < 0) return (NULL);
     128
     129  host = GetHost (StackID, N);
     130  if (host == NULL) {
     131    fprintf (stderr, "programming error! host missing from stack\n");
     132    exit (1);
     133  }
     134  return (host);
     135}
     136
    81137int FindNamedHost (char *name, int StackID) {
    82138
     
    119175  FREE (host);
    120176}
    121 
  • trunk/Ohana/src/opihi/pcontrol/JobOps.c

    r3212 r4450  
    3535}
    3636
     37Job *FindJobStack (IDtype JobID) {
     38
     39  Job *job;
     40
     41  job = FindJobPtr (JobID, PCONTROL_JOB_PENDING);
     42  if (job != NULL) return (job);
     43
     44  job = FindJobPtr (JobID, PCONTROL_JOB_BUSY);
     45  if (job != NULL) return (job);
     46
     47  job = FindJobPtr (JobID, PCONTROL_JOB_EXIT);
     48  if (job != NULL) return (job);
     49
     50  job = FindJobPtr (JobID, PCONTROL_JOB_CRASH);
     51  if (job != NULL) return (job);
     52
     53  job = FindJobPtr (JobID, PCONTROL_JOB_DONE);
     54  if (job != NULL) return (job);
     55
     56  return (NULL);
     57}
     58
     59/* add job to position in stack */
    3760int PutJob (Job *job, int StackID, int where) {
    3861
     
    5174}
    5275 
     76/* remove job from position in stack */
    5377Job *GetJob (int StackID, int where) {
    5478
     
    6387}
    6488 
     89/* return stack position of job */
    6590int FindJob (IDtype JobID, int StackID) {
    6691
     
    81106}
    82107
     108/* return pointer to job */
     109Job *FindJobPtr (IDtype JobID, int StackID) {
     110
     111  int i;
     112  Job *job;
     113  Stack *stack;
     114
     115  stack = GetJobStack (StackID);
     116  if (stack == NULL) return (NULL);
     117
     118  for (i = 0; i < stack[0].Nobject; i++) {
     119    job = (Job *) stack[0].object[i];
     120    if (job[0].JobID == JobID) {
     121      return (job);
     122    }
     123  }
     124  return (NULL);
     125}
     126
     127/* remove job from stack, return pointer */
     128Job *PullJob (IDtype JobID, int StackID) {
     129 
     130  int N;
     131  Job *job;
     132
     133  N = FindJob (JobID, StackID);
     134  if (N < 0) return (NULL);
     135
     136  job = GetJob (StackID, N);
     137  if (job == NULL) {
     138    fprintf (stderr, "programming error! job missing from stack\n");
     139    exit (1);
     140  }
     141  return (job);
     142}
     143
    83144IDtype AddJob (char *hostname, JobMode mode, int timeout, int argc, char **argv) {
    84145
     
    89150  job[0].argc     = argc;
    90151  job[0].argv     = argv;
    91   job[0].hostname = strcreate (hostname);
     152  job[0].hostname = hostname;
    92153  job[0].mode     = mode;
    93154  job[0].host     = NULL;
     
    119180}
    120181
    121 # if (0)
    122 void KillJob (Job *job) {
    123 
    124   int status;
    125   char line[64];
    126   Host *host;
    127 
    128   sprintf (line, "reset\n");
    129 
    130   /* send command to client */
    131   status = write (host[0].stdin, line, strlen(line));
    132   if ((status == -1) && (errno == EPIPE)) {
    133     fprintf (stderr, "host %s is down\n", host[0].hostname);
    134     PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM);
    135     PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM);
    136     return (FALSE);
    137   }
    138   /* check for a response */
    139 
    140   /** this needs to be cleaned up to handle the slow response case **/
    141 }
    142 # endif
    143 
     182/* unlink job and host, pull host from its stack */
    144183Host *UnlinkJobAndHost (Job *job) {
    145184
     
    147186  Host *host;
    148187
    149   host = job[0].host;
     188  host = (Host *) job[0].host;
     189  if (host == NULL) {
     190    fprintf (stderr, "programming error: job has no host\n");
     191    exit (2);
     192  }
    150193
    151194  /* unlink host & job */
     
    153196  host[0].job = NULL;
    154197 
    155   /*** need to pop host off of correct stack XXX ***/
    156   N = FindHost (host[0].HostID, host[0].stack);
    157   if (N < 0) {
     198  /* remove host from correct stack */
     199  if (PullHost (host[0].HostID, host[0].stack) == NULL) {
    158200    fprintf (stderr, "programming error: host is not found in current stack\n");
    159201    exit (2);
    160202  }
    161   host = GetHost (host[0].stack, N);
    162203  return (host);
    163204}
     
    166207  int N;
    167208
    168   job[0].host = host;
    169   host[0].job = job;
    170 
    171 # if (0)
    172   /*** need to pop host off of correct stack XXX ***/
    173   N = FindHost (host[0].HostID, host[0].stack);
    174   if (N < 0) {
    175     fprintf (stderr, "programming error: host is not found in current stack\n");
    176     exit (2);
    177   }
    178   host = GetHost (host[0].stack, N);
    179 # endif
    180 
    181   /*** need to pop job off of correct stack XXX ***/
    182   N = FindJob (job[0].JobID, job[0].stack);
    183   if (N < 0) {
     209  job[0].host = (struct Host *) host;
     210  host[0].job = (struct Job *) job;
     211
     212  /* remove job from correct stack */
     213  if (PullJob (job[0].JobID, job[0].stack) == NULL) {
    184214    fprintf (stderr, "programming error: job is not found in current stack\n");
    185215    exit (2);
    186216  }
    187   job = GetJob (job[0].stack, N);
    188 
    189   /*** this is fairly crazy : the only reason this works is cause I
    190        get the same host ptr here and in CheckIdleHost
    191   ***/
    192 }
     217}
  • trunk/Ohana/src/opihi/pcontrol/Makefile

    r3525 r4450  
    2020LIBS2   =       -lbasiccmd -lshell -ldata
    2121LIBS    =       $(LIBS2) $(LIBS1)
    22 CFLAGS  =       $(INCS)
     22CFLAGS  =       $(INCS) -Werror
    2323CCFLAGS =       $(LIBS)
    2424
     
    4646$(SDIR)/StartHost.$(ARCH).o \
    4747$(SDIR)/StopHosts.$(ARCH).o \
     48$(SDIR)/KillJob.$(ARCH).o \
    4849$(SDIR)/StartJob.$(ARCH).o
    4950
    5051cmds = \
     52$(SDIR)/kill.$(ARCH).o \
     53$(SDIR)/delete.$(ARCH).o \
     54$(SDIR)/check.$(ARCH).o \
    5155$(SDIR)/job.$(ARCH).o \
    5256$(SDIR)/status.$(ARCH).o \
     57$(SDIR)/stdout.$(ARCH).o \
    5358$(SDIR)/host.$(ARCH).o
    5459
  • trunk/Ohana/src/opihi/pcontrol/ResetJob.c

    r3212 r4450  
    88
    99  /** must have a valid host : if not, move to pending? **/
    10   host = job[0].host;
     10  host = (Host *) job[0].host;
    1111
    1212  InitIOBuffer (&buffer, 0x100);
  • trunk/Ohana/src/opihi/pcontrol/StartJob.c

    r3212 r4450  
    1111
    1212  /* job must have assigned host */
    13   host = job[0].host;
     13  host = (Host *) job[0].host;
    1414  if (host == NULL) {
    1515    fprintf (stderr, "no assigned host : programming error\n");
  • trunk/Ohana/src/opihi/pcontrol/StopHosts.c

    r3212 r4450  
    114114  CLOSE (host[0].stderr);
    115115  host[0].job = NULL;
    116   PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM);
     116  PutHost (host, PCONTROL_HOST_OFF, STACK_BOTTOM);
    117117}
  • trunk/Ohana/src/opihi/pcontrol/init.c

    r3203 r4450  
    11# include "opihi.h"
    22
    3 int job             PROTO((int, char **));
    4 int host            PROTO((int, char **));
    5 int status          PROTO((int, char **));
     3int kill_pc     PROTO((int, char **));
     4int delete      PROTO((int, char **));
     5int job         PROTO((int, char **));
     6int host        PROTO((int, char **));
     7int check       PROTO((int, char **));
     8int status      PROTO((int, char **));
     9int stdout_pc   PROTO((int, char **));
     10int stderr_pc   PROTO((int, char **));
    611
    712static Command cmds[] = { 
    8   {"job",       job,      "add job"},
    9   {"host",      host,     "add / delete / modify host"},
    10   {"status",    status,   "get system status"},
     13  {"kill",      kill_pc,   "kill job"},
     14  {"delete",    delete,    "delete job"},
     15  {"job",       job,       "add job"},
     16  {"host",      host,      "add / delete / modify host"},
     17  {"check",     check,     "get job or host status"},
     18  {"status",    status,    "get system status"},
     19  {"stdout",    stdout_pc, "get stdout buffer for job"},
     20  {"stderr",    stderr_pc, "get stderr buffer for job"},
    1121};
    1222
  • trunk/Ohana/src/opihi/pcontrol/job.c

    r3212 r4450  
    2626    Mode = PCONTROL_JOB_NEEDHOST;
    2727  }
     28  if (Host == NULL) Host = strcreate ("anyhost");
    2829 
    2930  Timeout = 100;
     
    4748
    4849  JobID = AddJob (Host, Mode, Timeout, targc, targv);
    49   FREE (Host);
     50  fprintf (stdout, "JobID: %d\n", JobID);
    5051  return (TRUE);
    5152}
  • trunk/Ohana/src/opihi/pcontrol/kill.c

    r3189 r4450  
    11# include "pcontrol.h"
    22
    3 int kill (int argc, char **argv) {
     3int kill_pc (int argc, char **argv) {
    44
    55  Job *job;
     
    1212  JobID = atoi (argv[1]);
    1313
    14   N = FindJob (JobID, PCONTROL_JOB_BUSY);
    15   if (N < 0) {
     14  job = PullJob (JobID, PCONTROL_JOB_BUSY);
     15  if (job == NULL) {
    1616    fprintf (stderr, "job %s not BUSY\n", argv[1]);
    1717    return (FALSE);
    1818  }
    19   job = GetJob (PCONTROL_JOB_BUSY, N);
    2019  KillJob (job);
    2120  return (TRUE);
    2221}
    23 
    24 int delete (int argc, char **argv) {
    25 
    26   Job *job;
    27   int JobID, N;
    28 
    29   if (argc < 2) {
    30     fprintf (stderr, "USAGE: delete (JobID)\n");
    31     return (FALSE);
    32   }
    33   JobID = atoi (argv[1]);
    34   /* use a string interp to convert JobIDs to ints ? */
    35 
    36   N = FindJob (JobID, PCONTROL_JOB_PENDING);
    37   if (N >= 0) {
    38     job = GetJob (PCONTROL_JOB_PENDING, N);
    39     DelJob (job);
    40     return (TRUE);
    41   }
    42   N = FindJob (JobID, PCONTROL_JOB_CRASH);
    43   if (N >= 0) {
    44     job = GetJob (PCONTROL_JOB_CRASH, N);
    45     DelJob (job);
    46     return (TRUE);
    47   }
    48   N = FindJob (JobID, PCONTROL_JOB_EXIT);
    49   if (N >= 0) {
    50     job = GetJob (PCONTROL_JOB_EXIT, N);
    51     DelJob (job);
    52     return (TRUE);
    53   }
    54 
    55   fprintf (stderr, "job %s not PENDING, CRASH, EXIT\n", argv[1]);
    56   return (FALSE);
    57 }
  • trunk/Ohana/src/opihi/pcontrol/pclient.c

    r3212 r4450  
    1313  /* send command to client (adding on \n) */
    1414  /* fprintf (stderr, "send: %s (%d)\n", command, buffer[0].Nbuffer); */
    15   ALLOCATE (line, char, MAX (1, strlen(command)));
     15  ALLOCATE (line, char, MAX (1, strlen(command) + 1));
    1616  sprintf (line, "%s\n", command);
    1717  status = write (host[0].stdin, line, strlen(line));
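The revised allocation in pclient.c reserves strlen(command) + 1 bytes, while sprintf with "%s\n" writes the command, a newline and a terminating NUL, which is strlen(command) + 2 bytes. Unless ALLOCATE pads its request (its definition is not shown in this changeset), the buffer may still be one byte short. A minimal sketch of the sizing, using plain malloc and a hypothetical helper name make_command_line:

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Returns a newly allocated "<command>\n" string, or NULL if allocation fails.
   The caller frees the result. */
char *make_command_line (const char *command) {
  size_t len = strlen (command);
  char *line = malloc (len + 2);   /* command + '\n' + '\0' */

  if (line == NULL) return NULL;
  /* snprintf bounds the write to the allocated size */
  snprintf (line, len + 2, "%s\n", command);
  return line;
}
```

The same sizing would apply to the identical ALLOCATE / sprintf pair added in GetJobOutput.c.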
  • trunk/Ohana/src/opihi/pcontrol/pcontrol.c

    r3212 r4450  
    4545  rl_attempted_completion_function = command_completer;
    4646  rl_event_hook = CheckSystem;
    47   /* rl_set_keyboard_input_timeout (2000000);  */
     47  rl_set_keyboard_input_timeout (1000000);
    4848
    4949  set_str_variable ("HISTORY", opihi_history);
  • trunk/Ohana/src/opihi/pcontrol/rconnect.c

    r3211 r4450  
    7979    status = ReadtoIOBuffer (&buffer, stdout_fd[0]);
    8080    p = memstr (buffer.buffer, "CONNECTED", buffer.Nbuffer);
    81     fprintf (stderr, "%d  %d  %s\n", i, buffer.Nbuffer, buffer.buffer);
     81    /* fprintf (stderr, "%d  %d  %s\n", i, buffer.Nbuffer, buffer.buffer); */
    8282    usleep (20000);
    8383  }
     84  fprintf (stderr, "%d cycles to connect\n", i);
    8485  if (status == 0) goto pipe_error;
    8586  if (status == -1) goto io_error;