IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Changeset 8424


Ignore:
Timestamp:
Aug 18, 2006, 1:44:51 PM (20 years ago)
Author:
eugene
Message:

successful implementation of the threaded version of pcontrol

Location:
trunk/Ohana/src/opihi
Files:
2 added
1 deleted
24 edited

Legend:

Unmodified
Added
Removed
  • trunk/Ohana/src/opihi/doc/pcontrol.txt

    r8297 r8424  
     1
     2 2006.08.18
     3
     4 Outstanding issues related to pcontrol:
     5
     6 * disposition of HUNG jobs?
     7 * probably should not save the history for pcontrol or pclient
     8   (these will be many lines long very quickly...)
     9 * need to add options to run/stop for hosts and jobs independently
    1 10
    2 11 2006.08.11
  • trunk/Ohana/src/opihi/include/pcontrol.h

    r8297 r8424  
    11# include "data.h"
    22# include "basic.h"
     3# define THREADED
    34
    45typedef struct timeval Ptime;
     
    9899  int    Nobject;
    99100  int    NOBJECT;
    100   // pthread_mutex_t mutex;
     101# ifdef THREADED   
     102  pthread_mutex_t mutex;
     103# endif
    101104} Stack;
    102105
     
    108111# define DTIME(A,B) ((A.tv_sec - B.tv_sec) + 1e-6*(A.tv_usec - B.tv_usec))
    109112# define ZTIME(A) ((A.tv_sec == 0) && (A.tv_usec == 0))
     113
     114// # define ASSERT(TEST,STRING) { if (!(TEST)) { gprint (GP_ERR, "programming error: %s\n", STRING); abort (); }}
     115// # define ABORT(STRING) { gprint (GP_ERR, "programming error: %s\n", STRING); abort (); }
     116# define ASSERT(TEST,STRING) { if (!(TEST)) { gprint (GP_ERR, "programming error: %s\n", STRING); raise (SIGINT); exit (2); }}
     117# define ABORT(STRING) { gprint (GP_ERR, "programming error: %s\n", STRING); raise (SIGINT); exit (2); }
    110118
    111119void InitPcontrol ();
     
    117125void  *PullStackByName (Stack *stack, char *name);
    118126void  *PullStackByID (Stack *stack, int id);
    119 void  *FindStackByID (Stack *stack, int id);
    120 void  *FindStackByName (Stack *stack, char *name);
     127int    RemoveStackEntry (Stack *stack, int where);
     128void  *RemoveStackByID (Stack *stack, int id);
    121129void   LockStack (Stack *stack);
    122130void   UnlockStack (Stack *stack);
    123131
    124 int CheckSystem ();
    125 int CheckBusyJobs (float delay);
    126 int CheckDoneJobs (float delay);
    127 int CheckKillJobs (float delay);
    128 int CheckDoneHosts (float delay);
    129 int CheckLiveHosts (float delay);
    130 int CheckDownHosts (float delay);
    131 int CheckIdleHosts (float delay);
     132// void  *FindStackByID (Stack *stack, int id);
     133// void  *FindStackByName (Stack *stack, char *name);
     134
     135/*** CheckSystem.c ***/
     136int   CheckSystem ();
     137void *CheckSystem_Threaded (void *data);
     138int   CheckBusyJobs (float delay);
     139int   CheckDoneJobs (float delay);
     140int   CheckKillJobs (float delay);
     141int   CheckDoneHosts (float delay);
     142int   CheckDownHosts (float delay);
     143int   CheckIdleHosts (float delay);
     144int   CheckLiveHosts (float delay);
     145int   SetRunSystem (int state);
     146
     147/*** own files ***/
     148int CheckHost (Host *host);
     149int StartHost (Host *host);
    132150int CheckIdleHost (Host *host);
     151int CheckDoneHost (Host *host);
     152int CheckBusyJob (Job *job, Host *host);
     153int CheckDoneJob (Job *job, Host *host);
     154int KillJob (Job *job, Host *host);
     155int StartJob (Job *job, Host *host);
     156int ResetJob (Job *job);
     157int GetJobOutput (char *command, Host *host, IOBuffer *buffer, int Nbytes);
     158int PclientCommand (Host *host, char *command, char *response, IOBuffer *buffer);
     159int rconnect (char *command, char *hostname, char *shell, int *stdio);
     160
     161/*** misc files ***/
     162int    VerboseMode ();  // in verbose.c
     163void   gotsignal (int signum); // in pcontrol.c
     164
     165/*** IDops.c ***/
    133166void InitIDs ();
    134167IDtype NextJobID ();
    135168IDtype NextHostID ();
    136169void PrintID (gpDest dest, IDtype ID);
    137 int CheckBusyJob (Job *job);
    138 int StartHost (Host *host);
    139 int CheckDoneHost (Host *host);
    140 int CheckDoneJob (Job *job);
    141 int GetJobOutput (char *command, Host *host, IOBuffer *buffer, int Nbytes);
    142 int ResetJob (Job *job);
    143 int PclientCommand (Host *host, char *command, char *response, IOBuffer *buffer);
    144 int rconnect (char *command, char *hostname, char *shell, int *stdio);
    145 int CheckHost (Host *host);
    146 
    147 int PrintJobStack (int Nstack);
    148 int PrintHostStack (int Nstack);
    149 
    150 int    VerboseMode ();
    151 
    152 void   gotsignal (int signum);
     170
     171/*** CheckPoint.c ***/
     172int SetCheckPoint ();
     173int ClearCheckPoint ();
     174int TestCheckPoint ();
    153175
    154176/*** HostOps.c ***/
    155177void   InitHostStacks ();
    156178Stack *GetHostStack (int StackID);
     179char  *GetHostStackName (int StackID);
    157180Stack *GetHostStackByName (char *name);
    158181int    PutHost (Host *host, int StackID, int where);
     
    161184Host  *PullHostFromStackByID (int StackID, IDtype ID);
    162185Host  *PullHostFromStackByName (int StackID, char *name);
    163 Host  *FindHostByID (IDtype HostID, int *StackID);
    164 Host  *FindHostByName (char *name, int *StackID);
    165 Host  *FindHostInStackByID (int StackID, IDtype ID);
    166 Host  *FindHostInStackByName (int StackID, char *name);
    167186IDtype AddHost (char *hostname);
    168187void   DelHost (Host *host);
    169188
     189/*** StopHosts.c ***/
    170190void   DownHost (Host *host);
    171191void   OffHost (Host *host);
    172192int    DownHosts ();
     193int    StopHost (Host *host);
    173194int    HarvestHost (int pid);
    174 int    StopHost (Host *host);
    175195
    176196/*** JobOps.c ***/
    177197void   InitJobStacks ();
    178198Stack *GetJobStack (int StackID);
     199char  *GetJobStackName (int StackID);
    179200Stack *GetJobStackByName (char *name);
    180201int    PutJob (Job *job, int StackID, int where);
     
    182203Job   *PullJobByID (IDtype JobID, int *StackID);
    183204Job   *PullJobFromStackByID (int StackID, int ID);
    184 Job   *FindJobByID (IDtype JobID, int *StackID);
    185 Job   *FindJobInStackByID (int StackID, int ID);
    186205IDtype AddJob (char *hostname, JobMode mode, int timeout, int argc, char **argv);
    187206void   DelJob (Job *job);
     
    189208void   LinkJobAndHost (Job *job, Host *host);
    190209
    191 int    KillJob (Job *job);
    192 int    StartJob (Job *job);
     210// Job   *FindJobByID (IDtype JobID, int *StackID);
     211// Job   *FindJobInStackByID (int StackID, int ID);
     212// Host  *FindHostByID (IDtype HostID, int *StackID);
     213// Host  *FindHostByName (char *name, int *StackID);
     214// Host  *FindHostInStackByID (int StackID, IDtype ID);
     215// Host  *FindHostInStackByName (int StackID, char *name);
  • trunk/Ohana/src/opihi/include/shell.h

    r8174 r8424  
    1111# define MACRO_STRING(s) #s
    1212# define MACRO_NAME(s) MACRO_STRING(s)
     13
     14/* enums used by gprint functions */
     15typedef enum {GP_FILE, GP_BUFF} gpMode;
     16typedef enum {GP_LOG, GP_ERR} gpDest;
    1317
    1418typedef int CommandF ();
     
    3337} List;
    3438
     39/* structure used to represent the gprint i/o stream */
     40typedef struct {
     41  FILE *file;
     42  IOBuffer *buffer;
     43  gpMode mode;
     44  gpDest dest;
     45  pthread_t thread;
     46} gpStream;
     47
    3548/*** globals used to track the shell language concepts  ***/
    3649List         *lists;                    /* variable to store the list of all lists */
     
    4760void          program_init              PROTO((int *argc, char **argv));
    4861void          startup                   PROTO((int *argc, char **argv));
     62int           opihi                     PROTO((int argc, char **argv));
    4963int           multicommand              PROTO((char *line));
    5064void          multicommand_InitServer   PROTO((void));
     
    118132int           macro_list_f              PROTO((int, char **));  /* "macro_list" is a readline func */
    119133int           macro_read                PROTO((int, char **));
    120 int           macro_write               PROTO((int, char **));
     134int           macro_write               PROTO((int, char **));
    121135
    122 char *memstr (char *m1, char *m2, int n);
    123 int write_fmt (int fd, char *format, ...);
    124 char *opihi_version ();
    125 char *strip_version (char *input);
     136char         *memstr                    PROTO((char *m1, char *m2, int n));
     137int           write_fmt                 PROTO((int fd, char *format, ...));
     138char         *opihi_version             PROTO(());
     139char         *strip_version             PROTO((char *input));
    126140
    127 /*** gprint defines ***/
    128 
    129 /* enums used by gprint functions */
    130 typedef enum {GP_FILE, GP_BUFF} gpMode;
    131 typedef enum {GP_LOG, GP_ERR} gpDest;
    132 
    133 /* structure used to represent the gprint i/o stream */
    134 typedef struct {
    135   FILE *file;
    136   IOBuffer *buffer;
    137   gpMode mode;
    138   gpDest dest;
    139   pthread_t thread;
    140 } gpStream;
    141 
    142 void gprintInit ();
    143 gpStream *gprintGetStream (gpDest dest);
    144 void gprintSetBuffer (gpDest dest);
    145 IOBuffer *gprintGetBuffer (gpDest dest);
    146 void gprintSetFile (gpDest dest, char *filename);
    147 FILE *gprintGetFile (gpDest dest);
    148 int gprint (gpDest dest, char *format, ...);
    149 int gwrite (char *buffer, int size, int N, gpDest dest);
     141/* gprint functions */
     142void          gprintInit                PROTO(());
     143gpStream     *gprintGetStream           PROTO((gpDest dest));
     144void          gprintSetBuffer           PROTO((gpDest dest));
     145IOBuffer     *gprintGetBuffer           PROTO((gpDest dest));
     146void          gprintSetFile             PROTO((gpDest dest, char *filename));
     147FILE         *gprintGetFile             PROTO((gpDest dest));
     148int           gprint                    PROTO((gpDest dest, char *format, ...));
     149int           gwrite                    PROTO((char *buffer, int size, int N, gpDest dest));
    150150
    151151# endif
  • trunk/Ohana/src/opihi/pcontrol/CheckBusyJob.c

    r8296 r8424  
    11# include "pcontrol.h"
    22
    3 int CheckBusyJob (Job *job) {
     3int CheckBusyJob (Job *job, Host *host) {
    44
    55  int      status;
     
    88  char     string[64];
    99  IOBuffer buffer;
    10   Host    *host;
    1110
    1211  /* we are checking a job which is currently busy.  it has been pulled from the
     
    1413     XXX need to check on state of HOST on return */
    1514
    16   /** must have a valid host : if not? **/
    17   host = (Host *) job[0].host;
     15  ASSERT (host == (Host *) job[0].host, "invalid host");
     16  ASSERT (job  == (Job *) host[0].job, "invalid job");
    1817
    1918  InitIOBuffer (&buffer, 0x100);
     
    2524    case PCLIENT_DOWN:
    2625      HarvestHost (host[0].pid);
    27       UnlinkJobAndHost (job);
     26      // unlink host & job
     27      job[0].host = NULL;
     28      host[0].job = NULL;
    2829      PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM);
    2930      PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM);
     
    3233
    3334    case PCLIENT_HUNG:
     35      HarvestHost (host[0].pid);
     36      // unlink host & job
     37      job[0].host = NULL;
     38      host[0].job = NULL;
     39      PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM);
    3440      PutJobSetState (job, PCONTROL_JOB_BUSY, STACK_BOTTOM, PCONTROL_JOB_HUNG);
    3541      FreeIOBuffer (&buffer);
     
    3743
    3844    case PCLIENT_GOOD:
    39       if (VerboseMode()) gprint (GP_ERR, "message received (CheckBusyJob)\n");
     45      if (VerboseMode()) gprint (GP_ERR, "message received (CheckBusyJob)");
    4046      break;
    4147
    4248    default:
    43       gprint (GP_ERR, "programming error: unknown status for pclient command\n"); 
    44       exit (1);
     49      ABORT ("unknown status for pclient command"); 
    4550  }
    4651
    4752  /** host is up, need to parse message **/
    4853  p = memstr (buffer.buffer, "STATUS", buffer.Nbuffer);
    49   if (p == NULL) {
    50     gprint (GP_ERR, "programming error: missing STATUS in pclient message\n");
    51     exit (1);
    52   }
     54  ASSERT (p != NULL, "missing STATUS in pclient message");
     55
    5356  sscanf (p, "%*s %s", string);
    54   if (!strcmp(string, "NONE")) {
    55     gprint (GP_ERR, "programming error: no current job\n");
    56     exit (1);
    57   }
     57  ASSERT (strcmp(string, "NONE"), "no current job\n");
     58
    5859  /** no status change, return to BUSY stack **/
    5960  if (!strcmp(string, "BUSY")) {
     61    PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM);
    6062    PutJob (job, PCONTROL_JOB_BUSY, STACK_BOTTOM);
    6163    FreeIOBuffer (&buffer);
     
    6769  if (!strcmp(string, "EXIT")) outstate = PCONTROL_JOB_EXIT;
    6870  if (!strcmp(string, "CRASH")) outstate = PCONTROL_JOB_CRASH;
    69   if (outstate == PCONTROL_JOB_BUSY) {
    70     gprint (GP_ERR, "programming error : should not reach here (CheckJob)\n");
    71     exit (1);
    72   }
     71  ASSERT (outstate != PCONTROL_JOB_BUSY, "should not reach here (CheckJob)");
    7372
    7473  /* parse the exit status and sizes of output buffers */
     
    8079  sscanf (p, "%*s %d", &job[0].stderr_size);
    8180
    82   /** job has exited : move to DONE stack (host still BUSY) **/
     81  // job has exited : move to DONE stack
     82  // the host is still BUSY until job output is gathered (int CheckDoneJob)
     83  // don't unlink job and host yet
     84  PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM);
    8385  PutJobSetState (job, PCONTROL_JOB_DONE, STACK_BOTTOM, outstate);
    8486  FreeIOBuffer (&buffer);
    8587  return (TRUE);
    8688}
    87 
    88 /** need to add timeout check here **/
  • trunk/Ohana/src/opihi/pcontrol/CheckDoneHost.c

    r7917 r8424  
    2020      FreeIOBuffer (&buffer);
    2121      return (FALSE);
    22       /** do we need to close the connection? **/
     22      // XXX do we need to close the connection?
    2323
    2424    case PCLIENT_HUNG:
     25      // XXX should this be DONE or DOWN?/
    2526      PutHost (host, PCONTROL_HOST_DONE, STACK_BOTTOM);
    2627      if (VerboseMode()) gprint (GP_ERR, "host %s is not responding\n", host[0].hostname);
     
    3334
    3435    default:
    35       if (VerboseMode()) gprint (GP_ERR, "unknown status for pclient command: programming error\n"); 
    36       exit (1);
     36      ABORT ("unknown status for pclient command"); 
    3737  }
    3838
    3939  /** successful command, examine result **/
    4040  p = memstr (buffer.buffer, "STATUS", buffer.Nbuffer);
    41   if (p == NULL) {
    42     gprint (GP_ERR, "programming error: missing STATUS in pclient message (CheckDoneHost)\n");
    43     exit (1);
    44   }
     41  ASSERT (p != NULL, "missing STATUS in pclient message (CheckDoneHost)");
     42
    4543  sscanf (p, "%*s %d", &status);
    4644  switch (status) {
    4745    case -1:
    48       gprint (GP_ERR, "programming error: reset syntax error\n");
    49       exit (1);
     46      ABORT ("reset syntax error");
    5047     
    5148    case 0:
     
    6360
    6461    default:
    65       gprint (GP_ERR, "programming error: should not reach here (CheckDoneHost)\n");
    66       exit (1);
     62      ABORT ("should not reach here (CheckDoneHost)");
    6763  }
    68   gprint (GP_ERR, "programming error: should not reach here either (CheckDoneHost)\n");
    69   exit (1);
     64  ABORT ("should not reach here (CheckDoneHost)");
    7065}
    7166
  • trunk/Ohana/src/opihi/pcontrol/CheckDoneJob.c

    r8296 r8424  
    11# include "pcontrol.h"
    22
    3 int CheckDoneJob (Job *job) {
     3int CheckDoneJob (Job *job, Host *host) {
    44 
    5   Host *host;
     5  int success;
    66
    7   if (!GetJobOutput ("stdout", (Host *) job[0].host, &job[0].stdout, job[0].stdout_size)) {
    8     /* strip off first and last lines */
    9     PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM);
    10     return (FALSE);
    11   }
     7  ASSERT (host == (Host *) job[0].host, "invalid host");
     8  ASSERT (job == (Job *) host[0].job, "invalid job");
    129
    13   if (!GetJobOutput ("stderr", (Host *) job[0].host, &job[0].stderr, job[0].stderr_size)) {
     10  success = TRUE;
     11  success &= GetJobOutput ("stdout", host, &job[0].stdout, job[0].stdout_size);
     12  success &= GetJobOutput ("stderr", host, &job[0].stderr, job[0].stderr_size);
     13
     14  if (!success) {
     15    // XXX some kind of error?
     16    // XXX try again later?
     17    PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM);
    1418    PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM);
    1519    return (FALSE);
     
    1721
    1822  /* job's state is either EXIT or CRASH (verify?) */
    19   host = UnlinkJobAndHost (job);
     23  // unlink host & job
     24  job[0].host = NULL;
     25  host[0].job = NULL;
    2026  PutHost (host, PCONTROL_HOST_DONE, STACK_BOTTOM);
    2127  PutJob (job, job[0].state, STACK_BOTTOM);
  • trunk/Ohana/src/opihi/pcontrol/CheckHost.c

    r7917 r8424  
    55  int status;
    66  IOBuffer buffer;
    7   Job *job;
     7
     8  if (host[0].job != NULL) return (TRUE);
     9
     10  /* if this host has been marked to be turned off, do that and return */
     11  if (host[0].markoff) {
     12    host[0].markoff = FALSE;
     13    StopHost (host);
     14    OffHost (host);
     15    return (TRUE);
     16  }
    817
    918  InitIOBuffer (&buffer, 0x100);
     
    1322    case 0:
    1423      if (VerboseMode()) gprint (GP_ERR, "host %s is down\n", host[0].hostname);
    15       /* if host has a job, job is dead, return to Pending */
    16       job = (Job *) host[0].job;
    17       if (job != NULL) {
    18         UnlinkJobAndHost (job);
    19         PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM);
    20       }
    2124      HarvestHost (host[0].pid);
    2225      PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM);
     
    3639      return (TRUE);
    3740  }
    38   gprint (GP_ERR, "programming error: should not reach here (Check Host)\n");
    39   return (FALSE);
     41  ABORT ("should not reach here (Check Host)");
    4042}
     43
     44// if the host has a job, we skip it (down or crash state will be caught elsewhere)
     45// in fact, just touch the IDLE hosts, not the BUSY hosts?
  • trunk/Ohana/src/opihi/pcontrol/CheckIdleHost.c

    r8296 r8424  
    88  Job *job;
    99
     10  /* if this host has been marked to be turned off, do that and return */
     11  if (host[0].markoff) {
     12    host[0].markoff = FALSE;
     13    StopHost (host);
     14    OffHost (host);
     15    return (TRUE);
     16  }
     17   
    1018  /* search the JOB_PENDING stack for an appropriate job */
    1119  stack = GetJobStack (PCONTROL_JOB_PENDING);
     
    1624    job = (Job *) stack[0].object[i];
    1725    if (job[0].mode != PCONTROL_JOB_NEEDHOST) continue;
    18     if (job[0].hostname == NULL) {
    19       gprint (GP_ERR, "programming error: NEEDHOST hostname missing\n");
    20       exit (2);
    21     }
     26    ASSERT (job[0].hostname != NULL, "NEEDHOST hostname missing");
    2227    if (strcasecmp (job[0].hostname, host[0].hostname)) continue;
    2328
     
    2934    RemoveStackEntry (stack, i);
    3035    UnlockStack (stack);
    31     StartJob (job);
     36    StartJob (job, host);
    3237    return (TRUE);
    3338  }
     
    3742    job = (Job *) stack[0].object[i];
    3843    if (job[0].mode != PCONTROL_JOB_WANTHOST) continue;
    39     if (job[0].hostname == NULL) {
    40       gprint (GP_ERR, "programming error: WANTHOST hostname missing\n");
    41       exit (2);
    42     }
     44    ASSERT (job[0].hostname != NULL, "WANTHOST hostname missing");
    4345    if (strcasecmp (job[0].hostname, host[0].hostname)) continue;
    4446
     
    5052    RemoveStackEntry (stack, i);
    5153    UnlockStack (stack);
    52     StartJob (job);
     54    StartJob (job, host);
    5355    return (TRUE);
    5456  }
     
    6668    RemoveStackEntry (stack, i);
    6769    UnlockStack (stack);
    68     StartJob (job);
    69    return (TRUE);
     70    StartJob (job, host);
     71    return (TRUE);
    7072  }
    7173
     
    7678    job = (Job *) stack[0].object[i];
    7779    if (job[0].mode != PCONTROL_JOB_WANTHOST) continue;
    78     // test the job age and skip if too young
     80    // XXX test the job age and skip if too young
    7981
    8082    /* we have found an appropriate job; link it to the host and send to StartJob */
     
    8587    RemoveStackEntry (stack, i);
    8688    UnlockStack (stack);
    87     StartJob (job);
     89    StartJob (job, host);
    8890   return (TRUE);
    8991  }
  • trunk/Ohana/src/opihi/pcontrol/CheckSystem.c

    r8296 r8424  
    11# include "pcontrol.h"
     2# define DEBUG 0
    23
    34static struct timeval lastlive = {0, 0};
     5static int RunSystem = FALSE;
     6
     7int SetRunSystem (int state) {
     8  int oldstate;
     9  oldstate = RunSystem;
     10  RunSystem = state;
     11  return oldstate;
     12}
    413
    514int CheckSystem () {
     
    2938  }
    3039
    31   if (0) {
     40  if (DEBUG) {
    3241    Stack *stack;
    3342    int Nidle, Ndown, Nbusy;
     
    4453}
    4554
     55void *CheckSystem_Threaded (void *data) {
     56
     57  struct timeval now;
     58  float dtime;
     59
     60  gprintInit ();
     61
     62  while (1) {
     63    // stop here if the user-thread requests (no objects in flight)
     64    TestCheckPoint ();
     65
     66    // don't run the system checks if RunSystem is FALSE
     67    if (!RunSystem) {
     68      usleep (50000);
     69      continue;
     70    }
     71
     72    // we want to give each block a maximum allowed time
     73    CheckIdleHosts(0.020); /* submit a new job */
     74
     75    CheckBusyJobs(0.020);  /* get job status */
     76    CheckDoneJobs(0.020);  /* harvest job stdout/stderr */
     77    CheckKillJobs(0.020);  /* harvest job stdout/stderr */
     78
     79    CheckDoneHosts(0.020); /* reset the host */
     80    CheckDownHosts(0.100); /* launch the host */
     81
     82    /* always allow at least one test */
     83    /* most tests require about 2ms per host. 
     84       CheckDoneJobs must depend on the size of the output buffer */
     85
     86    gettimeofday (&now, (void *) NULL);
     87    dtime = DTIME (now, lastlive);
     88    if (dtime > 1.0) {
     89      CheckLiveHosts(0.040);
     90      lastlive = now;
     91    }
     92
     93    if (DEBUG) {
     94      Stack *stack;
     95      int Nidle, Ndown, Nbusy;
     96      stack = GetHostStack (PCONTROL_HOST_IDLE);
     97      Nidle = stack[0].Nobject;
     98      stack = GetHostStack (PCONTROL_HOST_DOWN);
     99      Ndown = stack[0].Nobject;
     100      stack = GetHostStack (PCONTROL_HOST_BUSY);
     101      Nbusy = stack[0].Nobject;
     102      gprint (GP_ERR, "busy, idle, down: %2d %2d %2d\n", Nbusy, Nidle, Ndown);
     103    }
     104  }
     105  return (NULL);
     106}
     107
    46108int CheckBusyJobs (float MaxDelay) {
    47109
    48110  struct timeval start, stop;
    49111  int i, Nobject;
    50   Stack *stack;
     112  Stack *hoststack;
     113  Stack *jobstack;
    51114  Job   *job;
     115  Host  *host;
    52116  float dtime;
    53117
     
    65129  dtime = 0.0;
    66130  for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) {
    67     /* pull both job and host from their stacks */
    68     /* XXX is the subject to the Dangerous Embrace? */
     131    // pull both job and host from their stacks
    69132    LockStack (hoststack);
    70133    job = PullStackByLocation (jobstack, STACK_TOP);
     
    73136      break;
    74137    }
    75     host = RemoveStackByID (hoststack, job[0].host[0].HostID);
     138    host = (Host *) job[0].host;
     139    RemoveStackByID (hoststack, host[0].HostID);
    76140    UnlockStack (hoststack);
    77141
    78     CheckBusyJob (job);
    79     gettimeofday (&stop, (void *) NULL);
    80     dtime = DTIME (stop, start);
    81   }
    82   if (0 && (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject);
     142    CheckBusyJob (job, host);
     143    gettimeofday (&stop, (void *) NULL);
     144    dtime = DTIME (stop, start);
     145  }
     146  if (DEBUG && (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject);
    83147  return (TRUE);
    84148}
     
    88152  struct timeval start, stop;
    89153  int i, Nobject;
    90   Stack *stack;
     154  Stack *hoststack;
     155  Stack *jobstack;
    91156  Job   *job;
    92   float dtime;
    93 
    94   /* Loop through objects on the stack, no more than once. see note above */
    95   stack = GetJobStack (PCONTROL_JOB_DONE);
    96   Nobject = stack[0].Nobject;
    97 
    98   /* always allow at least one test */
    99   gettimeofday (&start, (void *) NULL);
    100   dtime = 0.0;
    101   for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) {
    102     job = PullStackByLocation (stack, STACK_TOP);
    103     if (job == NULL) break;
    104     CheckDoneJob (job);
    105     gettimeofday (&stop, (void *) NULL);
    106     dtime = DTIME (stop, start);
    107   }
    108   if (0 && (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject);
     157  Host  *host;
     158  float dtime;
     159
     160  /* Loop through objects on the stack, no more than once. see note above */
     161  hoststack = GetHostStack (PCONTROL_HOST_BUSY);
     162  jobstack  = GetJobStack (PCONTROL_JOB_DONE);
     163  Nobject   = jobstack[0].Nobject;
     164
     165  /* always allow at least one test */
     166  gettimeofday (&start, (void *) NULL);
     167  dtime = 0.0;
     168  for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) {
     169    LockStack (hoststack);
     170    job = PullStackByLocation (jobstack, STACK_TOP);
     171    if (job == NULL) {
     172      UnlockStack (hoststack);
     173      break;
     174    }
     175    host = (Host *) job[0].host;
     176    RemoveStackByID (hoststack, host[0].HostID);
     177    UnlockStack (hoststack);
     178
     179    CheckDoneJob (job, host);
     180    gettimeofday (&stop, (void *) NULL);
     181    dtime = DTIME (stop, start);
     182  }
     183  if (DEBUG && (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject);
    109184  return (TRUE);
    110185}
     
    114189  struct timeval start, stop;
    115190  int i, Nobject;
    116   Stack *stack;
     191  Stack *hoststack;
     192  Stack *jobstack;
    117193  Job   *job;
    118   float dtime;
    119 
    120   /* Loop through objects on the stack, no more than once. see note above */
    121   stack = GetJobStack (PCONTROL_JOB_KILL);
    122   Nobject = stack[0].Nobject;
    123 
    124   /* always allow at least one test */
    125   gettimeofday (&start, (void *) NULL);
    126   dtime = 0.0;
    127   for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) {
    128     job = PullStackByLocation (stack, STACK_TOP);
    129     if (job == NULL) break;
    130     KillJob (job);
    131     gettimeofday (&stop, (void *) NULL);
    132     dtime = DTIME (stop, start);
    133   }
    134   if (0 && (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject);
     194  Host  *host;
     195  float dtime;
     196
     197  /* Loop through objects on the stack, no more than once. see note above */
     198  hoststack = GetHostStack (PCONTROL_HOST_BUSY);
     199  jobstack = GetJobStack (PCONTROL_JOB_KILL);
     200  Nobject = jobstack[0].Nobject;
     201
     202  /* always allow at least one test */
     203  gettimeofday (&start, (void *) NULL);
     204  dtime = 0.0;
     205  for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) {
     206    LockStack (hoststack);
     207    job = PullStackByLocation (jobstack, STACK_TOP);
     208    if (job == NULL) {
     209      UnlockStack (hoststack);
     210      break;
     211    }
     212    host = (Host *) job[0].host;
     213    RemoveStackByID (hoststack, host[0].HostID);
     214    UnlockStack (hoststack);
     215
     216    KillJob (job, host);
     217    gettimeofday (&stop, (void *) NULL);
     218    dtime = DTIME (stop, start);
     219  }
     220  if (DEBUG && (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject);
    135221  return (TRUE);
    136222}
     
    158244    dtime = DTIME (stop, start);
    159245  }
    160   if (0) gprint (GP_ERR, "checked %d hosts\n", i);
     246  if (DEBUG) gprint (GP_ERR, "checked %d hosts\n", i);
    161247  return (TRUE);
    162248}
     
    180266    host = PullStackByLocation (stack, STACK_TOP);
    181267    if (host == NULL) break;
     268    if (host[0].markoff) {
     269      host[0].markoff = FALSE;
     270      OffHost (host);
     271      return (TRUE);
     272    }
    182273    dtime = DTIME (host[0].nexttry, start);
    183274    if (dtime > 0) {
     
    189280    dtime = DTIME (stop, start);
    190281  }
    191   if (0) gprint (GP_ERR, "checked %d hosts\n", i);
     282  if (DEBUG) gprint (GP_ERR, "checked %d hosts\n", i);
    192283  return (TRUE);
    193284}
     
    219310    dtime = DTIME (stop, start);
    220311  }
    221   if (0) gprint (GP_ERR, "checked %d hosts\n", i);
    222   return (TRUE);
    223 }
    224 
    225 /* this is just a heartbeat check */
     312  if (DEBUG) gprint (GP_ERR, "checked %d hosts\n", i);
     313  return (TRUE);
     314}
     315
     316/* this is just a heartbeat check (only IDLE hosts) */
    226317int CheckLiveHosts (float MaxDelay) {
    227318
     
    246337    dtime = DTIME (stop, start);
    247338  }
    248   if (0) gprint (GP_ERR, "checked %d idle hosts\n", i);
    249 
    250   /* Loop through objects on the stack, no more than once. see note above */
    251   stack = GetHostStack (PCONTROL_HOST_BUSY);
    252   Nobject = stack[0].Nobject;
    253 
    254   dtime = 0.0;
    255   for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) {
    256     host = PullStackByLocation (stack, STACK_TOP);
    257     if (host == NULL) break;
    258     CheckHost (host);
    259     gettimeofday (&stop, (void *) NULL);
    260     dtime = DTIME (stop, start);
    261   }
    262   if (0) gprint (GP_ERR, "checked %d busy hosts\n", i);
     339  if (DEBUG) gprint (GP_ERR, "checked %d idle hosts\n", i);
    263340  return (TRUE);
    264341}
  • trunk/Ohana/src/opihi/pcontrol/HostOps.c

    r8296 r8424  
    1515}
    1616
     17char *GetHostStackName (int StackID) {
     18  switch (StackID) {
     19    case PCONTROL_HOST_IDLE: return ("IDLE");
     20    case PCONTROL_HOST_DOWN: return ("DOWN");
     21    case PCONTROL_HOST_DONE: return ("DONE");
     22    case PCONTROL_HOST_BUSY: return ("BUSY");
     23    case PCONTROL_HOST_OFF:  return ("OFF");
     24  }
     25  gprint (GP_ERR, "error: unknown host stack : programming error\n");
     26  exit (1);
     27}
     28
    1729Stack *GetHostStack (int StackID) {
    1830  switch (StackID) {
    19     case PCONTROL_HOST_IDLE:
    20       return (HostPool_Idle);
    21     case PCONTROL_HOST_DOWN:
    22       return (HostPool_Down);
    23     case PCONTROL_HOST_DONE:
    24       return (HostPool_Done);
    25     case PCONTROL_HOST_BUSY:
    26       return (HostPool_Busy);
    27     case PCONTROL_HOST_OFF:
    28       return (HostPool_Off);
    29     default:
    30       gprint (GP_ERR, "error: unknown host stack : programming error\n");
    31       exit (1);
     31    case PCONTROL_HOST_IDLE: return (HostPool_Idle);
     32    case PCONTROL_HOST_DOWN: return (HostPool_Down);
     33    case PCONTROL_HOST_DONE: return (HostPool_Done);
     34    case PCONTROL_HOST_BUSY: return (HostPool_Busy);
     35    case PCONTROL_HOST_OFF:  return (HostPool_Off);
    3236  }
    3337  gprint (GP_ERR, "error: unknown host stack : programming error\n");
  • trunk/Ohana/src/opihi/pcontrol/JobOps.c

    r8296 r8424  
    1717}
    1818
     19char *GetJobStackName (int StackID) {
     20  switch (StackID) {
     21    case PCONTROL_JOB_PENDING: return ("PENDING");
     22    case PCONTROL_JOB_BUSY:    return ("BUSY");
     23    case PCONTROL_JOB_DONE:    return ("DONE");
     24    case PCONTROL_JOB_KILL:    return ("KILL");
     25    case PCONTROL_JOB_EXIT:    return ("EXIT");
     26    case PCONTROL_JOB_CRASH:   return ("CRASH");
     27  }
     28  gprint (GP_ERR, "error: unknown host stack : programming error\n");
     29  exit (1);
     30}
     31
    1932Stack *GetJobStack (int StackID) {
    2033  switch (StackID) {
    21     case PCONTROL_JOB_PENDING:
    22       return (JobPool_Pending);
    23     case PCONTROL_JOB_BUSY:
    24       return (JobPool_Busy);
    25     case PCONTROL_JOB_DONE:
    26       return (JobPool_Done);
    27     case PCONTROL_JOB_KILL:
    28       return (JobPool_Kill);
    29     case PCONTROL_JOB_EXIT:
    30       return (JobPool_Exit);
    31     case PCONTROL_JOB_CRASH:
    32       return (JobPool_Crash);
    33     default:
    34       gprint (GP_ERR, "error: unknown job stack : programming error\n");
    35       exit (1);
     34    case PCONTROL_JOB_PENDING: return (JobPool_Pending);
     35    case PCONTROL_JOB_BUSY:    return (JobPool_Busy);
     36    case PCONTROL_JOB_DONE:    return (JobPool_Done);
     37    case PCONTROL_JOB_KILL:    return (JobPool_Kill);
     38    case PCONTROL_JOB_EXIT:    return (JobPool_Exit);
     39    case PCONTROL_JOB_CRASH:   return (JobPool_Crash);
    3640  }
    3741  gprint (GP_ERR, "error: unknown job stack : programming error\n");
     
    191195  JobID = job[0].JobID;
    192196  PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM);
    193   gprint (GP_ERR, "added new job\n");
     197  if (VerboseMode()) gprint (GP_ERR, "added new job\n");
    194198  return (JobID);
    195199}
     
    210214  FREE (job);
    211215}
    212 
    213 /* unlink job and host, pull host from its stack */
    214 Host *UnlinkJobAndHost (Job *job) {
    215 
    216   Host *host;
    217 
    218   host = (Host *) job[0].host;
    219   if (host == NULL) {
    220     gprint (GP_ERR, "programming error: job has no host\n");
    221     exit (2);
    222   }
    223 
    224   /* unlink host & job */
    225   job[0].host = NULL;
    226   host[0].job = NULL;
    227  
    228   /* remove host from correct stack */
    229   XXXX does this step asuume the host is in this stack??
    230   if (PullHostFromStackByID (host[0].stack, host[0].HostID) == NULL) {
    231     gprint (GP_ERR, "programming error: host is not found in current stack\n");
    232     exit (2);
    233   }
    234   return (host);
    235 }
  • trunk/Ohana/src/opihi/pcontrol/KillJob.c

    r8296 r8424  
    11# include "pcontrol.h"
    22
    3 int KillJob (Job *job) {
     3int KillJob (Job *job, Host *host) {
    44 
    5   Host *host;
    65  IOBuffer buffer;
    76  int status;
    87  char *p;
    98
    10   /** must have a valid host : if not? **/
    11   host = (Host *) job[0].host;
     9  ASSERT (host == (Host *) job[0].host, "invalid host");
     10  ASSERT (job  == (Job *) host[0].job, "invalid job");
    1211
    1312  InitIOBuffer (&buffer, 0x100);
     
    1918    case PCLIENT_DOWN:
    2019      HarvestHost (host[0].pid);
    21       UnlinkJobAndHost (job);
    22       PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM);
     20      // unlink host & job
     21      job[0].host = NULL;
     22      host[0].job = NULL;
    2323      PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM);
     24      PutJob (job, PCONTROL_JOB_CRASH, STACK_BOTTOM);
    2425      FreeIOBuffer (&buffer);
    2526      return (FALSE);
    2627
    2728    case PCLIENT_HUNG:
     29      HarvestHost (host[0].pid);
     30      // unlink host & job
     31      job[0].host = NULL;
     32      host[0].job = NULL;
     33      PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM);
    2834      PutJobSetState (job, PCONTROL_JOB_BUSY, STACK_BOTTOM, PCONTROL_JOB_HUNG);
    2935      FreeIOBuffer (&buffer);
     
    3137
    3238    case PCLIENT_GOOD:
    33       gprint (GP_ERR, "message received (KillJob)\n"); 
     39      if (VerboseMode()) gprint (GP_ERR, "message received (KillJob)\n"); 
    3440      break;
    3541
    3642    default:
    37       gprint (GP_ERR, "unknown status for pclient command: programming error\n"); 
    38       exit (1);
     43      ABORT ("unknown status for pclient command"); 
    3944  }
    4045
    4146  /** host is up, need to parse message **/
    4247  p = memstr (buffer.buffer, "STATUS", buffer.Nbuffer);
    43   if (p == NULL) {
    44     gprint (GP_ERR, "missing STATUS in pclient message : programming error\n");
    45     exit (1);
    46   }
    47   gprint (GP_ERR, "client message: %s\n", buffer.buffer);
     48  ASSERT (p != NULL, "missing STATUS in pclient message");
     49  if (VerboseMode()) gprint (GP_ERR, "client message: %s\n", buffer.buffer);
     50
    4851  sscanf (p, "%*s %d", &status);
    4952  FreeIOBuffer (&buffer);
     
    5255  switch (status) {
    5356    case -1:
    54       gprint (GP_ERR, "programming error (syntax error to pclient)\n");
    55       return (FALSE);
    56       break;
     57      ABORT ("syntax error to pclient");
    5758    case 0:
    5859      gprint (GP_ERR, "failure to kill child process\n");
    59       PutJob (job, PCONTROL_JOB_BUSY, STACK_BOTTOM);
     60      PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM);
     61      PutJob (job, PCONTROL_JOB_KILL, STACK_BOTTOM);
    6062      return (FALSE);
    6163    case 1:
    62       PutJobSetState (job, PCONTROL_JOB_DONE, STACK_BOTTOM, PCONTROL_JOB_CRASH);
     64      gprint (GP_ERR, "killing job %s on %s\n", job[0].argv[0], host[0].hostname);
     65      // unlink host & job
     66      job[0].host = NULL;
     67      host[0].job = NULL;
     68      PutHost (host, PCONTROL_HOST_IDLE, STACK_BOTTOM);
     69      PutJob (job, PCONTROL_JOB_CRASH, STACK_BOTTOM);
    6370      return (TRUE);
    6471    case 2:
    65       gprint (GP_ERR, "programming error (client has no job)\n");
    66       return (FALSE);
     72      ABORT ("client has no job");
    6773  }
    68 
    69   gprint (GP_ERR, "programming error : should not reach here (CheckJob)\n");
    70   exit (1);
     74  ABORT ("should not reach here (KillJob)");
    7175}
    72 
    73 /** XXX need to do something appropriate with host? ***/
  • trunk/Ohana/src/opihi/pcontrol/Makefile

    r8296 r8424  
    1919# link flags
    2020LIBS    =       -L$(LIB) -L$(LLIB) -L$(XLIB)
    21 LIBS1   =       -lsocket -lnsl -lreadline $(TLIB) -lkapa -lFITS -lohana -lX11 -lm
     21LIBS1   =       -lsocket -lnsl -lreadline $(TLIB) -lkapa -lFITS -lohana -lX11 -lpthread -lm
    2222LIBS2   =       -lbasiccmd -lshell -ldata
    2323LFLAGS  =       $(LIBS) $(LIBS2) $(LIBS1)
     
    2727funcs = \
    2828$(SDIR)/init.$(ARCH).o \
    29 $(SDIR)/pclient.$(ARCH).o \
    3029$(SDIR)/pcontrol.$(ARCH).o \
    3130$(SDIR)/rconnect.$(ARCH).o \
     
    3534$(SDIR)/CheckHost.$(ARCH).o \
    3635$(SDIR)/CheckIdleHost.$(ARCH).o \
     36$(SDIR)/CheckPoint.$(ARCH).o \
    3737$(SDIR)/CheckSystem.$(ARCH).o \
    3838$(SDIR)/GetJobOutput.$(ARCH).o \
     
    4141$(SDIR)/JobOps.$(ARCH).o \
    4242$(SDIR)/StackOps.$(ARCH).o \
     43$(SDIR)/PclientCommand.$(ARCH).o \
    4344$(SDIR)/ResetJob.$(ARCH).o \
    4445$(SDIR)/StartHost.$(ARCH).o \
  • trunk/Ohana/src/opihi/pcontrol/StackOps.c

    r8296 r8424  
    2525  ALLOCATE (stack[0].id,     int,    stack[0].NOBJECT);
    2626
    27   // we need to use a mutex of type
    28   // stack[0].mutex = PTHREAD_MUTEX_INITIALIZER;
    29 
     27# ifdef THREADED
     28  pthread_mutex_init (&stack[0].mutex, NULL);
     29# endif
    3030  return (stack);
    3131}
     
    7272void *PullStackByLocation (Stack *stack, int where) {
    7373
    74   int i;
    7574  void *object;
    7675 
     
    9695void *PullStackByName (Stack *stack, char *name) {
    9796
    98   int i, j;
     97  int i;
    9998  void *object;
    10099
     
    106105    /* here is the element of interest */
    107106    object = stack[0].object[i];
    108     RemoveStackEntry (i);
     107    RemoveStackEntry (stack, i);
    109108    UnlockStack (stack);
    110109    return (object);
     
    117116void *PullStackByID (Stack *stack, int id) {
    118117
    119   int i, j;
     118  int i;
    120119  void *object;
    121120 
     
    137136/* should only be called if you know where is a valid entry */
    138137int RemoveStackEntry (Stack *stack, int where) {
     138
     139  int i;
    139140
    140141  if (where < 0) abort();
     
    152153}
    153154
    154 /* should only be called if manually lock the stack */
    155 int RemoveStackByID (Stack *stack, int id) {
    156 
    157   int i, j;
     155/* should only be called if you manually lock the stack */
     156void *RemoveStackByID (Stack *stack, int id) {
     157
     158  int i;
    158159  void *object;
    159160 
     
    205206
    206207void LockStack (Stack *stack) {
     208# ifdef THREADED
     209  pthread_mutex_lock (&stack[0].mutex);
     210# endif
    207211  return;
    208212}
    209213
    210214void UnlockStack (Stack *stack) {
     215# ifdef THREADED
     216  pthread_mutex_unlock (&stack[0].mutex);
     217# endif
    211218  return;
    212219}
    213 
    214 // Safe with PTHREAD_MUTEX_INITIALIZER lock
  • trunk/Ohana/src/opihi/pcontrol/StartHost.c

    r8296 r8424  
    1313  if (VarConfig ("COMMAND", "%s", command) == NULL) strcpy (command, "ssh");
    1414  if (VarConfig ("SHELL", "%s", shell)     == NULL) strcpy (shell, "pclient");
     15
     16  gprint (GP_ERR, "starting host within thread %d\n", pthread_self());
    1517
    1618  pid = rconnect (command, host[0].hostname, shell, stdio);
  • trunk/Ohana/src/opihi/pcontrol/StartJob.c

    r7917 r8424  
    11# include "pcontrol.h"
    22
    3 int StartJob (Job *job) {
     3int StartJob (Job *job, Host *host) {
    44
    55  int  i, Nline, status;
    66  char *line, *p;
    7   Host *host;
    87  IOBuffer buffer;
    98
     
    1110
    1211  /* job must have assigned host */
    13   host = (Host *) job[0].host;
    14   if (host == NULL) {
    15     gprint (GP_ERR, "programming error: no assigned host\n");
    16     exit (1);
    17   }
     12  ASSERT (host == (Host *) job[0].host, "invalid host");
     13  ASSERT (job  == (Job *) host[0].job, "invalid job");
    1814
    1915  /* construct command line : job arg0 arg1 ... argN\n */
     
    4844
    4945    default:
    50       if (VerboseMode()) gprint (GP_ERR, "unknown status for pclient command: programming error\n"); 
    51       exit (1);
     46      ABORT ("unknown status for pclient command"); 
    5247  }
    5348
    5449  /* check on result of pclient command */
    5550  p = memstr (buffer.buffer, "STATUS", buffer.Nbuffer);
    56   if (p == NULL) {
    57     gprint (GP_ERR, "programming error: missing STATUS in pclient message\n");
    58     exit (1);
    59   }
     51  ASSERT (p != NULL, "missing STATUS in pclient message");
     52
    6053  sscanf (p, "%*s %d", &status);
    6154  switch (status) {
     
    6558
    6659    case -2:
    67       gprint (GP_ERR, "programming error: syntax error in pclient command\n");
    68       exit (1);
     60      ABORT ("syntax error in pclient command");
    6961
    7062    case -3:
    71       gprint (GP_ERR, "programming error: existing child on pclient\n");
    72       exit (1);
     63      ABORT ("existing child on pclient");
    7364
    7465    default:
     
    8172  }
    8273  /* we should never reach here */
    83   gprint (GP_ERR, "programming error: should not reach here (StartJob)\n");
    84   exit (1);
     74  ABORT ("should not reach here (StartJob)");
    8575
    8676failure:
    87   /* unlink host & job */
     77  // unlink host & job
    8878  job[0].host = NULL;
    8979  host[0].job = NULL;
     
    9484  return (FALSE);
    9585}
    96 
    97 /** note : host and job popped off stacks : can't use UnlinkJobAndHost **/
  • trunk/Ohana/src/opihi/pcontrol/StopHosts.c

    r8296 r8424  
    1919int DownHosts () {
    2020
    21   int i, Nobject;
    2221  Stack *stack;
    2322  Host  *host;
     
    6867int HarvestHost (int pid) {
    6968 
    70   int       result;
    71   int       waitstatus;
     69  int i, result, waitstatus;
    7270
    73   /* I probably should loop a few time with max timeout larger than 10ms... */
    74   usleep (10000);
    75   result = waitpid (pid, &waitstatus, WNOHANG);
     71  gprint (GP_ERR, "harvesting within thread %d\n", pthread_self());
     72  gprint (GP_ERR, "child process %d is down, wait for exit status\n", pid);
     73 
     74  // Loop a few times waiting for child to exit
     75  for (i = 0; i < 50; i++) {
     76    result = waitpid (pid, &waitstatus, WNOHANG);
     77    if ((result == -1) && (errno == ECHILD)) {
     78      usleep (10000);
     79      continue;
     80    } else {
     81      break;
     82    }
     83  }
    7684  switch (result) {
    7785    case -1:  /* error with waitpid */
     
    8189          gprint (GP_ERR, "did process already exit?  programming error?\n");
    8290          break;
     91        case EINTR:
    8392        case EINVAL:
    84           gprint (GP_ERR, "error EINVAL (waitpid): programming error\n");
    85           exit (1);
    86         case EINTR:
    87           gprint (GP_ERR, "error EINTR (waitpid): programming error\n");
    88           exit (1);
    8993        default:
    90           gprint (GP_ERR, "unknown error for waitpid (%d): programming error\n", errno);
    91           exit (1);
     94          perror ("unexpected error");
     95          ABORT ("(HarvestHost)");
    9296      }
    9397      break;
  • trunk/Ohana/src/opihi/pcontrol/check.c

    r8296 r8424  
    11# include "pcontrol.h"
    2 
    3 char jobstate[7][32]  = {"PENDING", "BUSY", "HUNG", "DONE", "KILL", "EXIT", "CRASH"};
    4 char hoststate[5][32] = {"IDLE",    "BUSY", "DOWN", "DONE", "OFF"};
    52
    63int check (int argc, char **argv) {
     
    1815  if (!strcasecmp (argv[1], "JOB")) {
    1916    JobID = atoi (argv[2]);
     17
     18    SetCheckPoint ();  // ensure the JOB is on one of the stacks
    2019    job = PullJobByID (JobID, &StackID);
    2120    if (job == NULL) {
    2221      gprint (GP_LOG, "job not found\n");
     22      ClearCheckPoint ();
    2323      return (FALSE);
    2424    }
    25     gprint (GP_LOG, "STATUS %s\n", jobstate[StackID]);
     25    gprint (GP_LOG, "STATUS %s\n", GetJobStackName(StackID));
    2626    gprint (GP_LOG, "EXITST %d\n", job[0].exit_status);
    2727    gprint (GP_LOG, "STDOUT %d\n", job[0].stdout_size);
    2828    gprint (GP_LOG, "STDERR %d\n", job[0].stderr_size);
    2929    PutJob (job, StackID, STACK_BOTTOM);
     30    ClearCheckPoint ();
    3031    return (TRUE);
    3132  }
     
    3334  if (!strcasecmp (argv[1], "HOST")) {
    3435    HostID = atoi (argv[2]);
     36
     37    SetCheckPoint ();  // ensure the HOST is on one of the stacks
    3538    host = PullHostByID (HostID, &StackID);
    3639    if (host == NULL) {
    3740      gprint (GP_LOG, "host not found\n");
     41      ClearCheckPoint ();
    3842      return (FALSE);
    3943    }
    40     gprint (GP_LOG, "host %s\n", hoststate[StackID]);
     44    gprint (GP_LOG, "host %s\n", GetHostStackName(StackID));
    4145    PutHost (host, StackID, STACK_BOTTOM);
     46    ClearCheckPoint ();
    4247    return (TRUE);
    4348  }
     
    4651  return (FALSE);
    4752}
    48 
    49 XXX how do I handle objects which are in flight??
  • trunk/Ohana/src/opihi/pcontrol/host.c

    r8296 r8424  
    11# include "pcontrol.h"
    22
     3// we use CheckPoints in this function to prevent objects in flight from being missing.
    34int host (int argc, char **argv) {
    45
    5   int N, Ns;
    66  int StackID;
    77  IDtype HostID;
     
    2121      return (FALSE);
    2222    }
     23    host[0].markoff = FALSE;
    2324    DownHost (host);
    2425    return (TRUE);
    2526  }
    2627  if (!strcasecmp (argv[1], "RETRY")) {
     28    // no need to use a check point [thief: CheckDownHost (DOWN->IDLE)]
    2729    host = PullHostFromStackByName (PCONTROL_HOST_DOWN, argv[2]);
    2830    if (!host) {
     
    3941  }
    4042  if (!strcasecmp (argv[1], "CHECK")) {
     43    SetCheckPoint ();  // ensure the host is on one of the stacks
    4144    host = PullHostByName (argv[2], &StackID);
    42     switch (StackID) {
    43       case PCONTROL_HOST_IDLE:
    44         gprint (GP_LOG, "host %s is IDLE\n", argv[2]);
    45       case PCONTROL_HOST_BUSY:
    46         gprint (GP_LOG, "host %s is BUSY\n", argv[2]);
    47       case PCONTROL_HOST_DONE:
    48         gprint (GP_LOG, "host %s is DONE\n", argv[2]);
    49       case PCONTROL_HOST_DOWN:
    50         gprint (GP_LOG, "host %s is DOWN\n", argv[2]);
    51       case PCONTROL_HOST_OFF:
    52         gprint (GP_LOG, "host %s is OFF\n", argv[2]);
    53       default:
    54         gprint (GP_LOG, "host %s not found\n", argv[2]);
    55         return (FALSE);
     45    if (host == NULL) {
     46      gprint (GP_LOG, "host %s not found\n", argv[2]);
     47      ClearCheckPoint ();
     48      return (FALSE);
    5649    }
    5750    PutHost (host, StackID, STACK_BOTTOM);
    58     return (FALSE);
     51    ClearCheckPoint ();
     52
     53    gprint (GP_LOG, "host %s is %s\n", argv[2], GetHostStackName (StackID));
     54    return (TRUE);
    5955  }
    6056  if (!strcasecmp (argv[1], "OFF")) {
     57    SetCheckPoint (); // ensure we can find the specified host
     58    host = PullHostByName (argv[2], &StackID);
     59    if (host == NULL) {
     60      gprint (GP_LOG, "host %s not found\n", argv[2]);
     61      ClearCheckPoint ();
     62      return (FALSE);
     63    }
     64    host[0].markoff = TRUE;
     65    PutHost (host, StackID, STACK_BOTTOM);
     66    ClearCheckPoint ();
     67    return (TRUE);
     68  }
     69
     70# if 0
    6171    host = PullHostFromStackByName (PCONTROL_HOST_IDLE, argv[2]);
    6272    if (host) {
    6373      StopHost (host);
    6474      OffHost (host);
     75      ClearCheckPoint ();
    6576      return (TRUE);
    6677    }
     
    6879    if (host) {
    6980      OffHost (host);
     81      ClearCheckPoint ();
    7082      return (TRUE);
    7183    }
    72     /* XXX the 'markoff' flag is not being used */
     84    /* XXX the 'markoff' flag is not being checked */
    7385    host = PullHostFromStackByName (PCONTROL_HOST_BUSY, argv[2]);
    7486    if (host) {
    7587      host[0].markoff  = TRUE;
    7688      PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM);
     89      ClearCheckPoint ();
    7790      return (TRUE);
    7891    }
    79     gprint (GP_LOG, "host %s is not BUSY, IDLE, or DOWN\n", argv[2]);
     92    /* XXX the 'markoff' flag is not being checked */
     93    host = PullHostFromStackByName (PCONTROL_HOST_DONE, argv[2]);
     94    if (host) {
     95      host[0].markoff  = TRUE;
     96      PutHost (host, PCONTROL_HOST_DONE, STACK_BOTTOM);
     97      ClearCheckPoint ();
     98      return (TRUE);
     99    }
     100    gprint (GP_LOG, "host %s is not found (already off?)\n", argv[2]);
     101    ClearCheckPoint ();
    80102    return (FALSE);
    81103  }
     104# endif
     105
    82106  if (!strcasecmp (argv[1], "DELETE")) {
     107    // a check point is not required: no possible thief
    83108    host = PullHostFromStackByName (PCONTROL_HOST_OFF, argv[2]);
    84109    if (!host) {
  • trunk/Ohana/src/opihi/pcontrol/init.c

    r8296 r8424  
    88int jobstack    PROTO((int, char **));
    99int kill_pc     PROTO((int, char **));
    10 int pulse       PROTO((int, char **));
    1110int status      PROTO((int, char **));
    1211int run         PROTO((int, char **));
     
    1615int verbose     PROTO((int, char **));
    1716int version         PROTO((int, char **));
     17
     18// pulse is only available in the un-threaded version
     19int pulse       PROTO((int, char **));
    1820
    1921static Command cmds[] = { 
     
    2527  {"verbose",   verbose,   "set the verbose mode for job"},
    2628  {"version",   version,   "show version information"},
    27   {"pulse",     pulse,     "set system pulse"},
    2829  {"job",       job,       "add job"},
    2930  {"jobstack",  jobstack,  "list jobs for a single stack"},
     
    3334  {"stderr",    stderr_pc, "get stderr buffer for job"},
    3435  {"stdout",    stdout_pc, "get stdout buffer for job"},
     36# ifndef THREADED
     37  {"pulse",     pulse,     "set system pulse"},
     38# endif
    3539};
    3640
     
    4246    AddCommand (&cmds[i]);
    4347  }
    44 
    45   /* XXX temporary : put this elsewhere? */
    46   InitJobStacks ();
    47   InitHostStacks ();
    4848}
  • trunk/Ohana/src/opihi/pcontrol/pcontrol.c

    r8179 r8424  
    1010void program_init (int *argc, char **argv) {
    1111 
     12# ifdef THREADED 
     13  pthread_t clientsThread;
     14# endif
     15
    1216  auto_break = TRUE;
    1317
     
    1822  gprintInit ();
    1923
     24  InitJobStacks ();
     25  InitHostStacks ();
     26
    2027  rl_readline_name = opihi_name;
    2128  rl_attempted_completion_function = command_completer;
     29# ifdef THREADED
     30  pthread_create (&clientsThread, NULL, &CheckSystem_Threaded, NULL);
     31  rl_event_hook = NULL;
     32  rl_set_keyboard_input_timeout (1000);
     33# else
    2234  rl_event_hook = CheckSystem;
    2335  rl_set_keyboard_input_timeout (1000);
     36# endif 
    2437
    2538  set_str_variable ("HISTORY", opihi_history);
  • trunk/Ohana/src/opihi/pcontrol/run.c

    r8296 r8424  
    88  }
    99
     10# ifdef THREADED
     11  SetRunSystem (TRUE);
     12# else
    1013  rl_event_hook = CheckSystem;
     14# endif
    1115
    1216  return (TRUE);
    1317}
    1418
    15 int run_threaded (int argc, char **argv) {
    16 
    17   if (argc != 1) {
    18     gprint (GP_ERR, "USAGE: run\n");
    19     return (FALSE);
    20   }
    21 
    22   // some action
    23 
    24   return (TRUE);
    25 }
  • trunk/Ohana/src/opihi/pcontrol/status.c

    r8296 r8424  
    11# include "pcontrol.h"
    22
    3 char jobname[7][32]  = {"PENDING", "BUSY", "HUNG", "DONE", "KILL", "EXIT", "CRASH"};
    4 char hostname[5][32] = {"IDLE",    "BUSY", "DOWN", "DONE", "OFF"};
     3int PrintJobStack (int Nstack);
     4int PrintHostStack (int Nstack);
    55
    66int status (int argc, char **argv) {
     
    3131  LockStack (stack);
    3232  Nobject = stack[0].Nobject;
    33   gprint (GP_LOG, "job stack %s:  %d objects\n", jobname[Nstack], Nobject);
     33  gprint (GP_LOG, "job stack %s:  %d objects\n", GetJobStackName(Nstack), Nobject);
    3434
    3535  for (i = 0; i < Nobject; i++) {
     
    5757  LockStack (stack);
    5858  Nobject = stack[0].Nobject;
    59   gprint (GP_LOG, "host stack %s:  %d objects\n", hostname[Nstack], Nobject);
     59  gprint (GP_LOG, "host stack %s:  %d objects\n", GetHostStackName(Nstack), Nobject);
    6060
    6161  for (i = 0; i < Nobject; i++) {
  • trunk/Ohana/src/opihi/pcontrol/stop.c

    r7917 r8424  
    88  }
    99
     10# ifdef THREADED
     11  SetRunSystem (FALSE);
     12# else
    1013  rl_event_hook = NULL;
     14# endif
    1115
    1216  return (TRUE);
Note: See TracChangeset for help on using the changeset viewer.