Changeset 4450
- Timestamp:
- Jul 4, 2005, 5:35:47 PM (21 years ago)
- Location:
- trunk/Ohana/src/opihi
- Files:
-
- 11 added
- 34 edited
-
doc/pcontrol.txt (modified) (3 diffs)
-
doc/scheduler.txt (modified) (3 diffs)
-
include/pcontrol.h (modified) (4 diffs)
-
include/scheduler.h (modified) (6 diffs)
-
pantasks/CheckJobs.c (added)
-
pantasks/CheckSystem.c (added)
-
pantasks/CheckTasks.c (added)
-
pantasks/ControllerOps.c (added)
-
pantasks/IOBufferOps.c (added)
-
pantasks/JobOps.c (modified) (9 diffs)
-
pantasks/LocalJob.c (modified) (1 diff)
-
pantasks/Makefile (modified) (2 diffs)
-
pantasks/TaskOps.c (modified) (3 diffs)
-
pantasks/controller.c (added)
-
pantasks/init.c (modified) (2 diffs)
-
pantasks/memstr.c (added)
-
pantasks/run.c (modified) (2 diffs)
-
pantasks/scheduler.c (modified) (1 diff)
-
pantasks/task_command.c (modified) (1 diff)
-
pantasks/task_host.c (modified) (2 diffs)
-
pclient/ChildOps.c (modified) (1 diff)
-
pclient/job.c (modified) (1 diff)
-
pclient/reset.c (modified) (1 diff)
-
pclient/stdout.c (modified) (2 diffs)
-
pcontrol/CheckBusyJob.c (modified) (4 diffs)
-
pcontrol/CheckDoneJob.c (modified) (1 diff)
-
pcontrol/CheckHost.c (modified) (1 diff)
-
pcontrol/CheckIdleHost.c (modified) (2 diffs)
-
pcontrol/GetJobOutput.c (modified) (1 diff)
-
pcontrol/HostOps.c (modified) (3 diffs)
-
pcontrol/JobOps.c (modified) (9 diffs)
-
pcontrol/KillJob.c (added)
-
pcontrol/Makefile (modified) (2 diffs)
-
pcontrol/ResetJob.c (modified) (1 diff)
-
pcontrol/StartJob.c (modified) (1 diff)
-
pcontrol/StopHosts.c (modified) (1 diff)
-
pcontrol/check.c (added)
-
pcontrol/delete.c (added)
-
pcontrol/init.c (modified) (1 diff)
-
pcontrol/job.c (modified) (2 diffs)
-
pcontrol/kill.c (modified) (2 diffs)
-
pcontrol/pclient.c (modified) (1 diff)
-
pcontrol/pcontrol.c (modified) (1 diff)
-
pcontrol/rconnect.c (modified) (1 diff)
-
pcontrol/stdout.c (added)
Legend:
- Unmodified
- Added
- Removed
-
trunk/Ohana/src/opihi/doc/pcontrol.txt
r3335 r4450 80 80 can set the CheckTask function to this hook (& unset it). 81 81 82 83 82 --- 84 83 … … 116 115 status -queues 117 116 117 pcontrol may be given a timeout for each job. pcontrol will monitor a 118 job and kill/crash it if the timeout expires. the timeout only 119 governs how long it is allowed to execute, not how long it can sit in 120 the queue. (the scheduler / operator should decide if a job has been 121 on pcontrol for too long -- this probably means there are no 122 appropriate machines ). 123 124 pcontrol currently does not distinguish between multiple instances of 125 a single host. all have the same name. if you want to bring down a 126 host, you need to issue N host -down commands. perhaps this is 127 silly. a simple alternative would be for the host [-on -off -start 128 -stop] commands to apply to all defined entries which match the 129 hostname. In this case, a command like 'host foo -off' would find and 130 halt all connections to the machine 'foo', while 'host foo -on' would 131 restart them all (or rather, given the functionality of pcontrol, 132 would allow pcontrol to attempt to bring them on). 133 134 It is not clear why a user should be able to execute 'start' (down -> 135 idle) or 'stop' (idle -> down). The transition down -> idle is 136 automatically performed by pcontrol for any machines which are 137 currently down, while the transition idle -> down is immediately 138 followed by an attempt by pcontrol to move the host from down -> idle. 139 140 does it makes sense to kill all jobs on a host? this would only have 141 the effect of clearing the host for a moment until pcontrol decided to 142 start another job on that host. the desired effect is gained putting 143 the host to 'off'. 118 144 145 currently the command 'host (hostname)' puts the host in 'down' 146 state. pcontrol then immediately tries to connect to the host, moving 147 it to 'idle' state (and then 'busy' if any jobs are available). it 148 might be useful to be able to add a host in 'off' state as a starting 149 point. 150 151 it is not obvious that the user should be able to run 'CheckHost', 152 unless this gets expanded to return state information on the host. 153 119 154 --- 120 155 … … 137 172 PENDING -> BUSY : StartJob 138 173 PENDING -> DEL : DelJob 139 BUSY -> EXIT : CheckJob 140 BUSY -> CRASH : CheckJob | KillJob 174 BUSY -> DONE : CheckBusyJob | KillJob 175 DONE -> EXIT : CheckDoneJob 176 DONE -> CRASH : CheckDoneJob 141 177 BUSY -> PENDING : CheckJob | CheckHost 142 178 EXIT -> DEL : DelJob -
trunk/Ohana/src/opihi/doc/scheduler.txt
r3335 r4450 1 2 --- 3 4 sched / pcontrol todo: 5 6 - sched: validate task hosts with controller 7 - pcontrol: host (name) -check should return state 8 - sched: start / stop controller connection? 9 - pcontrol: add command to check status of a job 10 - pcontrol: add command to dump stdout/stderr for a job 11 12 --- 1 13 2 14 scheduler commands: … … 24 36 (in task, command line is static; in task.macro, command line is expanded for each instance) 25 37 26 host (machine) 38 host (machine) [-required] 27 39 - defines preferred host 28 40 - may be in task or in task.macro (exit/exec) … … 61 73 --- 62 74 75 scheduler / controller / local interactions 76 77 scheduler can execute local jobs and jobs on the controller. 78 scheduler needs to initiate the connection to the controller, if the 79 controller is being used. it also needs to fork each local command. 80 81 we need a command to define the controller hosts to be used. if a 82 task or job defines a host which is not known, how is this caught? 83 does sched need to keep a list of valid hosts, or should the host be 84 passed down to the controller. Also, the scheduler should not 85 initiate a connection to the controller unless it is requested. is 86 this implicit when defining a host? 87 88 - controller host foobar 89 90 this should setup the controller interface (if it does not currently 91 exist), and send the command 'host foobar'. scheduler should not 92 care if the connection is successful or not (we can know about hosts 93 which are currently off). 94 95 96 --- 63 97 controller interaction: 64 98 -
trunk/Ohana/src/opihi/include/pcontrol.h
r3212 r4450 72 72 int exit_status; 73 73 int Reset; 74 int stdout_size; 75 int stderr_size; 74 76 JobMode mode; 75 77 JobStat state; … … 128 130 Host *GetHost (int StackID, int where); 129 131 int FindHost (IDtype HostID, int StackID); 132 Host *FindHostPtr (IDtype HostID, int StackID); 133 Host *FindHostStack (IDtype HostID); 134 Host *PullHost (IDtype HostID, int StackID); 130 135 int FindNamedHost (char *name, int StackID); 131 136 void AddHost (char *hostname); … … 149 154 int CheckDoneHost (Host *host); 150 155 int CheckDoneJob (Job *job); 151 int GetJobOutput ( Job *job, char *channel);156 int GetJobOutput (char *command, Host *host, IOBuffer *buffer, int Nbytes); 152 157 int ResetJob (Job *job); 153 158 int PclientCommand (Host *host, char *command, char *response, IOBuffer *buffer); … … 159 164 Job *GetJob (int StackID, int where); 160 165 int FindJob (IDtype JobID, int StackID); 166 Job *FindJobPtr (IDtype JobID, int StackID); 167 Job *FindJobStack (IDtype JobID); 168 Job *PullJob (IDtype JobID, int StackID); 161 169 IDtype AddJob (char *hostname, JobMode mode, int timeout, int argc, char **argv); 162 voidKillJob (Job *job);170 int KillJob (Job *job); 163 171 void DelJob (Job *job); 164 172 Host *UnlinkJobAndHost (Job *job); -
trunk/Ohana/src/opihi/include/scheduler.h
r3143 r4450 1 # include "external.h" 2 # include "shell.h" 3 # include "dvomath.h" 4 # include "convert.h" 5 # include "display.h" 6 # include <sys/types.h> 7 # include <sys/wait.h> 1 8 2 enum {JOB_BUSY, JOB_EXIT, JOB_CRASH}; 9 typedef enum { 10 JOB_NONE, 11 JOB_BUSY, 12 JOB_EXIT, 13 JOB_CRASH, 14 JOB_PENDING, 15 } JobStat; 16 17 typedef enum { 18 JOB_LOCAL, 19 JOB_CONTROLLER, 20 } JobMode; 21 22 typedef enum { 23 CONTROLLER_HUNG = -1, 24 CONTROLLER_DOWN = 0, 25 CONTROLLER_GOOD = 1, 26 } ControllerStat; 27 28 /* socket / pipe communication buffer */ 29 typedef struct { 30 char *buffer; 31 int Nalloc; 32 int Nreset; 33 int Nblock; 34 int Nbuffer; 35 } IOBuffer; 3 36 4 37 /* a task is a description of the wrapping of a job */ … … 16 49 char **argv; 17 50 char *host; 51 int host_required; 18 52 19 53 char *name; … … 28 62 typedef struct { 29 63 int JobID; /* internal ID for job */ 30 int PID; /* external ID for job */ 31 32 Task task; 64 int pid; /* external ID for job */ 33 65 34 66 struct timeval last; … … 36 68 int state; 37 69 int exit_status; 70 71 int argc; 72 char **argv; 73 Task *task; 74 75 IOBuffer stdout; /* stdout storage buffer */ 76 IOBuffer stderr; /* stderr storage buffer */ 77 JobMode mode; /* local or controller? */ 78 79 int stdout_size; /* stdout pipe (local only) */ 80 int stderr_size; /* stderr pipe (local only) */ 81 int stdout_fd; /* stdout pipe (local only) */ 82 int stderr_fd; /* stderr pipe (local only) */ 38 83 } Job; 39 84 85 # define CONTROLLER_PROMPT "pclient:" 40 86 41 87 /* scheduler prototypes */ … … 50 96 int TaskHash (char *input); 51 97 int ShowTask (char *name); 98 char *memstr (char *m1, char *m2, int n); 52 99 53 100 Job *NextJob (); … … 65 112 void SetTaskTimer (struct timeval *timer); 66 113 double GetTaskTimer (struct timeval start); 114 115 int CheckJobs (); 116 int CheckTasks (); 117 int CheckSystem (); 118 int GetJobOutput (char *channel, int pid, IOBuffer *buffer, int Nbytes); 119 int CheckControllerJob (Job *job); 120 int CheckControllerJobStatus (Job *job); 121 int SubmitControllerJob (Job *job); 122 int StartController (); 123 int ControllerCommand (char *command, char *response, IOBuffer *buffer); 124 int SubmitLocalJob (Job *job); 125 int CheckLocalJob (Job *job); 126 int CheckLocalJobStatus (Job *job); 127 void InitTaskTimers (); -
trunk/Ohana/src/opihi/pantasks/JobOps.c
r3140 r4450 1 # include "opihi.h"2 1 # include "scheduler.h" 3 2 … … 9 8 10 9 static Job *jobs; 11 static int Njobs;12 static int NJOBS;10 static int Njobs; 11 static int NJOBS; 13 12 14 13 static char *JobName = dot; … … 65 64 66 65 for (i = 0; i < Njobs; i++) { 67 fprintf (stderr, "%d: %-15s %5d %20s (%x)\n", Njobs, jobs[i].task .name, jobs[i].JobID, jobs[i].task.argv[0], jobs[i].task.argv);66 fprintf (stderr, "%d: %-15s %5d %20s (%x)\n", Njobs, jobs[i].task[0].name, jobs[i].JobID, jobs[i].argv[0], jobs[i].argv); 68 67 } 69 68 return; … … 75 74 int i; 76 75 77 /* try for an exact match first*/76 /* return job with matching JobID */ 78 77 for (i = 0; i < Njobs; i++) { 79 78 if (jobs[i].JobID == JobID) { … … 90 89 91 90 jobs[Njobs].JobID = NextJobID (); 92 jobs[Njobs].PID = 0; 93 jobs[Njobs].task = task[0]; 94 95 /* we need our own copy of task[0].argv */ 96 ALLOCATE (jobs[Njobs].task.argv, char *, MAX (jobs[Njobs].task.argc, 1)); 97 for (i = 0; i < jobs[Njobs].task.argc; i++) { 98 jobs[Njobs].task.argv[i] = strcreate (task[0].argv[i]); 99 } 100 jobs[Njobs].task.host = strcreate (task[0].host); 91 jobs[Njobs].pid = 0; 92 jobs[Njobs].mode = JOB_LOCAL; 93 if (task[0].host != NULL) { 94 jobs[Njobs].mode = JOB_CONTROLLER; 95 } 96 97 /* we need our own copy of task[0].argv 98 * argc is the number of valid args, like the usual command line. 99 * we allocate one extra element, with value 0 to be passed to execvp 100 */ 101 jobs[Njobs].argc = task[0].argc; 102 ALLOCATE (jobs[Njobs].argv, char *, MAX (task[0].argc + 1, 1)); 103 for (i = 0; i < task[0].argc; i++) { 104 jobs[Njobs].argv[i] = strcreate (task[0].argv[i]); 105 } 106 jobs[Njobs].argv[i] = 0; 107 108 /* other data from the task is needed by the job 109 we carry a pointer back to the task. this means we 110 cannot modify the task once a job is created, or the changes will 111 be applied to the existing jobs */ 112 113 jobs[Njobs].task = task; 114 115 /* if we decide we need to be able to dynamically set task qualities 116 (like host, timeouts, etc), the we will need to have matched 117 entries to these quantites in the job structure */ 101 118 102 119 Njobs ++; … … 108 125 } 109 126 127 void FreeJob (Job *job) { 128 129 int i; 130 131 if (job == NULL) return; 132 133 if ((job[0].JobID >= 0) || (job[0].JobID < MAX_N_JOBS)) { 134 JobIDList[job[0].JobID] = FALSE; 135 } 136 137 for (i = 0; i < job[0].argc; i++) { 138 free (job[0].argv[i]); 139 } 140 free (job[0].argv); 141 return; 142 } 143 110 144 void SetCurrentJob (char *name) { 111 145 JobName = name; … … 114 148 char *GetCurrentJob () { 115 149 return (JobName); 116 }117 118 void FreeJob (Job *job) {119 120 int i;121 122 if (job == NULL) return;123 124 if ((job[0].JobID >= 0) || (job[0].JobID < MAX_N_JOBS)) {125 JobIDList[job[0].JobID] = FALSE;126 }127 128 for (i = 0; i < job[0].task.argc; i++) {129 free (job[0].task.argv[i]);130 }131 free (job[0].task.argv);132 free (job[0].task.host);133 return;134 150 } 135 151 … … 161 177 } 162 178 163 /* this needs to:164 1) distinguish local from controller jobs165 2) fork the local jobs in the background166 3) send the controller jobs to the controller */167 168 179 int SubmitJob (Job *job) { 169 180 170 int i, Nchar; 171 char *string; 172 173 Nchar = 0; 174 for (i = 0; i < job[0].task.argc; i++) { 175 Nchar += strlen (job[0].task.argv[i]) + 1; 176 } 177 ALLOCATE (string, char, Nchar); 178 bzero (string, Nchar); 179 180 strcat (string, job[0].task.argv[0]); 181 for (i = 1; i < job[0].task.argc; i++) { 182 strcat (string, " "); 183 strcat (string, job[0].task.argv[i]); 184 } 185 186 fprintf (stderr, "executing job %d ...%s...\n", job[0].JobID, string); 187 free (string); 181 if (job[0].mode == JOB_LOCAL) { 182 SubmitLocalJob (job); 183 } else { 184 SubmitControllerJob (job); 185 } 188 186 189 187 /* reset clock for start and poll-test */ … … 196 194 int CheckJob (Job *job) { 197 195 198 float f; 199 f = drand48 (); 200 201 if ((0.00 < f) && (f < 0.25)) { 202 job[0].state = JOB_BUSY; 203 job[0].exit_status = -1; 204 return (JOB_BUSY); 205 } 206 if ((0.25 < f) && (f < 0.50)) { 207 job[0].state = JOB_BUSY; 208 job[0].exit_status = -1; 209 return (JOB_CRASH); 210 } 211 if ((0.50 < f) && (f < 0.75)) { 212 job[0].state = JOB_BUSY; 213 job[0].exit_status = 0; 214 return (JOB_EXIT); 215 } 216 if ((0.75 < f) && (f < 1.00)) { 217 job[0].state = JOB_BUSY; 218 job[0].exit_status = 1; 219 return (JOB_EXIT); 220 } 221 } 196 /* add checks for timeouts */ 197 198 if (job[0].mode == JOB_LOCAL) { 199 CheckLocalJob (job); 200 } else { 201 CheckControllerJob (job); 202 } 203 return (job[0].state); 204 } -
trunk/Ohana/src/opihi/pantasks/LocalJob.c
r3392 r4450 1 # include "scheduler.h" 1 2 2 /* local jobs are forked in the background */ 3 /* this could be written a just a one-way pipe */ 4 int SubmitLocalJob (Job *job) { 3 5 4 SubmitLocalJob (Job *job) { 6 int status, pid; 7 int stdout_fd[2], stderr_fd[2]; 5 8 6 /* 7 construct the command line 8 fork the command, get back the PID 9 increment local job counter 10 */ 9 bzero (stdout_fd, 2*sizeof(int)); 10 bzero (stderr_fd, 2*sizeof(int)); 11 11 12 if (pipe (stdout_fd) < 0) goto pipe_error; 13 if (pipe (stderr_fd) < 0) goto pipe_error; 14 15 pid = fork (); 16 if (!pid) { /* must be child process */ 17 fprintf (stderr, "starting controller connection\n"); 18 19 /* close the other ends of the pipes */ 20 close (stdout_fd[0]); 21 close (stderr_fd[0]); 22 23 /* tie our ends of the pipes to stdin, stdout, stderr */ 24 dup2 (stdout_fd[1], STDOUT_FILENO); 25 dup2 (stderr_fd[1], STDERR_FILENO); 26 27 /* set all three unblocking */ 28 setvbuf (stdout, (char *) NULL, _IONBF, BUFSIZ); 29 setvbuf (stderr, (char *) NULL, _IONBF, BUFSIZ); 30 31 status = execvp (job[0].argv[0], job[0].argv); 32 exit (1); 33 } 34 35 /* close the other ends of the pipes */ 36 close (stdout_fd[1]); stdout_fd[1] = 0; 37 close (stderr_fd[1]); stderr_fd[1] = 0; 38 39 /* make the pipes non-blocking */ 40 fcntl (stdout_fd[0], F_SETFL, O_NONBLOCK); 41 fcntl (stderr_fd[0], F_SETFL, O_NONBLOCK); 42 43 job[0].stdout_fd = stdout_fd[0]; 44 job[0].stderr_fd = stderr_fd[0]; 45 job[0].pid = pid; 46 47 return (TRUE); 48 49 pipe_error: 50 perror ("pipe error:"); 51 if (stdout_fd[0] != 0) close (stdout_fd[0]); 52 if (stdout_fd[1] != 0) close (stdout_fd[1]); 53 if (stderr_fd[0] != 0) close (stderr_fd[0]); 54 if (stderr_fd[1] != 0) close (stderr_fd[1]); 55 return (FALSE); 12 56 } 13 57 14 CheckLocalJob (Job *job) { 58 /* update current state, drain stdout/stderr buffers */ 59 int CheckLocalJob (Job *job) { 15 60 16 /* 17 61 int Nread; 62 63 // XXX do something useful with exit status? 64 CheckLocalJobStatus (job); 65 66 /* read stdout buffer */ 67 Nread = ReadtoIOBuffer (&job[0].stdout, job[0].stdout_fd); 68 switch (Nread) { 69 case -2: /* error in read (programming error? system level error?) */ 70 fprintf (stderr, "serious IO error\n"); 71 exit (2); 72 case -1: /* no data in pipe */ 73 break; 74 case 0: /* pipe is closed */ 75 /** change child state? **/ 76 break; 77 default: /* data in pipe */ 78 break; 79 } 80 81 /* read stderr buffer */ 82 Nread = ReadtoIOBuffer (&job[0].stderr, job[0].stderr_fd); 83 switch (Nread) { 84 case -2: /* error in read (programming error? system level error?) */ 85 fprintf (stderr, "serious IO error\n"); 86 exit (2); 87 case -1: /* no data in pipe */ 88 break; 89 case 0: /* pipe is closed */ 90 /** change child state? **/ 91 break; 92 default: /* data in pipe */ 93 break; 94 } 95 return (TRUE); 96 } 97 98 int CheckLocalJobStatus (Job *job) { 99 100 int result, waitstatus; 101 102 /* check local job status */ 103 result = waitpid (job[0].pid, &waitstatus, WNOHANG); 104 switch (result) { 105 case -1: /* error with waitpid */ 106 switch (errno) { 107 case ECHILD: 108 fprintf (stderr, "unknown PID, not a child proc\n"); 109 fprintf (stderr, "did process already exit? programming error?\n"); 110 job[0].state = JOB_NONE; 111 job[0].exit_status = 0; 112 return (FALSE); 113 case EINVAL: 114 fprintf (stderr, "error EINVAL (waitpid): programming error\n"); 115 exit (1); 116 case EINTR: 117 fprintf (stderr, "error EINTR (waitpid): programming error\n"); 118 exit (1); 119 default: 120 fprintf (stderr, "unknown error for waitpid (%d): programming error\n", errno); 121 exit (1); 122 } 123 break; 124 125 case 0: /* process not exited */ 126 job[0].state = JOB_BUSY; 127 job[0].exit_status = 0; 128 return (TRUE); 129 130 default: 131 if (result != job[0].pid) { 132 fprintf (stderr, "waitpid error: mis-matched PID (%d vs %d). programming error\n", result, job[0].pid); 133 exit (1); 134 } 135 136 if (WIFEXITED(waitstatus)) { 137 job[0].state = JOB_EXIT; 138 job[0].exit_status = WEXITSTATUS(waitstatus); 139 } 140 if (WIFSIGNALED(waitstatus)) { 141 job[0].state = JOB_CRASH; 142 job[0].exit_status = WTERMSIG(waitstatus); 143 } 144 if (WIFSTOPPED(waitstatus)) { 145 fprintf (stderr, "waitpid returns 'stopped': programming error\n"); 146 exit (1); 147 } 148 } 149 return; 150 } -
trunk/Ohana/src/opihi/pantasks/Makefile
r3525 r4450 25 25 # sched user commands and support functions ######################## 26 26 27 sched = \ 27 funcs = \ 28 $(SDIR)/CheckJobs.$(ARCH).o \ 29 $(SDIR)/CheckSystem.$(ARCH).o \ 30 $(SDIR)/CheckTasks.$(ARCH).o \ 31 $(SDIR)/ControllerOps.$(ARCH).o \ 32 $(SDIR)/LocalJob.$(ARCH).o \ 33 $(SDIR)/JobOps.$(ARCH).o \ 34 $(SDIR)/TaskOps.$(ARCH).o \ 35 $(SDIR)/IOBufferOps.$(ARCH).o \ 36 $(SDIR)/memstr.$(ARCH).o \ 37 $(SDIR)/init.$(ARCH).o 38 39 cmds = \ 40 $(SDIR)/controller.$(ARCH).o \ 28 41 $(SDIR)/run.$(ARCH).o \ 42 $(SDIR)/scheduler.$(ARCH).o \ 29 43 $(SDIR)/task.$(ARCH).o \ 30 44 $(SDIR)/task_command.$(ARCH).o \ 31 45 $(SDIR)/task_host.$(ARCH).o \ 32 46 $(SDIR)/task_macros.$(ARCH).o \ 33 $(SDIR)/task_periods.$(ARCH).o \ 34 $(SDIR)/JobOps.$(ARCH).o \ 35 $(SDIR)/TaskOps.$(ARCH).o \ 36 $(SDIR)/init.$(ARCH).o \ 37 $(SDIR)/scheduler.$(ARCH).o 47 $(SDIR)/task_periods.$(ARCH).o 38 48 39 49 libs = \ … … 49 59 @echo done 50 60 51 $(BIN)/scheduler.$(ARCH) : $( sched) $(libs)61 $(BIN)/scheduler.$(ARCH) : $(funcs) $(cmds) $(libs) 52 62 53 63 install: $(DESTBIN)/scheduler -
trunk/Ohana/src/opihi/pantasks/TaskOps.c
r3140 r4450 108 108 109 109 tasks[Ntasks].host = NULL; 110 tasks[Ntasks].host_required = FALSE; 110 111 111 112 tasks[Ntasks].argc = 0; … … 124 125 tasks[Ntasks].poll_period = 1.0; 125 126 tasks[Ntasks].timeout_period = 1.0; 127 128 /* init task timer (is reset by 'run') */ 129 gettimeofday (&tasks[Ntasks].last, (void *) NULL); 126 130 127 131 Ntasks ++; … … 155 159 gettimeofday (timer, (void *) NULL); 156 160 } 161 162 /* start the clock for all tasks */ 163 void InitTaskTimers () { 164 165 Task *task; 166 167 while ((task = NextTask ()) != NULL) { 168 gettimeofday (&task[0].last, (void *) NULL); 169 } 170 } -
trunk/Ohana/src/opihi/pantasks/init.c
r3140 r4450 2 2 # include "scheduler.h" 3 3 4 int controller PROTO((int, char **)); 4 5 int task PROTO((int, char **)); 5 6 int task_host PROTO((int, char **)); … … 8 9 int task_periods PROTO((int, char **)); 9 10 int run PROTO((int, char **)); 11 int stop PROTO((int, char **)); 10 12 11 13 static Command cmds[] = { 12 {"task", task, "define a schedulable task"}, 13 {"host", task_host, "define host machine for a task"}, 14 {"task.exit", task_macros, "define exit macros for a task"}, 15 {"task.exec", task_macros, "define pre-exec macro for a task"}, 16 {"command", task_command, "define executed command for a task"}, 17 {"periods", task_periods, "define time scales for a task"}, 18 {"run", run, "run the scheduler"}, 14 {"controller", controller, "controller commands"}, 15 {"task", task, "define a schedulable task"}, 16 {"host", task_host, "define host machine for a task"}, 17 {"task.exit", task_macros, "define exit macros for a task"}, 18 {"task.exec", task_macros, "define pre-exec macro for a task"}, 19 {"command", task_command, "define executed command for a task"}, 20 {"periods", task_periods, "define time scales for a task"}, 21 {"run", run, "run the scheduler"}, 22 {"stop", stop, "stop the scheduler"}, 19 23 }; 20 24 -
trunk/Ohana/src/opihi/pantasks/run.c
r3140 r4450 1 # include "basic.h"2 1 # include "scheduler.h" 3 2 4 3 int run (int argc, char **argv) { 5 6 Job *job;7 Task *task;8 Macro *macro;9 int i, found, status;10 int Ntest;11 4 12 5 if (argc != 1) { … … 15 8 } 16 9 17 /* start the clock for all tasks */ 18 while ((task = NextTask ()) != NULL) { 19 gettimeofday (&task[0].last, (void *) NULL); 10 InitTaskTimers (); 11 rl_event_hook = CheckSystem; 12 rl_set_keyboard_input_timeout (1000000); 13 14 return (TRUE); 15 } 16 17 int stop (int argc, char **argv) { 18 19 if (argc != 1) { 20 fprintf (stderr, "USAGE: stop\n"); 21 return (FALSE); 20 22 } 21 23 22 Ntest = 0; 24 rl_event_hook = NULL; 25 rl_set_keyboard_input_timeout (1000000); 23 26 24 /* loop forever, checking for completed jobs and ready tasks */25 while (1) {26 if (Ntest > 5) {27 ListJobs ();28 Ntest = 0;29 }30 usleep (10000);31 Ntest ++;32 33 /** test all tasks: ready to test? ready to run? **/34 while ((task = NextTask ()) != NULL) {35 36 /* ready to test? : check exec period */37 if (GetTaskTimer(task[0].last) < task[0].exec_period) continue;38 39 SetCurrentTask (task[0].name);40 fprintf (stderr, "trying task %s\n", task[0].name);41 42 /* ready to run? : run task.exec macro */43 if (task[0].exec != NULL) {44 status = exec_loop (task[0].exec);45 if (!status) continue;46 }47 48 /* is task valid? check state of task.(argc, argv) */49 /*** ADD CODE HERE ***/50 51 /* construct job from task */52 job = CreateJob (task);53 54 /* execute job - XXX add status test */55 SubmitJob (job);56 57 /* reset timer on task (don't do this if Create/Submit fails)*/58 gettimeofday (&task[0].last, (void *) NULL);59 }60 61 /** test all jobs: ready to test? finished? **/62 while ((job = NextJob ()) != NULL) {63 64 /* check for timeout */65 if (GetTaskTimer(job[0].start) >= job[0].task.timeout_period) {66 fprintf (stderr, "timeout on %s\n", job[0].task.name);67 /* run task.timeout macro, if it exists */68 if (job[0].task.timeout != NULL) {69 exec_loop (job[0].task.timeout);70 }71 DeleteJob (job);72 continue;73 }74 75 /* check poll period (ready to run again?) */76 if (GetTaskTimer(job[0].last) < job[0].task.poll_period) continue;77 78 /* check current status */79 status = CheckJob (job);80 switch (status) {81 case JOB_BUSY:82 fprintf (stderr, "job %s (%d) busy\n", job[0].task.name, job[0].JobID);83 break;84 85 case JOB_CRASH:86 fprintf (stderr, "job %s (%d) crash\n", job[0].task.name, job[0].JobID);87 /* run task.crash macro, if it exists */88 if (job[0].task.crash != NULL) {89 exec_loop (job[0].task.crash);90 }91 DeleteJob (job);92 continue;93 break;94 95 case JOB_EXIT:96 fprintf (stderr, "job %s (%d) exit\n", job[0].task.name, job[0].JobID);97 /* run corresponding task.exit macro, if it exists */98 macro = job[0].task.def;99 for (i = 0; i < job[0].task.Nexit; i++) {100 if (job[0].exit_status == atoi(job[0].task.exit[i][0].name)) {101 macro = job[0].task.exit[i];102 break;103 }104 }105 if (macro != NULL) exec_loop (macro);106 DeleteJob (job);107 continue;108 break;109 110 default:111 fprintf (stderr, "unknown exit status\n");112 /** do something more useful here ?? **/113 break;114 }115 116 /* reset polling clock */117 SetTaskTimer (&job[0].last);118 }119 }120 27 return (TRUE); 121 28 } -
trunk/Ohana/src/opihi/pantasks/scheduler.c
r2598 r4450 49 49 rl_readline_name = opihi_name; 50 50 rl_attempted_completion_function = command_completer; 51 rl_event_hook = NULL; 52 rl_set_keyboard_input_timeout (1000000); 51 53 52 54 set_str_variable ("HISTORY", opihi_history); -
trunk/Ohana/src/opihi/pantasks/task_command.c
r2598 r4450 41 41 return (TRUE); 42 42 } 43 44 45 /**46 careful with this: the command is supposed to be realized47 for the Job, not the Task (at execution)48 the code is right, but who calls it when needs to be clarified.49 **/50 -
trunk/Ohana/src/opihi/pantasks/task_host.c
r2598 r4450 4 4 int task_host (int argc, char **argv) { 5 5 6 int N, RequiredHost; 6 7 Task *task; 7 8 char *taskname; 8 9 10 RequiredHost = FALSE; 11 if (N = get_argument (argc, argv, "-required")) { 12 remove_argument (N, &argc, argv); 13 RequiredHost = TRUE; 14 } 15 9 16 if (argc != 2) { 10 fprintf (stderr, "USAGE: host <name>\n"); 11 fprintf (stderr, " (define host machine for this task (or 'none'))\n"); 17 fprintf (stderr, "USAGE: host <name> [-required]\n"); 18 fprintf (stderr, " define host machine for this task\n"); 19 fprintf (stderr, " -required flags indicates controller must use this host\n"); 20 fprintf (stderr, " value of 'local' for host indicates process not using controller\n"); 21 fprintf (stderr, " value of 'none' for host indicates controller may assign at will\n"); 12 22 return (FALSE); 13 23 } … … 23 33 return (FALSE); 24 34 } 35 task[0].host_required = RequiredHost; 25 36 26 37 if (task[0].host != NULL) free (task[0].host); 27 38 task[0].host = NULL; 28 39 29 if (!strcasecmp (argv[1], " NONE")) return (TRUE);40 if (!strcasecmp (argv[1], "LOCAL")) return (TRUE); 30 41 31 42 task[0].host = strcreate (argv[1]); -
trunk/Ohana/src/opihi/pclient/ChildOps.c
r3212 r4450 57 57 break; 58 58 } 59 60 59 61 60 /* read stderr buffer */ -
trunk/Ohana/src/opihi/pclient/job.c
r3187 r4450 39 39 setvbuf (stdout, (char *) NULL, _IONBF, BUFSIZ); 40 40 setvbuf (stderr, (char *) NULL, _IONBF, BUFSIZ); 41 fprintf (stderr, "child is spawned, executing command\n");41 /* fprintf (stderr, "child is spawned, executing command\n"); */ 42 42 /* note that this message comes out on the other side of the stderr pipe */ 43 43 -
trunk/Ohana/src/opihi/pclient/reset.c
r3203 r4450 60 60 return (TRUE); 61 61 } 62 63 /* exit conditions: 64 -1 - usage message / syntax error 65 2 - unknown job 66 0 - process hung 67 1 - successful resetn 68 */ 69 -
trunk/Ohana/src/opihi/pclient/stdout.c
r3187 r4450 9 9 } 10 10 11 fprintf (stdout, "NBYTES %d\n", child_stdout.Nbuffer);12 11 fwrite (child_stdout.buffer, 1, child_stdout.Nbuffer, stdout); 13 12 fprintf (stdout, "STATUS %d\n", 0); … … 24 23 } 25 24 26 fprintf (stdout, "NBYTES %d\n", child_stderr.Nbuffer);27 25 fwrite (child_stderr.buffer, 1, child_stderr.Nbuffer, stdout); 28 26 fprintf (stdout, "STATUS %d\n", 0); -
trunk/Ohana/src/opihi/pcontrol/CheckBusyJob.c
r3212 r4450 5 5 int i; 6 6 int status; 7 int outstate; 7 8 char *p; 8 9 char string[64]; … … 11 12 12 13 /** must have a valid host : if not? **/ 13 host = job[0].host; 14 /*** why is this a type error? ***/ 14 host = (Host *) job[0].host; 15 15 16 16 InitIOBuffer (&buffer, 0x100); … … 34 34 35 35 case PCLIENT_GOOD: 36 fprintf (stderr, "message received (CheckBusyJob)\n");36 /* fprintf (stderr, "message received (CheckBusyJob)\n"); */ 37 37 break; 38 38 … … 60 60 } 61 61 62 /** job has exited : move to DONE stack (host still BUSY) **/ 63 if (!strcmp(string, "EXIT")) { 64 p = memstr (buffer.buffer, "EXITST", buffer.Nbuffer); 65 sscanf (p, "%*s %d", &job[0].exit_status); 66 PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM); 67 job[0].state = PCONTROL_JOB_EXIT; 68 FreeIOBuffer (&buffer); 69 return (TRUE); 70 } 71 /** job has crashed : move to DONE stack (host still BUSY) */ 72 if (!strcmp(string, "CRASH")) { 73 p = memstr (buffer.buffer, "EXITST", buffer.Nbuffer); 74 sscanf (p, "%*s %d", &job[0].exit_status); 75 PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM); 76 job[0].state = PCONTROL_JOB_CRASH; 77 FreeIOBuffer (&buffer); 78 return (TRUE); 62 /* exit status better be either EXIT or CRASH */ 63 outstate = PCONTROL_JOB_BUSY; 64 if (!strcmp(string, "EXIT")) outstate = PCONTROL_JOB_EXIT; 65 if (!strcmp(string, "CRASH")) outstate = PCONTROL_JOB_CRASH; 66 if (outstate == PCONTROL_JOB_BUSY) { 67 fprintf (stderr, "programming error : should not reach here (CheckJob)\n"); 68 exit (1); 79 69 } 80 70 81 fprintf (stderr, "programming error : should not reach here (CheckJob)\n"); 82 exit (1); 71 /* parse the exit status and sizes of output buffers */ 72 p = memstr (buffer.buffer, "EXITST", buffer.Nbuffer); 73 sscanf (p, "%*s %d", &job[0].exit_status); 74 p = memstr (buffer.buffer, "STDOUT", buffer.Nbuffer); 75 sscanf (p, "%*s %d", &job[0].stdout_size); 76 p = memstr (buffer.buffer, "STDERR", buffer.Nbuffer); 77 sscanf (p, "%*s %d", &job[0].stderr_size); 78 79 /** job has exited : move to DONE stack (host still BUSY) **/ 80 PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM); 81 job[0].state = outstate; 82 FreeIOBuffer (&buffer); 83 return (TRUE); 83 84 } 84 85 -
trunk/Ohana/src/opihi/pcontrol/CheckDoneJob.c
r3203 r4450 5 5 Host *host; 6 6 7 if (!GetJobOutput (job, "stdout")) { 7 if (!GetJobOutput ("stdout", (Host *) job[0].host, &job[0].stdout, job[0].stdout_size)) { 8 /* strip off first and last lines */ 8 9 PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM); 9 10 return (FALSE); 10 11 } 11 12 12 if (!GetJobOutput ( job, "stderr")) {13 if (!GetJobOutput ("stderr", (Host *) job[0].host, &job[0].stderr, job[0].stderr_size)) { 13 14 PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM); 14 15 return (FALSE); -
trunk/Ohana/src/opihi/pcontrol/CheckHost.c
r3212 r4450 16 16 /* if host has a job, job is dead, push to Pending */ 17 17 if (host[0].stack == PCONTROL_HOST_BUSY) { 18 job = host[0].job; 19 N = FindJob (job[0].JobID, PCONTROL_JOB_BUSY); 20 if (N < 0) { 21 fprintf (stderr, "error: job is not found in BUSY list\n"); 22 exit (2); 18 job = (Job *) host[0].job; 19 if (job != NULL) { 20 N = FindJob (job[0].JobID, PCONTROL_JOB_BUSY); 21 if (N < 0) { 22 fprintf (stderr, "programming error: job is not found in BUSY list\n"); 23 exit (2); 24 } 25 job[0].host = NULL; /* unlink host & job */ 26 PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM); 23 27 } 24 job[0].host = NULL; /* unlink host & job */25 job = GetJob (PCONTROL_JOB_BUSY, N);26 PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM);27 28 } 28 29 host[0].job = NULL; -
trunk/Ohana/src/opihi/pcontrol/CheckIdleHost.c
r3211 r4450 13 13 job = (Job *) stack[0].object[i]; 14 14 if (job[0].mode != PCONTROL_JOB_NEEDHOST) continue; 15 if (job[0].hostname == NULL) continue; 15 if (job[0].hostname == NULL) { 16 fprintf (stderr, "programming error: NEEDHOST hostname missing\n"); 17 exit (2); 18 } 16 19 if (strcasecmp (job[0].hostname, host[0].hostname)) continue; 17 20 LinkJobAndHost (job, host); … … 24 27 job = (Job *) stack[0].object[i]; 25 28 if (job[0].mode != PCONTROL_JOB_WANTHOST) continue; 26 if (job[0].hostname == NULL) continue; 29 if (job[0].hostname == NULL) { 30 fprintf (stderr, "programming error: WANTHOST hostname missing\n"); 31 exit (2); 32 } 27 33 if (strcasecmp (job[0].hostname, host[0].hostname)) continue; 28 34 LinkJobAndHost (job, host); -
trunk/Ohana/src/opihi/pcontrol/GetJobOutput.c
r3212 r4450 1 1 # include "pcontrol.h" 2 # define PCLIENT_TIMEOUT 20 2 3 3 int GetJobOutput (Job *job, char *channel) { 4 /* we read Nbytes from the host, then watch for the prompt */ 5 int GetJobOutput (char *command, Host *host, IOBuffer *buffer, int Nbytes) { 4 6 5 int status; 6 IOBuffer *buffer; 7 Host *host; 7 int i, status, Nstart; 8 char *line; 8 9 9 buffer = NULL; 10 if (!strcasecmp (channel, "stdout")) buffer = &job[0].stdout; 11 if (!strcasecmp (channel, "stderr")) buffer = &job[0].stderr; 12 if (buffer == NULL) { 13 fprintf (stderr, "invalid output channel : programming error\n"); 14 exit (1); 10 /* flush any earlier messages */ 11 ReadtoIOBuffer (buffer, host[0].stdout); 12 FlushIOBuffer (buffer); 13 Nstart = buffer[0].Nbuffer; 14 15 /* send command (stdout / stderr) */ 16 ALLOCATE (line, char, MAX (1, strlen(command) + 1)); 17 sprintf (line, "%s\n", command); 18 status = write (host[0].stdin, line, strlen(line)); 19 free (line); 20 21 /* is pipe still open? */ 22 if ((status == -1) && (errno == EPIPE)) return (PCLIENT_DOWN); 23 24 /* read at least Nbytes, then watch for PCLIENT_PROMPT */ 25 line = NULL; 26 status = -1; 27 for (i = 0; (i < PCLIENT_TIMEOUT) && (status == -1) && (line == NULL); i++) { 28 status = ReadtoIOBuffer (buffer, host[0].stdout); 29 if ((buffer[0].Nbuffer - Nstart) >= Nbytes) { 30 line = memstr (buffer[0].buffer, PCLIENT_PROMPT, buffer[0].Nbuffer); 31 } 32 if (status == -1) usleep (10000); 15 33 } 16 17 /* If we already have the output, just return, don't retry */ 18 if (buffer[0].Nbuffer) return (TRUE); 19 20 /** must have a valid host : if not, move to pending? **/ 21 host = job[0].host; 22 status = PclientCommand (host, channel, PCLIENT_PROMPT, buffer); 34 if (status == 0) return (PCLIENT_DOWN); 35 if (status == -1) return (PCLIENT_HUNG); 23 36 24 37 /* check on success of pclient command */ 25 38 switch (status) { 26 case PCLIENT_DOWN: 27 /*** different behavior for ANYHOST, WANTHOST, NEEDHOST ***/ 28 buffer[0].Nbuffer = 0; 39 case -1: 40 fprintf (stderr, "host %s is not responding\n", host[0].hostname); 41 return (FALSE); 42 43 case 0: 29 44 fprintf (stderr, "host %s is down\n", host[0].hostname); 30 45 return (FALSE); 31 46 32 case PCLIENT_HUNG: 33 /*** should we consider a HUNG host DOWN? ***/ 34 buffer[0].Nbuffer = 0; 35 fprintf (stderr, "host %s is not responding\n", host[0].hostname); 36 return (FALSE); 37 38 case PCLIENT_GOOD: 39 fprintf (stderr, "message received (GetJobOutput : %s)\n", channel); 47 default: 48 fprintf (stderr, "message received (GetJobOutput : %s)\n", command); 49 /* drop extra bytes from pclient (not pclient:job) */ 50 buffer[0].Nbuffer = Nstart + Nbytes; 51 if (buffer[0].Nalloc > buffer[0].Nbuffer) { 52 bzero (buffer[0].buffer + buffer[0].Nbuffer, buffer[0].Nalloc - buffer[0].Nbuffer); 53 } 40 54 return (TRUE); 41 42 default:43 fprintf (stderr, "unknown status for pclient command: programming error\n");44 exit (1);45 55 } 46 56 -
trunk/Ohana/src/opihi/pcontrol/HostOps.c
r3212 r4450 33 33 fprintf (stderr, "error: unknown host stack : programming error\n"); 34 34 exit (1); 35 } 36 37 Host *FindHostStack (IDtype HostID) { 38 39 Host *host; 40 41 host = FindHostPtr (HostID, PCONTROL_HOST_IDLE); 42 if (host != NULL) return (host); 43 44 host = FindHostPtr (HostID, PCONTROL_HOST_DOWN); 45 if (host != NULL) return (host); 46 47 host = FindHostPtr (HostID, PCONTROL_HOST_DONE); 48 if (host != NULL) return (host); 49 50 host = FindHostPtr (HostID, PCONTROL_HOST_BUSY); 51 if (host != NULL) return (host); 52 53 host = FindHostPtr (HostID, PCONTROL_HOST_OFF); 54 if (host != NULL) return (host); 55 56 return (NULL); 35 57 } 36 58 … … 79 101 } 80 102 103 Host *FindHostPtr (IDtype HostID, int StackID) { 104 105 int i; 106 Host *host; 107 Stack *stack; 108 109 stack = GetHostStack (StackID); 110 if (stack == NULL) return (NULL); 111 112 for (i = 0; i < stack[0].Nobject; i++) { 113 host = (Host *) stack[0].object[i]; 114 if (host[0].HostID == HostID) { 115 return (host); 116 } 117 } 118 return (NULL); 119 } 120 121 Host *PullHost (IDtype HostID, int StackID) { 122 123 int N; 124 Host *host; 125 126 N = FindHost (HostID, StackID); 127 if (N < 0) return (NULL); 128 129 host = GetHost (StackID, N); 130 if (host == NULL) { 131 fprintf (stderr, "programming error! host missing from stack\n"); 132 exit (1); 133 } 134 return (host); 135 } 136 81 137 int FindNamedHost (char *name, int StackID) { 82 138 … … 119 175 FREE (host); 120 176 } 121 -
trunk/Ohana/src/opihi/pcontrol/JobOps.c
r3212 r4450 35 35 } 36 36 37 Job *FindJobStack (IDtype JobID) { 38 39 Job *job; 40 41 job = FindJobPtr (JobID, PCONTROL_JOB_PENDING); 42 if (job != NULL) return (job); 43 44 job = FindJobPtr (JobID, PCONTROL_JOB_BUSY); 45 if (job != NULL) return (job); 46 47 job = FindJobPtr (JobID, PCONTROL_JOB_EXIT); 48 if (job != NULL) return (job); 49 50 job = FindJobPtr (JobID, PCONTROL_JOB_CRASH); 51 if (job != NULL) return (job); 52 53 job = FindJobPtr (JobID, PCONTROL_JOB_DONE); 54 if (job != NULL) return (job); 55 56 return (NULL); 57 } 58 59 /* add job to position in stack */ 37 60 int PutJob (Job *job, int StackID, int where) { 38 61 … … 51 74 } 52 75 76 /* remove job from position in stack */ 53 77 Job *GetJob (int StackID, int where) { 54 78 … … 63 87 } 64 88 89 /* return stack position of job */ 65 90 int FindJob (IDtype JobID, int StackID) { 66 91 … … 81 106 } 82 107 108 /* return pointer to job */ 109 Job *FindJobPtr (IDtype JobID, int StackID) { 110 111 int i; 112 Job *job; 113 Stack *stack; 114 115 stack = GetJobStack (StackID); 116 if (stack == NULL) return (NULL); 117 118 for (i = 0; i < stack[0].Nobject; i++) { 119 job = (Job *) stack[0].object[i]; 120 if (job[0].JobID == JobID) { 121 return (job); 122 } 123 } 124 return (NULL); 125 } 126 127 /* remove job from stack, return pointer */ 128 Job *PullJob (IDtype JobID, int StackID) { 129 130 int N; 131 Job *job; 132 133 N = FindJob (JobID, StackID); 134 if (N < 0) return (NULL); 135 136 job = GetJob (StackID, N); 137 if (job == NULL) { 138 fprintf (stderr, "programming error! job missing from stack\n"); 139 exit (1); 140 } 141 return (job); 142 } 143 83 144 IDtype AddJob (char *hostname, JobMode mode, int timeout, int argc, char **argv) { 84 145 … … 89 150 job[0].argc = argc; 90 151 job[0].argv = argv; 91 job[0].hostname = strcreate (hostname);152 job[0].hostname = hostname; 92 153 job[0].mode = mode; 93 154 job[0].host = NULL; … … 119 180 } 120 181 121 # if (0) 122 void KillJob (Job *job) { 123 124 int status; 125 char line[64]; 126 Host *host; 127 128 sprintf (line, "reset\n"); 129 130 /* send command to client */ 131 status = write (host[0].stdin, line, strlen(line)); 132 if ((status == -1) && (errno == EPIPE)) { 133 fprintf (stderr, "host %s is down\n", host[0].hostname); 134 PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM); 135 PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM); 136 return (FALSE); 137 } 138 /* check for a response */ 139 140 /** this needs to be cleaned up to handle the slow response case **/ 141 } 142 # endif 143 182 /* unlink job and host, pull host from its stack */ 144 183 Host *UnlinkJobAndHost (Job *job) { 145 184 … … 147 186 Host *host; 148 187 149 host = job[0].host; 188 host = (Host *) job[0].host; 189 if (host == NULL) { 190 fprintf (stderr, "programming error: job has no host\n"); 191 exit (2); 192 } 150 193 151 194 /* unlink host & job */ … … 153 196 host[0].job = NULL; 154 197 155 /*** need to pop host off of correct stack XXX ***/ 156 N = FindHost (host[0].HostID, host[0].stack); 157 if (N < 0) { 198 /* remove host from correct stack */ 199 if (PullHost (host[0].HostID, host[0].stack) == NULL) { 158 200 fprintf (stderr, "programming error: host is not found in current stack\n"); 159 201 exit (2); 160 202 } 161 host = GetHost (host[0].stack, N);162 203 return (host); 163 204 } … … 166 207 int N; 167 208 168 job[0].host = host; 169 host[0].job = job; 170 171 # if (0) 172 /*** need to pop host off of correct stack XXX ***/ 173 N = FindHost (host[0].HostID, host[0].stack); 174 if (N < 0) { 175 fprintf (stderr, "programming error: host is not found in current stack\n"); 176 exit (2); 177 } 178 host = GetHost (host[0].stack, N); 179 # endif 180 181 /*** need to pop job off of correct stack XXX ***/ 182 N = FindJob (job[0].JobID, job[0].stack); 183 if (N < 0) { 209 job[0].host = (struct Host *) host; 210 host[0].job = (struct Job *) job; 211 212 /* remove job from correct stack */ 213 if (PullJob (job[0].JobID, job[0].stack) == NULL) { 184 214 fprintf (stderr, "programming error: job is not found in current stack\n"); 185 215 exit (2); 186 216 } 187 job = GetJob (job[0].stack, N); 188 189 /*** this is fairly crazy : the only reason this works is cause I 190 get the same host ptr here and in CheckIdleHost 191 ***/ 192 } 217 } -
trunk/Ohana/src/opihi/pcontrol/Makefile
r3525 r4450 20 20 LIBS2 = -lbasiccmd -lshell -ldata 21 21 LIBS = $(LIBS2) $(LIBS1) 22 CFLAGS = $(INCS) 22 CFLAGS = $(INCS) -Werror 23 23 CCFLAGS = $(LIBS) 24 24 … … 46 46 $(SDIR)/StartHost.$(ARCH).o \ 47 47 $(SDIR)/StopHosts.$(ARCH).o \ 48 $(SDIR)/KillJob.$(ARCH).o \ 48 49 $(SDIR)/StartJob.$(ARCH).o 49 50 50 51 cmds = \ 52 $(SDIR)/kill.$(ARCH).o \ 53 $(SDIR)/delete.$(ARCH).o \ 54 $(SDIR)/check.$(ARCH).o \ 51 55 $(SDIR)/job.$(ARCH).o \ 52 56 $(SDIR)/status.$(ARCH).o \ 57 $(SDIR)/stdout.$(ARCH).o \ 53 58 $(SDIR)/host.$(ARCH).o 54 59 -
trunk/Ohana/src/opihi/pcontrol/ResetJob.c
r3212 r4450 8 8 9 9 /** must have a valid host : if not, move to pending? **/ 10 host = job[0].host;10 host = (Host *) job[0].host; 11 11 12 12 InitIOBuffer (&buffer, 0x100); -
trunk/Ohana/src/opihi/pcontrol/StartJob.c
r3212 r4450 11 11 12 12 /* job must have assigned host */ 13 host = job[0].host;13 host = (Host *) job[0].host; 14 14 if (host == NULL) { 15 15 fprintf (stderr, "no assigned host : programming error\n"); -
trunk/Ohana/src/opihi/pcontrol/StopHosts.c
r3212 r4450 114 114 CLOSE (host[0].stderr); 115 115 host[0].job = NULL; 116 PutHost (host, PCONTROL_HOST_ DOWN, STACK_BOTTOM);116 PutHost (host, PCONTROL_HOST_OFF, STACK_BOTTOM); 117 117 } -
trunk/Ohana/src/opihi/pcontrol/init.c
r3203 r4450 1 1 # include "opihi.h" 2 2 3 int job PROTO((int, char **)); 4 int host PROTO((int, char **)); 5 int status PROTO((int, char **)); 3 int kill_pc PROTO((int, char **)); 4 int delete PROTO((int, char **)); 5 int job PROTO((int, char **)); 6 int host PROTO((int, char **)); 7 int check PROTO((int, char **)); 8 int status PROTO((int, char **)); 9 int stdout_pc PROTO((int, char **)); 10 int stderr_pc PROTO((int, char **)); 6 11 7 12 static Command cmds[] = { 8 {"job", job, "add job"}, 9 {"host", host, "add / delete / modify host"}, 10 {"status", status, "get system status"}, 13 {"kill", kill_pc, "kill job"}, 14 {"delete", delete, "delete job"}, 15 {"job", job, "add job"}, 16 {"host", host, "add / delete / modify host"}, 17 {"check", check, "get job or host status"}, 18 {"status", status, "get system status"}, 19 {"stdout", stdout_pc, "get stdout buffer for job"}, 20 {"stderr", stderr_pc, "get stderr buffer for job"}, 11 21 }; 12 22 -
trunk/Ohana/src/opihi/pcontrol/job.c
r3212 r4450 26 26 Mode = PCONTROL_JOB_NEEDHOST; 27 27 } 28 if (Host == NULL) Host = strcreate ("anyhost"); 28 29 29 30 Timeout = 100; … … 47 48 48 49 JobID = AddJob (Host, Mode, Timeout, targc, targv); 49 FREE (Host);50 fprintf (stdout, "JobID: %d\n", JobID); 50 51 return (TRUE); 51 52 } -
trunk/Ohana/src/opihi/pcontrol/kill.c
r3189 r4450 1 1 # include "pcontrol.h" 2 2 3 int kill (int argc, char **argv) {3 int kill_pc (int argc, char **argv) { 4 4 5 5 Job *job; … … 12 12 JobID = atoi (argv[1]); 13 13 14 N = FindJob (JobID, PCONTROL_JOB_BUSY);15 if ( N < 0) {14 job = PullJob (JobID, PCONTROL_JOB_BUSY); 15 if (job == NULL) { 16 16 fprintf (stderr, "job %s not BUSY\n", argv[1]); 17 17 return (FALSE); 18 18 } 19 job = GetJob (PCONTROL_JOB_BUSY, N);20 19 KillJob (job); 21 20 return (TRUE); 22 21 } 23 24 int delete (int argc, char **argv) {25 26 Job *job;27 int JobID, N;28 29 if (argc < 2) {30 fprintf (stderr, "USAGE: delete (JobID)\n");31 return (FALSE);32 }33 JobID = atoi (argv[1]);34 /* use a string interp to convert JobIDs to ints ? */35 36 N = FindJob (JobID, PCONTROL_JOB_PENDING);37 if (N >= 0) {38 job = GetJob (PCONTROL_JOB_PENDING, N);39 DelJob (job);40 return (TRUE);41 }42 N = FindJob (JobID, PCONTROL_JOB_CRASH);43 if (N >= 0) {44 job = GetJob (PCONTROL_JOB_CRASH, N);45 DelJob (job);46 return (TRUE);47 }48 N = FindJob (JobID, PCONTROL_JOB_EXIT);49 if (N >= 0) {50 job = GetJob (PCONTROL_JOB_EXIT, N);51 DelJob (job);52 return (TRUE);53 }54 55 fprintf (stderr, "job %s not PENDING, CRASH, EXIT\n", argv[1]);56 return (FALSE);57 } -
trunk/Ohana/src/opihi/pcontrol/pclient.c
r3212 r4450 13 13 /* send command to client (adding on \n) */ 14 14 /* fprintf (stderr, "send: %s (%d)\n", command, buffer[0].Nbuffer); */ 15 ALLOCATE (line, char, MAX (1, strlen(command) ));15 ALLOCATE (line, char, MAX (1, strlen(command) + 1)); 16 16 sprintf (line, "%s\n", command); 17 17 status = write (host[0].stdin, line, strlen(line)); -
trunk/Ohana/src/opihi/pcontrol/pcontrol.c
r3212 r4450 45 45 rl_attempted_completion_function = command_completer; 46 46 rl_event_hook = CheckSystem; 47 /* rl_set_keyboard_input_timeout (2000000); */47 rl_set_keyboard_input_timeout (1000000); 48 48 49 49 set_str_variable ("HISTORY", opihi_history); -
trunk/Ohana/src/opihi/pcontrol/rconnect.c
r3211 r4450 79 79 status = ReadtoIOBuffer (&buffer, stdout_fd[0]); 80 80 p = memstr (buffer.buffer, "CONNECTED", buffer.Nbuffer); 81 fprintf (stderr, "%d %d %s\n", i, buffer.Nbuffer, buffer.buffer);81 /* fprintf (stderr, "%d %d %s\n", i, buffer.Nbuffer, buffer.buffer); */ 82 82 usleep (20000); 83 83 } 84 fprintf (stderr, "%d cycles to connect\n", i); 84 85 if (status == 0) goto pipe_error; 85 86 if (status == -1) goto io_error;
Note:
See TracChangeset
for help on using the changeset viewer.
