Changeset 20047
- Timestamp:
- Oct 10, 2008, 12:51:04 PM (18 years ago)
- Location:
- trunk/Ohana/src/opihi
- Files:
-
- 11 edited
-
include/pcontrol.h (modified) (4 diffs)
-
pclient/job.c (modified) (5 diffs)
-
pclient/reset.c (modified) (4 diffs)
-
pcontrol/CheckBusyJob.c (modified) (2 diffs)
-
pcontrol/CheckDoneHost.c (modified) (1 diff)
-
pcontrol/CheckDoneJob.c (modified) (3 diffs)
-
pcontrol/GetJobOutput.c (modified) (1 diff)
-
pcontrol/JobOps.c (modified) (4 diffs)
-
pcontrol/StartJob.c (modified) (2 diffs)
-
pcontrol/check.c (modified) (1 diff)
-
pcontrol/stdout.c (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/Ohana/src/opihi/include/pcontrol.h
r19124 r20047 83 83 } Fifo; 84 84 85 typedef struct { 86 IOBuffer buffer; 87 int completed; 88 int size; 89 int requested; 90 } JobOutput; 91 85 92 /* data to define a job */ 86 93 typedef struct { … … 91 98 int exit_status; 92 99 int Reset; 93 int stdout_size;94 int stderr_size;95 100 JobMode mode; 96 101 JobStat state; 97 102 JobStat stack; 98 IOBuffer stdout_buff;99 IOBuffer stderr_buff;103 JobOutput stdout; 104 JobOutput stderr; 100 105 Ptime start; 101 106 Ptime stop; … … 199 204 int CheckIdleHost (Host *host); 200 205 int CheckDoneJob (Job *job, Host *host); 201 int GetJobOutput (char *command, Host *host, IOBuffer *buffer, int Nbytes);206 int GetJobOutput (char *command, Host *host, JobOutput *output); 202 207 int rconnect (char *command, char *hostname, char *shell, int *stdio); 203 208 … … 247 252 248 253 /*** JobOps.c ***/ 254 int InitJobOutput (JobOutput *output); 255 int ResetJobOutput (JobOutput *output); 249 256 void InitJobStacks (); 250 257 void FreeJobStacks (); -
trunk/Ohana/src/opihi/pclient/job.c
r7917 r20047 8 8 if (argc < 2) { 9 9 gprint (GP_ERR, "USAGE: job (arg0) (arg1) ... (argN)\n"); 10 gprint (GP_LOG, " STATUS%d\n", -2);10 gprint (GP_LOG, "PCLIENT_PID: %d\n", -2); 11 11 return (FALSE); 12 12 } … … 14 14 if (ChildStatus != PCLIENT_NONE) { 15 15 gprint (GP_ERR, "need to clear existing child\n"); 16 gprint (GP_LOG, " STATUS%d\n", -3);16 gprint (GP_LOG, "PCLIENT_PID: %d\n", -3); 17 17 return (FALSE); 18 18 } … … 73 73 ChildPID = pid; 74 74 75 gprint (GP_LOG, " STATUS%d\n", ChildPID);75 gprint (GP_LOG, "PCLIENT_PID: %d\n", ChildPID); 76 76 return (TRUE); 77 77 … … 85 85 if (child_stderr_fd[1] != 0) close (child_stderr_fd[1]); 86 86 87 gprint (GP_LOG, " STATUS%d\n", -1);87 gprint (GP_LOG, "PCLIENT_PID: %d\n", -1); 88 88 return (FALSE); 89 89 } … … 91 91 /* possible responses: 92 92 93 STATUS-1 - pipe error94 STATUS-2 - syntax error95 STATUS-3 - existing child96 STATUS>0 - success (PID)93 PCLIENT_PID: -1 - pipe error 94 PCLIENT_PID: -2 - syntax error 95 PCLIENT_PID: -3 - existing child 96 PCLIENT_PID: >0 - success (PID) 97 97 98 98 */ -
trunk/Ohana/src/opihi/pclient/reset.c
r7917 r20047 8 8 if (argc != 1) { 9 9 gprint (GP_ERR, "USAGE: reset\n"); 10 gprint (GP_LOG, " STATUS-1\n");10 gprint (GP_LOG, "RESET_RESULT: -1\n"); 11 11 return (FALSE); 12 12 } … … 14 14 if (ChildStatus == PCLIENT_NONE) { 15 15 gprint (GP_ERR, "no child process, cannot reset\n"); 16 gprint (GP_LOG, " STATUS2\n");16 gprint (GP_LOG, "RESET_RESULT: 2\n"); 17 17 return (TRUE); 18 18 } … … 43 43 /* total failure, don't reset */ 44 44 gprint (GP_ERR, "child process %d is hung, cannot reset\n", ChildPID); 45 gprint (GP_LOG, " STATUS0\n");45 gprint (GP_LOG, "RESET_RESULT: 0\n"); 46 46 return (FALSE); 47 47 } … … 62 62 ChildExitStatus = 0; 63 63 64 gprint (GP_LOG, " STATUS1\n");64 gprint (GP_LOG, "RESET_RESULT: 1\n"); 65 65 return (TRUE); 66 66 } -
trunk/Ohana/src/opihi/pcontrol/CheckBusyJob.c
r17477 r20047 81 81 if (!strcmp(string, "EXIT")) outstate = PCONTROL_JOB_EXIT; 82 82 if (!strcmp(string, "CRASH")) outstate = PCONTROL_JOB_CRASH; 83 ASSERT (outstate != PCONTROL_JOB_BUSY, "invalid status response (CheckBusyJobResponse)"); 83 if (outstate == PCONTROL_JOB_BUSY) { 84 if (DEBUG || VerboseMode()) gprint (GP_ERR, "invalid status response (CheckBusyJobResponse), try again\n"); 85 PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM); 86 PutJob (job, PCONTROL_JOB_BUSY, STACK_BOTTOM); 87 return (TRUE); 88 } 89 84 90 85 91 /* parse the exit status and sizes of output buffers */ … … 87 93 sscanf (p, "%*s %d", &job[0].exit_status); 88 94 p = memstr (buffer[0].buffer, "STDOUT", buffer[0].Nbuffer); 89 sscanf (p, "%*s %d", &job[0].stdout _size);95 sscanf (p, "%*s %d", &job[0].stdout.size); 90 96 p = memstr (buffer[0].buffer, "STDERR", buffer[0].Nbuffer); 91 sscanf (p, "%*s %d", &job[0].stderr _size);97 sscanf (p, "%*s %d", &job[0].stderr.size); 92 98 93 99 // XXX runaway job if output too large? 94 if (job[0].stdout _size > 0x100000) abort();95 if (job[0].stderr _size > 0x100000) abort();100 if (job[0].stdout.size > 0x1000000) abort(); 101 if (job[0].stderr.size > 0x1000000) abort(); 96 102 97 103 // job has exited : move to DONE stack -
trunk/Ohana/src/opihi/pcontrol/CheckDoneHost.c
r17477 r20047 42 42 43 43 /** successful command, examine result **/ 44 p = memstr (buffer[0].buffer, " STATUS", buffer[0].Nbuffer);44 p = memstr (buffer[0].buffer, "RESET_RESULT:", buffer[0].Nbuffer); 45 45 if (p == NULL) { 46 if (DEBUG || VerboseMode()) gprint (GP_ERR, "missing STATUSin response; try again\n");46 if (DEBUG || VerboseMode()) gprint (GP_ERR, "missing RESET_RESULT: in response; try again\n"); 47 47 PutHost (host, PCONTROL_HOST_DONE, STACK_BOTTOM); 48 48 return (FALSE); -
trunk/Ohana/src/opihi/pcontrol/CheckDoneJob.c
r12840 r20047 1 1 # include "pcontrol.h" 2 # define DEBUG 0 2 3 3 4 int CheckDoneJob (Job *job, Host *host) { 4 5 5 int s uccess;6 int status1, status2; 6 7 7 8 ASSERT (job, "job not set"); … … 11 12 ASSERT (job == (Job *) host[0].job, "invalid job"); 12 13 13 success = TRUE; 14 success &= GetJobOutput ("stdout", host, &job[0].stdout_buff, job[0].stdout_size); 15 success &= GetJobOutput ("stderr", host, &job[0].stderr_buff, job[0].stderr_size); 14 status1 = GetJobOutput ("stdout", host, &job[0].stdout); 15 status2 = GetJobOutput ("stderr", host, &job[0].stderr); 16 16 17 if (!success) { 18 // XXX some kind of error? 19 // XXX try again later? 17 if ((status1 == PCLIENT_DOWN) || (status2 == PCLIENT_DOWN)) { 18 // unlink host & job 19 if (DEBUG || VerboseMode()) gprint (GP_ERR, "host %s is down\n", host[0].hostname); 20 job[0].host = NULL; 21 host[0].job = NULL; 22 23 PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM); 24 25 // clear the response data 26 host[0].response_state = PCONTROL_RESP_NONE; 27 host[0].response = NULL; 28 29 // host has shutdown; harvest the defunct process 30 HarvestHost (host[0].pid); 31 PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM); 32 return (FALSE); 33 } 34 35 // try again if we are still waiting 36 if ((status1 == PCLIENT_HUNG) || (status2 == PCLIENT_HUNG)) { 20 37 PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM); 21 38 PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM); … … 32 49 return (TRUE); 33 50 } 34 35 /** need to add timeout check here **/ -
trunk/Ohana/src/opihi/pcontrol/GetJobOutput.c
r16472 r20047 1 1 # include "pcontrol.h" 2 # define PCLIENT_TIMEOUT 500 2 # define PCLIENT_TIMEOUT 100 3 # define DEBUG 0 3 4 4 /* we read Nbytes from the host, then watch for the prompt */ 5 int GetJobOutput (char *cmd, Host *host, IOBuffer *buffer, int Nbytes) { 5 // we are trying to read a total of Nbytes from the host. This function may be called 6 // repeatedly until the buffer has the complete set of data. We need to read output[0].size 7 // bytes, then look for the PCLIENT_PROMPT in the output stream 8 9 int GetJobOutput (char *command, Host *host, JobOutput *output) { 6 10 7 int i, status, Nstart;8 11 char *line; 12 int i, status; 9 13 struct timespec request, remain; 10 14 11 ASSERT (c md, "cmd missing");15 ASSERT (command, "command missing"); 12 16 ASSERT (host, "host missing"); 13 ASSERT ( buffer, "buffermissing");17 ASSERT (output, "output missing"); 14 18 15 /* flush any earlier messages */ 16 ReadtoIOBuffer (buffer, host[0].stdout_fd); 17 FlushIOBuffer (buffer); 18 Nstart = buffer[0].Nbuffer; 19 20 /* avoid blocking on waitpid, test every 100 usec, up to 50 msec */ 19 /* avoid blocking on waitpid, test every 100 usec, up to 10 msec */ 21 20 request.tv_sec = 0; 22 21 request.tv_nsec = 100000; 23 22 24 /* send cmd (stdout / stderr) */ 25 status = write_fmt (host[0].stdin_fd, "%s\n", cmd); 23 if (!output[0].requested) { 24 /* send command (stdout / stderr) */ 25 status = write_fmt (host[0].stdin_fd, "%s\n", command); 26 26 27 /* is pipe still open? */ 28 if ((status == -1) && (errno == EPIPE)) return (PCLIENT_DOWN); 27 /* is pipe still open? */ 28 if ((status == -1) && (errno == EPIPE)) return PCLIENT_DOWN; 29 output[0].requested = TRUE; 30 } 29 31 30 /* read at least Nbytes, then watch for PCLIENT_PROMPT */ 32 if (output[0].completed) return PCLIENT_GOOD; 33 34 // attempt to read the output->size bytes from the host 35 if (output[0].buffer.Nbuffer < output[0].size) { 36 status = -1; 37 for (i = 0; (i < PCLIENT_TIMEOUT) && (status != 0) && (output[0].buffer.Nbuffer < output[0].size); i++) { 38 status = ReadtoIOBuffer (&output[0].buffer, host[0].stdout_fd); 39 if (status == -1) nanosleep (&request, &remain); 40 } 41 if (VerboseMode()) gprint (GP_ERR, "%s\n Read %d of %d bytes so far\n", output[0].buffer.buffer, output[0].buffer.Nbuffer, output[0].size); 42 if (status == 0) { 43 if (VerboseMode()) gprint (GP_ERR, "host %s is down\n", host[0].hostname); 44 return PCLIENT_DOWN; 45 } 46 if (output[0].buffer.Nbuffer < output[0].size) { 47 if (VerboseMode()) gprint (GP_ERR, "host %s still has data, keep trying\n", host[0].hostname); 48 return PCLIENT_HUNG; 49 } 50 } 51 52 // keep trying to read until we get the prompt 31 53 line = NULL; 32 54 status = -1; 33 55 for (i = 0; (i < PCLIENT_TIMEOUT) && (status != 0) && (line == NULL); i++) { 34 status = ReadtoIOBuffer (buffer, host[0].stdout_fd); 35 if ((buffer[0].Nbuffer - Nstart) >= Nbytes) { 36 line = memstr (buffer[0].buffer, PCLIENT_PROMPT, buffer[0].Nbuffer); 37 } 56 status = ReadtoIOBuffer (&output[0].buffer, host[0].stdout_fd); 57 line = memstr (output[0].buffer.buffer, PCLIENT_PROMPT, output[0].buffer.Nbuffer); 38 58 if (status == -1) nanosleep (&request, &remain); 39 59 } 40 if (status == 0) return (PCLIENT_DOWN); 41 if (status == -1) return (PCLIENT_HUNG); 42 43 /* check on success of pclient command */ 44 switch (status) { 45 case -1: 46 if (VerboseMode()) gprint (GP_ERR, "host %s is not responding\n", host[0].hostname); 47 return (FALSE); 48 49 case 0: 50 if (VerboseMode()) gprint (GP_ERR, "host %s is down\n", host[0].hostname); 51 return (FALSE); 52 53 default: 54 if (VerboseMode()) gprint (GP_ERR, "message received (GetJobOutput : %s)\n", cmd); 55 /* drop extra bytes from pclient (not pclient:job) */ 56 buffer[0].Nbuffer = Nstart + Nbytes; 57 if (buffer[0].Nalloc > buffer[0].Nbuffer) { 58 bzero (buffer[0].buffer + buffer[0].Nbuffer, buffer[0].Nalloc - buffer[0].Nbuffer); 59 } 60 return (TRUE); 60 if (VerboseMode()) gprint (GP_ERR, "%s\n Read %d of %d bytes so far\n", output[0].buffer.buffer, output[0].buffer.Nbuffer, output[0].size); 61 if (status == 0) { 62 if (VerboseMode()) gprint (GP_ERR, "host %s is down\n", host[0].hostname); 63 return PCLIENT_DOWN; 64 } 65 if (line == NULL) { 66 if (VerboseMode()) gprint (GP_ERR, "host %s not yet at prompt, keep trying\n", host[0].hostname); 67 return PCLIENT_HUNG; 61 68 } 62 69 63 gprint (GP_ERR, "programming error: should not reach here (GetJobOutput)\n");64 pcontrol_exit (50);70 output[0].completed = TRUE; 71 return PCLIENT_GOOD; 65 72 } -
trunk/Ohana/src/opihi/pcontrol/JobOps.c
r18098 r20047 175 175 } 176 176 177 int InitJobOutput (JobOutput *output) { 178 179 output[0].size = 0; 180 output[0].requested = FALSE; 181 output[0].completed = FALSE; 182 InitIOBuffer (&output[0].buffer, 0x1000); 183 184 return TRUE; 185 } 186 187 int ResetJobOutput (JobOutput *output) { 188 189 output[0].size = 0; 190 output[0].requested = FALSE; 191 output[0].completed = FALSE; 192 FlushIOBuffer (&output[0].buffer); 193 194 return TRUE; 195 } 196 177 197 IDtype AddJob (char *hostname, JobMode mode, int timeout, int argc, char **argv) { 178 198 … … 189 209 job[0].exit_status = 0; 190 210 job[0].Reset = FALSE; 191 job[0].stdout_size = 0; 192 job[0].stderr_size = 0; 211 212 InitJobOutput (&job[0].stdout); 213 InitJobOutput (&job[0].stderr); 193 214 194 215 job[0].mode = mode; … … 196 217 job[0].state = 0; 197 218 job[0].stack = 0; 198 199 /* do this step on start? */200 InitIOBuffer (&job[0].stdout_buff, 0x1000);201 InitIOBuffer (&job[0].stderr_buff, 0x1000);202 219 203 220 job[0].dtime = 0.0; … … 233 250 FREE (job[0].argv); 234 251 235 FreeIOBuffer (&job[0].stdout _buff);236 FreeIOBuffer (&job[0].stderr _buff);252 FreeIOBuffer (&job[0].stdout.buffer); 253 FreeIOBuffer (&job[0].stderr.buffer); 237 254 238 255 FREE (job); -
trunk/Ohana/src/opihi/pcontrol/StartJob.c
r19124 r20047 12 12 ASSERT (host == (Host *) job[0].host, "invalid host"); 13 13 ASSERT (job == (Job *) host[0].job, "invalid job"); 14 15 ResetJobOutput (&job[0].stdout); 16 ResetJobOutput (&job[0].stderr); 14 17 15 18 /* construct command line : job arg0 arg1 ... argN\n */ … … 80 83 81 84 /* check on result of pclient command */ 82 p = memstr (buffer[0].buffer, " STATUS", buffer[0].Nbuffer);85 p = memstr (buffer[0].buffer, "PCLIENT_PID:", buffer[0].Nbuffer); 83 86 if (p == NULL) { 84 87 // failed to get a valid response. kill the job and try again, -
trunk/Ohana/src/opihi/pcontrol/check.c
r18098 r20047 31 31 gprint (GP_LOG, "STATUS %s\n", GetJobStackName(job[0].stack)); 32 32 gprint (GP_LOG, "EXITST %d\n", job[0].exit_status); 33 gprint (GP_LOG, "STDOUT %d\n", job[0].stdout _size);34 gprint (GP_LOG, "STDERR %d\n", job[0].stderr _size);33 gprint (GP_LOG, "STDOUT %d\n", job[0].stdout.size); 34 gprint (GP_LOG, "STDERR %d\n", job[0].stderr.size); 35 35 gprint (GP_LOG, "DTIME %lf\n", job[0].dtime); 36 36 if (job[0].realhost) { -
trunk/Ohana/src/opihi/pcontrol/stdout.c
r18116 r20047 43 43 44 44 found_stdout: 45 buffer = &job[0].stdout _buff;45 buffer = &job[0].stdout.buffer; 46 46 if (varName == NULL) { 47 47 fwrite (buffer[0].buffer, 1, buffer[0].Nbuffer, stdout); … … 97 97 98 98 found_stderr: 99 buffer = &job[0].stderr _buff;99 buffer = &job[0].stderr.buffer; 100 100 if (varName == NULL) { 101 101 fwrite (buffer[0].buffer, 1, buffer[0].Nbuffer, stdout);
Note:
See TracChangeset
for help on using the changeset viewer.
