IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Changeset 20047


Timestamp:
Oct 10, 2008, 12:51:04 PM (18 years ago)
Author:
eugene
Message:

allow job stdout / stderr from pclient to arrive slowly

Location:
trunk/Ohana/src/opihi
Files:
11 edited

Legend:

Unmodified
Added
Removed
  • trunk/Ohana/src/opihi/include/pcontrol.h

    r19124 r20047  
    8383} Fifo;
    8484
     85typedef struct {
     86  IOBuffer     buffer;
     87  int          completed;
     88  int          size;
     89  int          requested;
     90} JobOutput;
     91
    8592/* data to define a job */
    8693typedef struct {
     
    9198  int          exit_status;
    9299  int          Reset;
    93   int          stdout_size;
    94   int          stderr_size;
    95100  JobMode      mode;
    96101  JobStat      state;
    97102  JobStat      stack;
    98   IOBuffer     stdout_buff;
    99   IOBuffer     stderr_buff;
     103  JobOutput    stdout;
     104  JobOutput    stderr;
    100105  Ptime        start;
    101106  Ptime        stop;
     
    199204int CheckIdleHost (Host *host);
    200205int CheckDoneJob (Job *job, Host *host);
    201 int GetJobOutput (char *command, Host *host, IOBuffer *buffer, int Nbytes);
     206int GetJobOutput (char *command, Host *host, JobOutput *output);
    202207int rconnect (char *command, char *hostname, char *shell, int *stdio);
    203208
     
    247252
    248253/*** JobOps.c ***/
     254int InitJobOutput (JobOutput *output);
     255int ResetJobOutput (JobOutput *output);
    249256void   InitJobStacks ();
    250257void   FreeJobStacks ();
  • trunk/Ohana/src/opihi/pclient/job.c

    r7917 r20047  
    88  if (argc < 2) {
    99    gprint (GP_ERR, "USAGE: job (arg0) (arg1) ... (argN)\n");
    10     gprint (GP_LOG, "STATUS %d\n", -2);
     10    gprint (GP_LOG, "PCLIENT_PID: %d\n", -2);
    1111    return (FALSE);
    1212  }
     
    1414  if (ChildStatus != PCLIENT_NONE) {
    1515    gprint (GP_ERR, "need to clear existing child\n");
    16     gprint (GP_LOG, "STATUS %d\n", -3);
     16    gprint (GP_LOG, "PCLIENT_PID: %d\n", -3);
    1717    return (FALSE);
    1818  }
     
    7373  ChildPID = pid;
    7474
    75   gprint (GP_LOG, "STATUS %d\n", ChildPID);
     75  gprint (GP_LOG, "PCLIENT_PID: %d\n", ChildPID);
    7676  return (TRUE);
    7777
     
    8585  if (child_stderr_fd[1] != 0) close (child_stderr_fd[1]);
    8686
    87   gprint (GP_LOG, "STATUS %d\n", -1);
     87  gprint (GP_LOG, "PCLIENT_PID: %d\n", -1);
    8888  return (FALSE);
    8989}
     
    9191/* possible responses:
    9292
    93 STATUS -1 - pipe error
    94 STATUS -2 - syntax error
    95 STATUS -3 - existing child
    96 STATUS >0 - success (PID)
     93PCLIENT_PID: -1 - pipe error
     94PCLIENT_PID: -2 - syntax error
     95PCLIENT_PID: -3 - existing child
     96PCLIENT_PID: >0 - success (PID)
    9797
    9898*/
  • trunk/Ohana/src/opihi/pclient/reset.c

    r7917 r20047  
    88  if (argc != 1) {
    99    gprint (GP_ERR, "USAGE: reset\n");
    10     gprint (GP_LOG, "STATUS -1\n");
     10    gprint (GP_LOG, "RESET_RESULT: -1\n");
    1111    return (FALSE);
    1212  }
     
    1414  if (ChildStatus == PCLIENT_NONE) {
    1515    gprint (GP_ERR, "no child process, cannot reset\n");
    16     gprint (GP_LOG, "STATUS 2\n");
     16    gprint (GP_LOG, "RESET_RESULT: 2\n");
    1717    return (TRUE);
    1818  }
     
    4343    /* total failure, don't reset */
    4444    gprint (GP_ERR, "child process %d is hung, cannot reset\n", ChildPID);
    45     gprint (GP_LOG, "STATUS 0\n");
     45    gprint (GP_LOG, "RESET_RESULT: 0\n");
    4646    return (FALSE);
    4747  }
     
    6262  ChildExitStatus = 0;
    6363
    64   gprint (GP_LOG, "STATUS 1\n");
     64  gprint (GP_LOG, "RESET_RESULT: 1\n");
    6565  return (TRUE);
    6666}
  • trunk/Ohana/src/opihi/pcontrol/CheckBusyJob.c

    r17477 r20047  
    8181  if (!strcmp(string, "EXIT")) outstate = PCONTROL_JOB_EXIT;
    8282  if (!strcmp(string, "CRASH")) outstate = PCONTROL_JOB_CRASH;
    83   ASSERT (outstate != PCONTROL_JOB_BUSY, "invalid status response (CheckBusyJobResponse)");
     83  if (outstate == PCONTROL_JOB_BUSY) {
     84    if (DEBUG || VerboseMode()) gprint (GP_ERR, "invalid status response (CheckBusyJobResponse), try again\n");
     85    PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM);
     86    PutJob (job, PCONTROL_JOB_BUSY, STACK_BOTTOM);
     87    return (TRUE);
     88  }
     89
    8490
    8591  /* parse the exit status and sizes of output buffers */
     
    8793  sscanf (p, "%*s %d", &job[0].exit_status);
    8894  p = memstr (buffer[0].buffer, "STDOUT", buffer[0].Nbuffer);
    89   sscanf (p, "%*s %d", &job[0].stdout_size);
     95  sscanf (p, "%*s %d", &job[0].stdout.size);
    9096  p = memstr (buffer[0].buffer, "STDERR", buffer[0].Nbuffer);
    91   sscanf (p, "%*s %d", &job[0].stderr_size);
     97  sscanf (p, "%*s %d", &job[0].stderr.size);
    9298
    9399  // XXX runaway job if output too large?
    94   if (job[0].stdout_size > 0x100000) abort();
    95   if (job[0].stderr_size > 0x100000) abort();
     100  if (job[0].stdout.size > 0x1000000) abort();
     101  if (job[0].stderr.size > 0x1000000) abort();
    96102
    97103  // job has exited : move to DONE stack
  • trunk/Ohana/src/opihi/pcontrol/CheckDoneHost.c

    r17477 r20047  
    4242
    4343  /** successful command, examine result **/
    44   p = memstr (buffer[0].buffer, "STATUS", buffer[0].Nbuffer);
     44  p = memstr (buffer[0].buffer, "RESET_RESULT:", buffer[0].Nbuffer);
    4545  if (p == NULL) {
    46       if (DEBUG || VerboseMode()) gprint (GP_ERR, "missing STATUS in response; try again\n");
     46      if (DEBUG || VerboseMode()) gprint (GP_ERR, "missing RESET_RESULT: in response; try again\n");
    4747      PutHost (host, PCONTROL_HOST_DONE, STACK_BOTTOM);
    4848      return (FALSE);
  • trunk/Ohana/src/opihi/pcontrol/CheckDoneJob.c

    r12840 r20047  
    11# include "pcontrol.h"
     2# define DEBUG 0
    23
    34int CheckDoneJob (Job *job, Host *host) {
    45 
    5   int success;
     6  int status1, status2;
    67
    78  ASSERT (job, "job not set");
     
    1112  ASSERT (job == (Job *) host[0].job, "invalid job");
    1213
    13   success = TRUE;
    14   success &= GetJobOutput ("stdout", host, &job[0].stdout_buff, job[0].stdout_size);
    15   success &= GetJobOutput ("stderr", host, &job[0].stderr_buff, job[0].stderr_size);
     14  status1 = GetJobOutput ("stdout", host, &job[0].stdout);
     15  status2 = GetJobOutput ("stderr", host, &job[0].stderr);
    1616
    17   if (!success) {
    18     // XXX some kind of error?
    19     // XXX try again later?
     17  if ((status1 == PCLIENT_DOWN) || (status2 == PCLIENT_DOWN)) {
     18    // unlink host & job
     19    if (DEBUG || VerboseMode()) gprint (GP_ERR, "host %s is down\n", host[0].hostname);
     20    job[0].host = NULL;
     21    host[0].job = NULL;
     22       
     23    PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM);
     24
     25    // clear the response data
     26    host[0].response_state = PCONTROL_RESP_NONE;
     27    host[0].response = NULL;
     28
     29    // host has shutdown; harvest the defunct process
     30    HarvestHost (host[0].pid);
     31    PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM);
     32    return (FALSE);
     33  }
     34
     35  // try again if we are still waiting
     36  if ((status1 == PCLIENT_HUNG) || (status2 == PCLIENT_HUNG)) {
    2037    PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM);
    2138    PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM);
     
    3249  return (TRUE);
    3350}
    34 
    35 /** need to add timeout check here **/
  • trunk/Ohana/src/opihi/pcontrol/GetJobOutput.c

    r16472 r20047  
    11# include "pcontrol.h"
    2 # define PCLIENT_TIMEOUT 500
     2# define PCLIENT_TIMEOUT 100
     3# define DEBUG 0
    34
    4 /* we read Nbytes from the host, then watch for the prompt */
    5 int GetJobOutput (char *cmd, Host *host, IOBuffer *buffer, int Nbytes) {
     5// we are trying to read a total of Nbytes from the host.  This function may be called
     6// repeatedly until the buffer has the complete set of data.  We need to read output[0].size
     7// bytes, then look for the PCLIENT_PROMPT in the output stream
     8
     9int GetJobOutput (char *command, Host *host, JobOutput *output) {
    610 
    7   int i, status, Nstart;
    811  char *line;
     12  int i, status;
    913  struct timespec request, remain;
    1014
    11   ASSERT (cmd, "cmd missing");
     15  ASSERT (command, "command missing");
    1216  ASSERT (host, "host missing");
    13   ASSERT (buffer, "buffer missing");
     17  ASSERT (output, "output missing");
    1418
    15   /* flush any earlier messages */
    16   ReadtoIOBuffer (buffer, host[0].stdout_fd);
    17   FlushIOBuffer (buffer);
    18   Nstart = buffer[0].Nbuffer;
    19 
    20   /* avoid blocking on waitpid, test every 100 usec, up to 50 msec */
     19  /* avoid blocking on waitpid, test every 100 usec, up to 10 msec */
    2120  request.tv_sec = 0;
    2221  request.tv_nsec = 100000;
    2322
    24   /* send cmd (stdout / stderr) */
    25   status = write_fmt (host[0].stdin_fd, "%s\n", cmd);
     23  if (!output[0].requested) {
     24      /* send command (stdout / stderr) */
     25      status = write_fmt (host[0].stdin_fd, "%s\n", command);
    2626
    27   /* is pipe still open? */
    28   if ((status == -1) && (errno == EPIPE)) return (PCLIENT_DOWN);
     27      /* is pipe still open? */
     28      if ((status == -1) && (errno == EPIPE)) return PCLIENT_DOWN;
     29      output[0].requested = TRUE;
     30  }
    2931
    30   /* read at least Nbytes, then watch for PCLIENT_PROMPT */
     32  if (output[0].completed) return PCLIENT_GOOD;
     33
     34  // attempt to read the output->size bytes from the host
     35  if (output[0].buffer.Nbuffer < output[0].size) {
     36      status = -1;
     37      for (i = 0; (i < PCLIENT_TIMEOUT) && (status != 0) && (output[0].buffer.Nbuffer < output[0].size); i++) {
     38          status = ReadtoIOBuffer (&output[0].buffer, host[0].stdout_fd);
     39          if (status == -1) nanosleep (&request, &remain);
     40      }
     41      if (VerboseMode()) gprint (GP_ERR, "%s\n Read %d of %d bytes so far\n", output[0].buffer.buffer, output[0].buffer.Nbuffer, output[0].size);
     42      if (status == 0) {
     43          if (VerboseMode()) gprint (GP_ERR, "host %s is down\n", host[0].hostname);
     44          return PCLIENT_DOWN;
     45      }
     46      if (output[0].buffer.Nbuffer < output[0].size) {
     47          if (VerboseMode()) gprint (GP_ERR, "host %s still has data, keep trying\n", host[0].hostname);
     48          return PCLIENT_HUNG;
     49      }
     50  }
     51
     52  // keep trying to read until we get the prompt
    3153  line = NULL;
    3254  status = -1;
    3355  for (i = 0; (i < PCLIENT_TIMEOUT) && (status != 0) && (line == NULL); i++) {
    34     status = ReadtoIOBuffer (buffer, host[0].stdout_fd);
    35     if ((buffer[0].Nbuffer - Nstart) >= Nbytes) {
    36       line = memstr (buffer[0].buffer, PCLIENT_PROMPT, buffer[0].Nbuffer);
    37     }
     56    status = ReadtoIOBuffer (&output[0].buffer, host[0].stdout_fd);
     57    line = memstr (output[0].buffer.buffer, PCLIENT_PROMPT, output[0].buffer.Nbuffer);
    3858    if (status == -1) nanosleep (&request, &remain);
    3959  }
    40   if (status ==  0) return (PCLIENT_DOWN);
    41   if (status == -1) return (PCLIENT_HUNG);
    42 
    43   /* check on success of pclient command */
    44   switch (status) {
    45     case -1:
    46       if (VerboseMode()) gprint (GP_ERR, "host %s is not responding\n", host[0].hostname);
    47       return (FALSE);
    48 
    49     case 0:
    50       if (VerboseMode()) gprint (GP_ERR, "host %s is down\n", host[0].hostname);
    51       return (FALSE);
    52 
    53     default:
    54       if (VerboseMode()) gprint (GP_ERR, "message received (GetJobOutput : %s)\n", cmd); 
    55       /* drop extra bytes from pclient (not pclient:job) */
    56       buffer[0].Nbuffer = Nstart + Nbytes;
    57       if (buffer[0].Nalloc > buffer[0].Nbuffer) {
    58         bzero (buffer[0].buffer + buffer[0].Nbuffer, buffer[0].Nalloc - buffer[0].Nbuffer);
    59       }
    60       return (TRUE);
     60  if (VerboseMode()) gprint (GP_ERR, "%s\n Read %d of %d bytes so far\n", output[0].buffer.buffer, output[0].buffer.Nbuffer, output[0].size);
     61  if (status == 0) {
     62    if (VerboseMode()) gprint (GP_ERR, "host %s is down\n", host[0].hostname);
     63    return PCLIENT_DOWN;
     64  }
     65  if (line == NULL) {
     66    if (VerboseMode()) gprint (GP_ERR, "host %s not yet at prompt, keep trying\n", host[0].hostname);
     67    return PCLIENT_HUNG;
    6168  }
    6269
    63   gprint (GP_ERR, "programming error: should not reach here (GetJobOutput)\n");
    64   pcontrol_exit (50);
     70  output[0].completed = TRUE;
     71  return PCLIENT_GOOD;
    6572}
  • trunk/Ohana/src/opihi/pcontrol/JobOps.c

    r18098 r20047  
    175175}
    176176
     177int InitJobOutput (JobOutput *output) {
     178
     179  output[0].size = 0;
     180  output[0].requested = FALSE;
     181  output[0].completed = FALSE;
     182  InitIOBuffer (&output[0].buffer, 0x1000);
     183
     184  return TRUE;
     185}
     186
     187int ResetJobOutput (JobOutput *output) {
     188
     189  output[0].size = 0;
     190  output[0].requested = FALSE;
     191  output[0].completed = FALSE;
     192  FlushIOBuffer (&output[0].buffer);
     193
     194  return TRUE;
     195}
     196
    177197IDtype AddJob (char *hostname, JobMode mode, int timeout, int argc, char **argv) {
    178198
     
    189209  job[0].exit_status = 0;
    190210  job[0].Reset    = FALSE;
    191   job[0].stdout_size = 0;
    192   job[0].stderr_size = 0;
     211
     212  InitJobOutput (&job[0].stdout);
     213  InitJobOutput (&job[0].stderr);
    193214
    194215  job[0].mode     = mode;
     
    196217  job[0].state = 0;
    197218  job[0].stack = 0;
    198 
    199   /* do this step on start? */
    200   InitIOBuffer (&job[0].stdout_buff, 0x1000);
    201   InitIOBuffer (&job[0].stderr_buff, 0x1000);
    202219
    203220  job[0].dtime = 0.0;
     
    233250  FREE (job[0].argv);
    234251
    235   FreeIOBuffer (&job[0].stdout_buff);
    236   FreeIOBuffer (&job[0].stderr_buff);
     252  FreeIOBuffer (&job[0].stdout.buffer);
     253  FreeIOBuffer (&job[0].stderr.buffer);
    237254
    238255  FREE (job);
  • trunk/Ohana/src/opihi/pcontrol/StartJob.c

    r19124 r20047  
    1212  ASSERT (host == (Host *) job[0].host, "invalid host");
    1313  ASSERT (job  == (Job *) host[0].job, "invalid job");
     14
     15  ResetJobOutput (&job[0].stdout);
     16  ResetJobOutput (&job[0].stderr);
    1417
    1518  /* construct command line : job arg0 arg1 ... argN\n */
     
    8083
    8184  /* check on result of pclient command */
    82   p = memstr (buffer[0].buffer, "STATUS", buffer[0].Nbuffer);
     85  p = memstr (buffer[0].buffer, "PCLIENT_PID:", buffer[0].Nbuffer);
    8386  if (p == NULL) {
    8487      // failed to get a valid response.  kill the job and try again,
  • trunk/Ohana/src/opihi/pcontrol/check.c

    r18098 r20047  
    3131    gprint (GP_LOG, "STATUS %s\n", GetJobStackName(job[0].stack));
    3232    gprint (GP_LOG, "EXITST %d\n", job[0].exit_status);
    33     gprint (GP_LOG, "STDOUT %d\n", job[0].stdout_size);
    34     gprint (GP_LOG, "STDERR %d\n", job[0].stderr_size);
     33    gprint (GP_LOG, "STDOUT %d\n", job[0].stdout.size);
     34    gprint (GP_LOG, "STDERR %d\n", job[0].stderr.size);
    3535    gprint (GP_LOG, "DTIME %lf\n", job[0].dtime);
    3636    if (job[0].realhost) {
  • trunk/Ohana/src/opihi/pcontrol/stdout.c

    r18116 r20047  
    4343
    4444found_stdout:
    45   buffer = &job[0].stdout_buff;
     45  buffer = &job[0].stdout.buffer;
    4646  if (varName == NULL) {
    4747    fwrite (buffer[0].buffer, 1, buffer[0].Nbuffer, stdout);
     
    9797
    9898found_stderr:
    99   buffer = &job[0].stderr_buff;
     99  buffer = &job[0].stderr.buffer;
    100100  if (varName == NULL) {
    101101    fwrite (buffer[0].buffer, 1, buffer[0].Nbuffer, stdout);
Note: See TracChangeset for help on using the changeset viewer.