Changeset 8424
- Timestamp: Aug 18, 2006, 1:44:51 PM (20 years ago)
- Location: trunk/Ohana/src/opihi
- Files: 2 added, 1 deleted, 24 edited
-
doc/pcontrol.txt (modified) (1 diff)
-
include/pcontrol.h (modified) (7 diffs)
-
include/shell.h (modified) (4 diffs)
-
pcontrol/CheckBusyJob.c (modified) (8 diffs)
-
pcontrol/CheckDoneHost.c (modified) (3 diffs)
-
pcontrol/CheckDoneJob.c (modified) (2 diffs)
-
pcontrol/CheckHost.c (modified) (3 diffs)
-
pcontrol/CheckIdleHost.c (modified) (8 diffs)
-
pcontrol/CheckPoint.c (added)
-
pcontrol/CheckSystem.c (modified) (12 diffs)
-
pcontrol/HostOps.c (modified) (1 diff)
-
pcontrol/JobOps.c (modified) (3 diffs)
-
pcontrol/KillJob.c (modified) (4 diffs)
-
pcontrol/Makefile (modified) (4 diffs)
-
pcontrol/PclientCommand.c (added)
-
pcontrol/StackOps.c (modified) (8 diffs)
-
pcontrol/StartHost.c (modified) (1 diff)
-
pcontrol/StartJob.c (modified) (6 diffs)
-
pcontrol/StopHosts.c (modified) (3 diffs)
-
pcontrol/check.c (modified) (4 diffs)
-
pcontrol/host.c (modified) (4 diffs)
-
pcontrol/init.c (modified) (5 diffs)
-
pcontrol/pclient.c (deleted)
-
pcontrol/pcontrol.c (modified) (2 diffs)
-
pcontrol/run.c (modified) (1 diff)
-
pcontrol/status.c (modified) (3 diffs)
-
pcontrol/stop.c (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
trunk/Ohana/src/opihi/doc/pcontrol.txt
r8297 r8424
    1
    2  2006.08.18
    3
    4  Outstanding issues related to pcontrol:
    5
    6  * disposition of HUNG jobs?
    7  * probably should not save the history for pcontrol or pclient
    8    (these will be many lines long very quickly...)
    9  * need to add options to run/stop for hosts and jobs independently
 1 10
 2 11  2006.08.11
-
trunk/Ohana/src/opihi/include/pcontrol.h
r8297 r8424 1 1 # include "data.h" 2 2 # include "basic.h" 3 # define THREADED 3 4 4 5 typedef struct timeval Ptime; … … 98 99 int Nobject; 99 100 int NOBJECT; 100 // pthread_mutex_t mutex; 101 # ifdef THREADED 102 pthread_mutex_t mutex; 103 # endif 101 104 } Stack; 102 105 … … 108 111 # define DTIME(A,B) ((A.tv_sec - B.tv_sec) + 1e-6*(A.tv_usec - B.tv_usec)) 109 112 # define ZTIME(A) ((A.tv_sec == 0) && (A.tv_usec == 0)) 113 114 // # define ASSERT(TEST,STRING) { if (!(TEST)) { gprint (GP_ERR, "programming error: %s\n", STRING); abort (); }} 115 // # define ABORT(STRING) { gprint (GP_ERR, "programming error: %s\n", STRING); abort (); } 116 # define ASSERT(TEST,STRING) { if (!(TEST)) { gprint (GP_ERR, "programming error: %s\n", STRING); raise (SIGINT); exit (2); }} 117 # define ABORT(STRING) { gprint (GP_ERR, "programming error: %s\n", STRING); raise (SIGINT); exit (2); } 110 118 111 119 void InitPcontrol (); … … 117 125 void *PullStackByName (Stack *stack, char *name); 118 126 void *PullStackByID (Stack *stack, int id); 119 void *FindStackByID (Stack *stack, int id);120 void * FindStackByName (Stack *stack, char *name);127 int RemoveStackEntry (Stack *stack, int where); 128 void *RemoveStackByID (Stack *stack, int id); 121 129 void LockStack (Stack *stack); 122 130 void UnlockStack (Stack *stack); 123 131 124 int CheckSystem (); 125 int CheckBusyJobs (float delay); 126 int CheckDoneJobs (float delay); 127 int CheckKillJobs (float delay); 128 int CheckDoneHosts (float delay); 129 int CheckLiveHosts (float delay); 130 int CheckDownHosts (float delay); 131 int CheckIdleHosts (float delay); 132 // void *FindStackByID (Stack *stack, int id); 133 // void *FindStackByName (Stack *stack, char *name); 134 135 /*** CheckSystem.c ***/ 136 int CheckSystem (); 137 void *CheckSystem_Threaded (void *data); 138 int CheckBusyJobs (float delay); 139 int CheckDoneJobs (float delay); 140 int CheckKillJobs (float delay); 141 int CheckDoneHosts (float delay); 142 int CheckDownHosts 
(float delay); 143 int CheckIdleHosts (float delay); 144 int CheckLiveHosts (float delay); 145 int SetRunSystem (int state); 146 147 /*** own files ***/ 148 int CheckHost (Host *host); 149 int StartHost (Host *host); 132 150 int CheckIdleHost (Host *host); 151 int CheckDoneHost (Host *host); 152 int CheckBusyJob (Job *job, Host *host); 153 int CheckDoneJob (Job *job, Host *host); 154 int KillJob (Job *job, Host *host); 155 int StartJob (Job *job, Host *host); 156 int ResetJob (Job *job); 157 int GetJobOutput (char *command, Host *host, IOBuffer *buffer, int Nbytes); 158 int PclientCommand (Host *host, char *command, char *response, IOBuffer *buffer); 159 int rconnect (char *command, char *hostname, char *shell, int *stdio); 160 161 /*** misc files ***/ 162 int VerboseMode (); // in verbose.c 163 void gotsignal (int signum); // in pcontrol.c 164 165 /*** IDops.c ***/ 133 166 void InitIDs (); 134 167 IDtype NextJobID (); 135 168 IDtype NextHostID (); 136 169 void PrintID (gpDest dest, IDtype ID); 137 int CheckBusyJob (Job *job); 138 int StartHost (Host *host); 139 int CheckDoneHost (Host *host); 140 int CheckDoneJob (Job *job); 141 int GetJobOutput (char *command, Host *host, IOBuffer *buffer, int Nbytes); 142 int ResetJob (Job *job); 143 int PclientCommand (Host *host, char *command, char *response, IOBuffer *buffer); 144 int rconnect (char *command, char *hostname, char *shell, int *stdio); 145 int CheckHost (Host *host); 146 147 int PrintJobStack (int Nstack); 148 int PrintHostStack (int Nstack); 149 150 int VerboseMode (); 151 152 void gotsignal (int signum); 170 171 /*** CheckPoint.c ***/ 172 int SetCheckPoint (); 173 int ClearCheckPoint (); 174 int TestCheckPoint (); 153 175 154 176 /*** HostOps.c ***/ 155 177 void InitHostStacks (); 156 178 Stack *GetHostStack (int StackID); 179 char *GetHostStackName (int StackID); 157 180 Stack *GetHostStackByName (char *name); 158 181 int PutHost (Host *host, int StackID, int where); … … 161 184 Host *PullHostFromStackByID 
(int StackID, IDtype ID); 162 185 Host *PullHostFromStackByName (int StackID, char *name); 163 Host *FindHostByID (IDtype HostID, int *StackID);164 Host *FindHostByName (char *name, int *StackID);165 Host *FindHostInStackByID (int StackID, IDtype ID);166 Host *FindHostInStackByName (int StackID, char *name);167 186 IDtype AddHost (char *hostname); 168 187 void DelHost (Host *host); 169 188 189 /*** StopHosts.c ***/ 170 190 void DownHost (Host *host); 171 191 void OffHost (Host *host); 172 192 int DownHosts (); 193 int StopHost (Host *host); 173 194 int HarvestHost (int pid); 174 int StopHost (Host *host);175 195 176 196 /*** JobOps.c ***/ 177 197 void InitJobStacks (); 178 198 Stack *GetJobStack (int StackID); 199 char *GetJobStackName (int StackID); 179 200 Stack *GetJobStackByName (char *name); 180 201 int PutJob (Job *job, int StackID, int where); … … 182 203 Job *PullJobByID (IDtype JobID, int *StackID); 183 204 Job *PullJobFromStackByID (int StackID, int ID); 184 Job *FindJobByID (IDtype JobID, int *StackID);185 Job *FindJobInStackByID (int StackID, int ID);186 205 IDtype AddJob (char *hostname, JobMode mode, int timeout, int argc, char **argv); 187 206 void DelJob (Job *job); … … 189 208 void LinkJobAndHost (Job *job, Host *host); 190 209 191 int KillJob (Job *job); 192 int StartJob (Job *job); 210 // Job *FindJobByID (IDtype JobID, int *StackID); 211 // Job *FindJobInStackByID (int StackID, int ID); 212 // Host *FindHostByID (IDtype HostID, int *StackID); 213 // Host *FindHostByName (char *name, int *StackID); 214 // Host *FindHostInStackByID (int StackID, IDtype ID); 215 // Host *FindHostInStackByName (int StackID, char *name); -
trunk/Ohana/src/opihi/include/shell.h
r8174 r8424 11 11 # define MACRO_STRING(s) #s 12 12 # define MACRO_NAME(s) MACRO_STRING(s) 13 14 /* enums used by gprint functions */ 15 typedef enum {GP_FILE, GP_BUFF} gpMode; 16 typedef enum {GP_LOG, GP_ERR} gpDest; 13 17 14 18 typedef int CommandF (); … … 33 37 } List; 34 38 39 /* structure used to represent the gprint i/o stream */ 40 typedef struct { 41 FILE *file; 42 IOBuffer *buffer; 43 gpMode mode; 44 gpDest dest; 45 pthread_t thread; 46 } gpStream; 47 35 48 /*** globals used to track the shell language concepts ***/ 36 49 List *lists; /* variable to store the list of all lists */ … … 47 60 void program_init PROTO((int *argc, char **argv)); 48 61 void startup PROTO((int *argc, char **argv)); 62 int opihi PROTO((int argc, char **argv)); 49 63 int multicommand PROTO((char *line)); 50 64 void multicommand_InitServer PROTO((void)); … … 118 132 int macro_list_f PROTO((int, char **)); /* "macro_list" is a readline func */ 119 133 int macro_read PROTO((int, char **)); 120 int macro_write PROTO((int, char **));134 int macro_write PROTO((int, char **)); 121 135 122 char *memstr (char *m1, char *m2, int n);123 int write_fmt (int fd, char *format, ...);124 char *opihi_version ();125 char *strip_version (char *input);136 char *memstr PROTO((char *m1, char *m2, int n)); 137 int write_fmt PROTO((int fd, char *format, ...)); 138 char *opihi_version PROTO(()); 139 char *strip_version PROTO((char *input)); 126 140 127 /*** gprint defines ***/ 128 129 /* enums used by gprint functions */ 130 typedef enum {GP_FILE, GP_BUFF} gpMode; 131 typedef enum {GP_LOG, GP_ERR} gpDest; 132 133 /* structure used to represent the gprint i/o stream */ 134 typedef struct { 135 FILE *file; 136 IOBuffer *buffer; 137 gpMode mode; 138 gpDest dest; 139 pthread_t thread; 140 } gpStream; 141 142 void gprintInit (); 143 gpStream *gprintGetStream (gpDest dest); 144 void gprintSetBuffer (gpDest dest); 145 IOBuffer *gprintGetBuffer (gpDest dest); 146 void gprintSetFile (gpDest dest, char *filename); 147 
FILE *gprintGetFile (gpDest dest); 148 int gprint (gpDest dest, char *format, ...); 149 int gwrite (char *buffer, int size, int N, gpDest dest); 141 /* gprint functions */ 142 void gprintInit PROTO(()); 143 gpStream *gprintGetStream PROTO((gpDest dest)); 144 void gprintSetBuffer PROTO((gpDest dest)); 145 IOBuffer *gprintGetBuffer PROTO((gpDest dest)); 146 void gprintSetFile PROTO((gpDest dest, char *filename)); 147 FILE *gprintGetFile PROTO((gpDest dest)); 148 int gprint PROTO((gpDest dest, char *format, ...)); 149 int gwrite PROTO((char *buffer, int size, int N, gpDest dest)); 150 150 151 151 # endif -
trunk/Ohana/src/opihi/pcontrol/CheckBusyJob.c
r8296 r8424 1 1 # include "pcontrol.h" 2 2 3 int CheckBusyJob (Job *job ) {3 int CheckBusyJob (Job *job, Host *host) { 4 4 5 5 int status; … … 8 8 char string[64]; 9 9 IOBuffer buffer; 10 Host *host;11 10 12 11 /* we are checking a job which is currently busy. it has been pulled from the … … 14 13 XXX need to check on state of HOST on return */ 15 14 16 /** must have a valid host : if not? **/17 host = (Host *) job[0].host;15 ASSERT (host == (Host *) job[0].host, "invalid host"); 16 ASSERT (job == (Job *) host[0].job, "invalid job"); 18 17 19 18 InitIOBuffer (&buffer, 0x100); … … 25 24 case PCLIENT_DOWN: 26 25 HarvestHost (host[0].pid); 27 UnlinkJobAndHost (job); 26 // unlink host & job 27 job[0].host = NULL; 28 host[0].job = NULL; 28 29 PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM); 29 30 PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM); … … 32 33 33 34 case PCLIENT_HUNG: 35 HarvestHost (host[0].pid); 36 // unlink host & job 37 job[0].host = NULL; 38 host[0].job = NULL; 39 PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM); 34 40 PutJobSetState (job, PCONTROL_JOB_BUSY, STACK_BOTTOM, PCONTROL_JOB_HUNG); 35 41 FreeIOBuffer (&buffer); … … 37 43 38 44 case PCLIENT_GOOD: 39 if (VerboseMode()) gprint (GP_ERR, "message received (CheckBusyJob) \n");45 if (VerboseMode()) gprint (GP_ERR, "message received (CheckBusyJob)"); 40 46 break; 41 47 42 48 default: 43 gprint (GP_ERR, "programming error: unknown status for pclient command\n"); 44 exit (1); 49 ABORT ("unknown status for pclient command"); 45 50 } 46 51 47 52 /** host is up, need to parse message **/ 48 53 p = memstr (buffer.buffer, "STATUS", buffer.Nbuffer); 49 if (p == NULL) { 50 gprint (GP_ERR, "programming error: missing STATUS in pclient message\n"); 51 exit (1); 52 } 54 ASSERT (p != NULL, "missing STATUS in pclient message"); 55 53 56 sscanf (p, "%*s %s", string); 54 if (!strcmp(string, "NONE")) { 55 gprint (GP_ERR, "programming error: no current job\n"); 56 exit (1); 57 } 57 ASSERT (strcmp(string, "NONE"), "no 
current job\n"); 58 58 59 /** no status change, return to BUSY stack **/ 59 60 if (!strcmp(string, "BUSY")) { 61 PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM); 60 62 PutJob (job, PCONTROL_JOB_BUSY, STACK_BOTTOM); 61 63 FreeIOBuffer (&buffer); … … 67 69 if (!strcmp(string, "EXIT")) outstate = PCONTROL_JOB_EXIT; 68 70 if (!strcmp(string, "CRASH")) outstate = PCONTROL_JOB_CRASH; 69 if (outstate == PCONTROL_JOB_BUSY) { 70 gprint (GP_ERR, "programming error : should not reach here (CheckJob)\n"); 71 exit (1); 72 } 71 ASSERT (outstate != PCONTROL_JOB_BUSY, "should not reach here (CheckJob)"); 73 72 74 73 /* parse the exit status and sizes of output buffers */ … … 80 79 sscanf (p, "%*s %d", &job[0].stderr_size); 81 80 82 /** job has exited : move to DONE stack (host still BUSY) **/ 81 // job has exited : move to DONE stack 82 // the host is still BUSY until job output is gathered (int CheckDoneJob) 83 // don't unlink job and host yet 84 PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM); 83 85 PutJobSetState (job, PCONTROL_JOB_DONE, STACK_BOTTOM, outstate); 84 86 FreeIOBuffer (&buffer); 85 87 return (TRUE); 86 88 } 87 88 /** need to add timeout check here **/ -
trunk/Ohana/src/opihi/pcontrol/CheckDoneHost.c
r7917 r8424 20 20 FreeIOBuffer (&buffer); 21 21 return (FALSE); 22 / ** do we need to close the connection? **/22 // XXX do we need to close the connection? 23 23 24 24 case PCLIENT_HUNG: 25 // XXX should this be DONE or DOWN?/ 25 26 PutHost (host, PCONTROL_HOST_DONE, STACK_BOTTOM); 26 27 if (VerboseMode()) gprint (GP_ERR, "host %s is not responding\n", host[0].hostname); … … 33 34 34 35 default: 35 if (VerboseMode()) gprint (GP_ERR, "unknown status for pclient command: programming error\n"); 36 exit (1); 36 ABORT ("unknown status for pclient command"); 37 37 } 38 38 39 39 /** successful command, examine result **/ 40 40 p = memstr (buffer.buffer, "STATUS", buffer.Nbuffer); 41 if (p == NULL) { 42 gprint (GP_ERR, "programming error: missing STATUS in pclient message (CheckDoneHost)\n"); 43 exit (1); 44 } 41 ASSERT (p != NULL, "missing STATUS in pclient message (CheckDoneHost)"); 42 45 43 sscanf (p, "%*s %d", &status); 46 44 switch (status) { 47 45 case -1: 48 gprint (GP_ERR, "programming error: reset syntax error\n"); 49 exit (1); 46 ABORT ("reset syntax error"); 50 47 51 48 case 0: … … 63 60 64 61 default: 65 gprint (GP_ERR, "programming error: should not reach here (CheckDoneHost)\n"); 66 exit (1); 62 ABORT ("should not reach here (CheckDoneHost)"); 67 63 } 68 gprint (GP_ERR, "programming error: should not reach here either (CheckDoneHost)\n"); 69 exit (1); 64 ABORT ("should not reach here (CheckDoneHost)"); 70 65 } 71 66 -
trunk/Ohana/src/opihi/pcontrol/CheckDoneJob.c
r8296 r8424 1 1 # include "pcontrol.h" 2 2 3 int CheckDoneJob (Job *job ) {3 int CheckDoneJob (Job *job, Host *host) { 4 4 5 Host *host;5 int success; 6 6 7 if (!GetJobOutput ("stdout", (Host *) job[0].host, &job[0].stdout, job[0].stdout_size)) { 8 /* strip off first and last lines */ 9 PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM); 10 return (FALSE); 11 } 7 ASSERT (host == (Host *) job[0].host, "invalid host"); 8 ASSERT (job == (Job *) host[0].job, "invalid job"); 12 9 13 if (!GetJobOutput ("stderr", (Host *) job[0].host, &job[0].stderr, job[0].stderr_size)) { 10 success = TRUE; 11 success &= GetJobOutput ("stdout", host, &job[0].stdout, job[0].stdout_size); 12 success &= GetJobOutput ("stderr", host, &job[0].stderr, job[0].stderr_size); 13 14 if (!success) { 15 // XXX some kind of error? 16 // XXX try again later? 17 PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM); 14 18 PutJob (job, PCONTROL_JOB_DONE, STACK_BOTTOM); 15 19 return (FALSE); … … 17 21 18 22 /* job's state is either EXIT or CRASH (verify?) */ 19 host = UnlinkJobAndHost (job); 23 // unlink host & job 24 job[0].host = NULL; 25 host[0].job = NULL; 20 26 PutHost (host, PCONTROL_HOST_DONE, STACK_BOTTOM); 21 27 PutJob (job, job[0].state, STACK_BOTTOM); -
trunk/Ohana/src/opihi/pcontrol/CheckHost.c
r7917 r8424 5 5 int status; 6 6 IOBuffer buffer; 7 Job *job; 7 8 if (host[0].job != NULL) return (TRUE); 9 10 /* if this host has been marked to be turned off, do that and return */ 11 if (host[0].markoff) { 12 host[0].markoff = FALSE; 13 StopHost (host); 14 OffHost (host); 15 return (TRUE); 16 } 8 17 9 18 InitIOBuffer (&buffer, 0x100); … … 13 22 case 0: 14 23 if (VerboseMode()) gprint (GP_ERR, "host %s is down\n", host[0].hostname); 15 /* if host has a job, job is dead, return to Pending */16 job = (Job *) host[0].job;17 if (job != NULL) {18 UnlinkJobAndHost (job);19 PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM);20 }21 24 HarvestHost (host[0].pid); 22 25 PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM); … … 36 39 return (TRUE); 37 40 } 38 gprint (GP_ERR, "programming error: should not reach here (Check Host)\n"); 39 return (FALSE); 41 ABORT ("should not reach here (Check Host)"); 40 42 } 43 44 // if the host has a job, we skip it (down or crash state will be caught elsewhere) 45 // in fact, just touch the IDLE hosts, not the BUSY hosts? -
trunk/Ohana/src/opihi/pcontrol/CheckIdleHost.c
r8296 r8424 8 8 Job *job; 9 9 10 /* if this host has been marked to be turned off, do that and return */ 11 if (host[0].markoff) { 12 host[0].markoff = FALSE; 13 StopHost (host); 14 OffHost (host); 15 return (TRUE); 16 } 17 10 18 /* search the JOB_PENDING stack for an appropriate job */ 11 19 stack = GetJobStack (PCONTROL_JOB_PENDING); … … 16 24 job = (Job *) stack[0].object[i]; 17 25 if (job[0].mode != PCONTROL_JOB_NEEDHOST) continue; 18 if (job[0].hostname == NULL) { 19 gprint (GP_ERR, "programming error: NEEDHOST hostname missing\n"); 20 exit (2); 21 } 26 ASSERT (job[0].hostname != NULL, "NEEDHOST hostname missing"); 22 27 if (strcasecmp (job[0].hostname, host[0].hostname)) continue; 23 28 … … 29 34 RemoveStackEntry (stack, i); 30 35 UnlockStack (stack); 31 StartJob (job );36 StartJob (job, host); 32 37 return (TRUE); 33 38 } … … 37 42 job = (Job *) stack[0].object[i]; 38 43 if (job[0].mode != PCONTROL_JOB_WANTHOST) continue; 39 if (job[0].hostname == NULL) { 40 gprint (GP_ERR, "programming error: WANTHOST hostname missing\n"); 41 exit (2); 42 } 44 ASSERT (job[0].hostname != NULL, "WANTHOST hostname missing"); 43 45 if (strcasecmp (job[0].hostname, host[0].hostname)) continue; 44 46 … … 50 52 RemoveStackEntry (stack, i); 51 53 UnlockStack (stack); 52 StartJob (job );54 StartJob (job, host); 53 55 return (TRUE); 54 56 } … … 66 68 RemoveStackEntry (stack, i); 67 69 UnlockStack (stack); 68 StartJob (job );69 return (TRUE);70 StartJob (job, host); 71 return (TRUE); 70 72 } 71 73 … … 76 78 job = (Job *) stack[0].object[i]; 77 79 if (job[0].mode != PCONTROL_JOB_WANTHOST) continue; 78 // test the job age and skip if too young80 // XXX test the job age and skip if too young 79 81 80 82 /* we have found an appropriate job; link it to the host and send to StartJob */ … … 85 87 RemoveStackEntry (stack, i); 86 88 UnlockStack (stack); 87 StartJob (job );89 StartJob (job, host); 88 90 return (TRUE); 89 91 } -
trunk/Ohana/src/opihi/pcontrol/CheckSystem.c
r8296 r8424 1 1 # include "pcontrol.h" 2 # define DEBUG 0 2 3 3 4 static struct timeval lastlive = {0, 0}; 5 static int RunSystem = FALSE; 6 7 int SetRunSystem (int state) { 8 int oldstate; 9 oldstate = RunSystem; 10 RunSystem = state; 11 return oldstate; 12 } 4 13 5 14 int CheckSystem () { … … 29 38 } 30 39 31 if ( 0) {40 if (DEBUG) { 32 41 Stack *stack; 33 42 int Nidle, Ndown, Nbusy; … … 44 53 } 45 54 55 void *CheckSystem_Threaded (void *data) { 56 57 struct timeval now; 58 float dtime; 59 60 gprintInit (); 61 62 while (1) { 63 // stop here if the user-thread requests (no objects in flight) 64 TestCheckPoint (); 65 66 // don't run the system checks if RunSystem is FALSE 67 if (!RunSystem) { 68 usleep (50000); 69 continue; 70 } 71 72 // we want to give each block a maximum allowed time 73 CheckIdleHosts(0.020); /* submit a new job */ 74 75 CheckBusyJobs(0.020); /* get job status */ 76 CheckDoneJobs(0.020); /* harvest job stdout/stderr */ 77 CheckKillJobs(0.020); /* harvest job stdout/stderr */ 78 79 CheckDoneHosts(0.020); /* reset the host */ 80 CheckDownHosts(0.100); /* launch the host */ 81 82 /* always allow at least one test */ 83 /* most tests require about 2ms per host. 
84 CheckDoneJobs must depend on the size of the output buffer */ 85 86 gettimeofday (&now, (void *) NULL); 87 dtime = DTIME (now, lastlive); 88 if (dtime > 1.0) { 89 CheckLiveHosts(0.040); 90 lastlive = now; 91 } 92 93 if (DEBUG) { 94 Stack *stack; 95 int Nidle, Ndown, Nbusy; 96 stack = GetHostStack (PCONTROL_HOST_IDLE); 97 Nidle = stack[0].Nobject; 98 stack = GetHostStack (PCONTROL_HOST_DOWN); 99 Ndown = stack[0].Nobject; 100 stack = GetHostStack (PCONTROL_HOST_BUSY); 101 Nbusy = stack[0].Nobject; 102 gprint (GP_ERR, "busy, idle, down: %2d %2d %2d\n", Nbusy, Nidle, Ndown); 103 } 104 } 105 return (NULL); 106 } 107 46 108 int CheckBusyJobs (float MaxDelay) { 47 109 48 110 struct timeval start, stop; 49 111 int i, Nobject; 50 Stack *stack; 112 Stack *hoststack; 113 Stack *jobstack; 51 114 Job *job; 115 Host *host; 52 116 float dtime; 53 117 … … 65 129 dtime = 0.0; 66 130 for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) { 67 /* pull both job and host from their stacks */ 68 /* XXX is the subject to the Dangerous Embrace? 
*/ 131 // pull both job and host from their stacks 69 132 LockStack (hoststack); 70 133 job = PullStackByLocation (jobstack, STACK_TOP); … … 73 136 break; 74 137 } 75 host = RemoveStackByID (hoststack, job[0].host[0].HostID); 138 host = (Host *) job[0].host; 139 RemoveStackByID (hoststack, host[0].HostID); 76 140 UnlockStack (hoststack); 77 141 78 CheckBusyJob (job );79 gettimeofday (&stop, (void *) NULL); 80 dtime = DTIME (stop, start); 81 } 82 if ( 0&& (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject);142 CheckBusyJob (job, host); 143 gettimeofday (&stop, (void *) NULL); 144 dtime = DTIME (stop, start); 145 } 146 if (DEBUG && (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject); 83 147 return (TRUE); 84 148 } … … 88 152 struct timeval start, stop; 89 153 int i, Nobject; 90 Stack *stack; 154 Stack *hoststack; 155 Stack *jobstack; 91 156 Job *job; 92 float dtime; 93 94 /* Loop through objects on the stack, no more than once. see note above */ 95 stack = GetJobStack (PCONTROL_JOB_DONE); 96 Nobject = stack[0].Nobject; 97 98 /* always allow at least one test */ 99 gettimeofday (&start, (void *) NULL); 100 dtime = 0.0; 101 for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) { 102 job = PullStackByLocation (stack, STACK_TOP); 103 if (job == NULL) break; 104 CheckDoneJob (job); 105 gettimeofday (&stop, (void *) NULL); 106 dtime = DTIME (stop, start); 107 } 108 if (0 && (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject); 157 Host *host; 158 float dtime; 159 160 /* Loop through objects on the stack, no more than once. 
see note above */ 161 hoststack = GetHostStack (PCONTROL_HOST_BUSY); 162 jobstack = GetJobStack (PCONTROL_JOB_DONE); 163 Nobject = jobstack[0].Nobject; 164 165 /* always allow at least one test */ 166 gettimeofday (&start, (void *) NULL); 167 dtime = 0.0; 168 for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) { 169 LockStack (hoststack); 170 job = PullStackByLocation (jobstack, STACK_TOP); 171 if (job == NULL) { 172 UnlockStack (hoststack); 173 break; 174 } 175 host = (Host *) job[0].host; 176 RemoveStackByID (hoststack, host[0].HostID); 177 UnlockStack (hoststack); 178 179 CheckDoneJob (job, host); 180 gettimeofday (&stop, (void *) NULL); 181 dtime = DTIME (stop, start); 182 } 183 if (DEBUG && (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject); 109 184 return (TRUE); 110 185 } … … 114 189 struct timeval start, stop; 115 190 int i, Nobject; 116 Stack *stack; 191 Stack *hoststack; 192 Stack *jobstack; 117 193 Job *job; 118 float dtime; 119 120 /* Loop through objects on the stack, no more than once. see note above */ 121 stack = GetJobStack (PCONTROL_JOB_KILL); 122 Nobject = stack[0].Nobject; 123 124 /* always allow at least one test */ 125 gettimeofday (&start, (void *) NULL); 126 dtime = 0.0; 127 for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) { 128 job = PullStackByLocation (stack, STACK_TOP); 129 if (job == NULL) break; 130 KillJob (job); 131 gettimeofday (&stop, (void *) NULL); 132 dtime = DTIME (stop, start); 133 } 134 if (0 && (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject); 194 Host *host; 195 float dtime; 196 197 /* Loop through objects on the stack, no more than once. 
see note above */ 198 hoststack = GetHostStack (PCONTROL_HOST_BUSY); 199 jobstack = GetJobStack (PCONTROL_JOB_KILL); 200 Nobject = jobstack[0].Nobject; 201 202 /* always allow at least one test */ 203 gettimeofday (&start, (void *) NULL); 204 dtime = 0.0; 205 for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) { 206 LockStack (hoststack); 207 job = PullStackByLocation (jobstack, STACK_TOP); 208 if (job == NULL) { 209 UnlockStack (hoststack); 210 break; 211 } 212 host = (Host *) job[0].host; 213 RemoveStackByID (hoststack, host[0].HostID); 214 UnlockStack (hoststack); 215 216 KillJob (job, host); 217 gettimeofday (&stop, (void *) NULL); 218 dtime = DTIME (stop, start); 219 } 220 if (DEBUG && (Nobject > 0)) gprint (GP_ERR, "checked %d of %d jobs\n", i, Nobject); 135 221 return (TRUE); 136 222 } … … 158 244 dtime = DTIME (stop, start); 159 245 } 160 if ( 0) gprint (GP_ERR, "checked %d hosts\n", i);246 if (DEBUG) gprint (GP_ERR, "checked %d hosts\n", i); 161 247 return (TRUE); 162 248 } … … 180 266 host = PullStackByLocation (stack, STACK_TOP); 181 267 if (host == NULL) break; 268 if (host[0].markoff) { 269 host[0].markoff = FALSE; 270 OffHost (host); 271 return (TRUE); 272 } 182 273 dtime = DTIME (host[0].nexttry, start); 183 274 if (dtime > 0) { … … 189 280 dtime = DTIME (stop, start); 190 281 } 191 if ( 0) gprint (GP_ERR, "checked %d hosts\n", i);282 if (DEBUG) gprint (GP_ERR, "checked %d hosts\n", i); 192 283 return (TRUE); 193 284 } … … 219 310 dtime = DTIME (stop, start); 220 311 } 221 if ( 0) gprint (GP_ERR, "checked %d hosts\n", i);222 return (TRUE); 223 } 224 225 /* this is just a heartbeat check */312 if (DEBUG) gprint (GP_ERR, "checked %d hosts\n", i); 313 return (TRUE); 314 } 315 316 /* this is just a heartbeat check (only IDLE hosts) */ 226 317 int CheckLiveHosts (float MaxDelay) { 227 318 … … 246 337 dtime = DTIME (stop, start); 247 338 } 248 if (0) gprint (GP_ERR, "checked %d idle hosts\n", i); 249 250 /* Loop through objects on the stack, no more than 
once. see note above */ 251 stack = GetHostStack (PCONTROL_HOST_BUSY); 252 Nobject = stack[0].Nobject; 253 254 dtime = 0.0; 255 for (i = 0; (i < Nobject) && (dtime < MaxDelay); i++) { 256 host = PullStackByLocation (stack, STACK_TOP); 257 if (host == NULL) break; 258 CheckHost (host); 259 gettimeofday (&stop, (void *) NULL); 260 dtime = DTIME (stop, start); 261 } 262 if (0) gprint (GP_ERR, "checked %d busy hosts\n", i); 339 if (DEBUG) gprint (GP_ERR, "checked %d idle hosts\n", i); 263 340 return (TRUE); 264 341 } -
trunk/Ohana/src/opihi/pcontrol/HostOps.c
r8296 r8424 15 15 } 16 16 17 char *GetHostStackName (int StackID) { 18 switch (StackID) { 19 case PCONTROL_HOST_IDLE: return ("IDLE"); 20 case PCONTROL_HOST_DOWN: return ("DOWN"); 21 case PCONTROL_HOST_DONE: return ("DONE"); 22 case PCONTROL_HOST_BUSY: return ("BUSY"); 23 case PCONTROL_HOST_OFF: return ("OFF"); 24 } 25 gprint (GP_ERR, "error: unknown host stack : programming error\n"); 26 exit (1); 27 } 28 17 29 Stack *GetHostStack (int StackID) { 18 30 switch (StackID) { 19 case PCONTROL_HOST_IDLE: 20 return (HostPool_Idle); 21 case PCONTROL_HOST_DOWN: 22 return (HostPool_Down); 23 case PCONTROL_HOST_DONE: 24 return (HostPool_Done); 25 case PCONTROL_HOST_BUSY: 26 return (HostPool_Busy); 27 case PCONTROL_HOST_OFF: 28 return (HostPool_Off); 29 default: 30 gprint (GP_ERR, "error: unknown host stack : programming error\n"); 31 exit (1); 31 case PCONTROL_HOST_IDLE: return (HostPool_Idle); 32 case PCONTROL_HOST_DOWN: return (HostPool_Down); 33 case PCONTROL_HOST_DONE: return (HostPool_Done); 34 case PCONTROL_HOST_BUSY: return (HostPool_Busy); 35 case PCONTROL_HOST_OFF: return (HostPool_Off); 32 36 } 33 37 gprint (GP_ERR, "error: unknown host stack : programming error\n"); -
trunk/Ohana/src/opihi/pcontrol/JobOps.c
r8296 r8424 17 17 } 18 18 19 char *GetJobStackName (int StackID) { 20 switch (StackID) { 21 case PCONTROL_JOB_PENDING: return ("PENDING"); 22 case PCONTROL_JOB_BUSY: return ("BUSY"); 23 case PCONTROL_JOB_DONE: return ("DONE"); 24 case PCONTROL_JOB_KILL: return ("KILL"); 25 case PCONTROL_JOB_EXIT: return ("EXIT"); 26 case PCONTROL_JOB_CRASH: return ("CRASH"); 27 } 28 gprint (GP_ERR, "error: unknown host stack : programming error\n"); 29 exit (1); 30 } 31 19 32 Stack *GetJobStack (int StackID) { 20 33 switch (StackID) { 21 case PCONTROL_JOB_PENDING: 22 return (JobPool_Pending); 23 case PCONTROL_JOB_BUSY: 24 return (JobPool_Busy); 25 case PCONTROL_JOB_DONE: 26 return (JobPool_Done); 27 case PCONTROL_JOB_KILL: 28 return (JobPool_Kill); 29 case PCONTROL_JOB_EXIT: 30 return (JobPool_Exit); 31 case PCONTROL_JOB_CRASH: 32 return (JobPool_Crash); 33 default: 34 gprint (GP_ERR, "error: unknown job stack : programming error\n"); 35 exit (1); 34 case PCONTROL_JOB_PENDING: return (JobPool_Pending); 35 case PCONTROL_JOB_BUSY: return (JobPool_Busy); 36 case PCONTROL_JOB_DONE: return (JobPool_Done); 37 case PCONTROL_JOB_KILL: return (JobPool_Kill); 38 case PCONTROL_JOB_EXIT: return (JobPool_Exit); 39 case PCONTROL_JOB_CRASH: return (JobPool_Crash); 36 40 } 37 41 gprint (GP_ERR, "error: unknown job stack : programming error\n"); … … 191 195 JobID = job[0].JobID; 192 196 PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM); 193 gprint (GP_ERR, "added new job\n");197 if (VerboseMode()) gprint (GP_ERR, "added new job\n"); 194 198 return (JobID); 195 199 } … … 210 214 FREE (job); 211 215 } 212 213 /* unlink job and host, pull host from its stack */214 Host *UnlinkJobAndHost (Job *job) {215 216 Host *host;217 218 host = (Host *) job[0].host;219 if (host == NULL) {220 gprint (GP_ERR, "programming error: job has no host\n");221 exit (2);222 }223 224 /* unlink host & job */225 job[0].host = NULL;226 host[0].job = NULL;227 228 /* remove host from correct stack */229 XXXX does this step asuume 
the host is in this stack??230 if (PullHostFromStackByID (host[0].stack, host[0].HostID) == NULL) {231 gprint (GP_ERR, "programming error: host is not found in current stack\n");232 exit (2);233 }234 return (host);235 } -
trunk/Ohana/src/opihi/pcontrol/KillJob.c
r8296 r8424 1 1 # include "pcontrol.h" 2 2 3 int KillJob (Job *job ) {3 int KillJob (Job *job, Host *host) { 4 4 5 Host *host;6 5 IOBuffer buffer; 7 6 int status; 8 7 char *p; 9 8 10 /** must have a valid host : if not? **/11 host = (Host *) job[0].host;9 ASSERT (host == (Host *) job[0].host, "invalid host"); 10 ASSERT (job == (Job *) host[0].job, "invalid job"); 12 11 13 12 InitIOBuffer (&buffer, 0x100); … … 19 18 case PCLIENT_DOWN: 20 19 HarvestHost (host[0].pid); 21 UnlinkJobAndHost (job); 22 PutJob (job, PCONTROL_JOB_PENDING, STACK_BOTTOM); 20 // unlink host & job 21 job[0].host = NULL; 22 host[0].job = NULL; 23 23 PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM); 24 PutJob (job, PCONTROL_JOB_CRASH, STACK_BOTTOM); 24 25 FreeIOBuffer (&buffer); 25 26 return (FALSE); 26 27 27 28 case PCLIENT_HUNG: 29 HarvestHost (host[0].pid); 30 // unlink host & job 31 job[0].host = NULL; 32 host[0].job = NULL; 33 PutHost (host, PCONTROL_HOST_DOWN, STACK_BOTTOM); 28 34 PutJobSetState (job, PCONTROL_JOB_BUSY, STACK_BOTTOM, PCONTROL_JOB_HUNG); 29 35 FreeIOBuffer (&buffer); … … 31 37 32 38 case PCLIENT_GOOD: 33 gprint (GP_ERR, "message received (KillJob)\n");39 if (VerboseMode()) gprint (GP_ERR, "message received (KillJob)\n"); 34 40 break; 35 41 36 42 default: 37 gprint (GP_ERR, "unknown status for pclient command: programming error\n"); 38 exit (1); 43 ABORT ("unknown status for pclient command"); 39 44 } 40 45 41 46 /** host is up, need to parse message **/ 42 47 p = memstr (buffer.buffer, "STATUS", buffer.Nbuffer); 43 if (p == NULL) { 44 gprint (GP_ERR, "missing STATUS in pclient message : programming error\n"); 45 exit (1); 46 } 47 gprint (GP_ERR, "client message: %s\n", buffer.buffer); 48 ASSERT (p != NULL, "missing STATUS in pclient message"); 49 if (VerboseMode()) gprint (GP_ERR, "client message: %s\n", buffer.buffer); 50 48 51 sscanf (p, "%*s %d", &status); 49 52 FreeIOBuffer (&buffer); … … 52 55 switch (status) { 53 56 case -1: 54 gprint (GP_ERR, "programming error 
(syntax error to pclient)\n"); 55 return (FALSE); 56 break; 57 ABORT ("syntax error to pclient"); 57 58 case 0: 58 59 gprint (GP_ERR, "failure to kill child process\n"); 59 PutJob (job, PCONTROL_JOB_BUSY, STACK_BOTTOM); 60 PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM); 61 PutJob (job, PCONTROL_JOB_KILL, STACK_BOTTOM); 60 62 return (FALSE); 61 63 case 1: 62 PutJobSetState (job, PCONTROL_JOB_DONE, STACK_BOTTOM, PCONTROL_JOB_CRASH); 64 gprint (GP_ERR, "killing job %s on %s\n", job[0].argv[0], host[0].hostname); 65 // unlink host & job 66 job[0].host = NULL; 67 host[0].job = NULL; 68 PutHost (host, PCONTROL_HOST_IDLE, STACK_BOTTOM); 69 PutJob (job, PCONTROL_JOB_CRASH, STACK_BOTTOM); 63 70 return (TRUE); 64 71 case 2: 65 gprint (GP_ERR, "programming error (client has no job)\n"); 66 return (FALSE); 72 ABORT ("client has no job"); 67 73 } 68 69 gprint (GP_ERR, "programming error : should not reach here (CheckJob)\n"); 70 exit (1); 74 ABORT ("should not reach here (KillJob)"); 71 75 } 72 73 /** XXX need to do something appropriate with host? ***/ -
trunk/Ohana/src/opihi/pcontrol/Makefile
r8296 r8424 19 19 # link flags 20 20 LIBS = -L$(LIB) -L$(LLIB) -L$(XLIB) 21 LIBS1 = -lsocket -lnsl -lreadline $(TLIB) -lkapa -lFITS -lohana -lX11 -l m21 LIBS1 = -lsocket -lnsl -lreadline $(TLIB) -lkapa -lFITS -lohana -lX11 -lpthread -lm 22 22 LIBS2 = -lbasiccmd -lshell -ldata 23 23 LFLAGS = $(LIBS) $(LIBS2) $(LIBS1) … … 27 27 funcs = \ 28 28 $(SDIR)/init.$(ARCH).o \ 29 $(SDIR)/pclient.$(ARCH).o \30 29 $(SDIR)/pcontrol.$(ARCH).o \ 31 30 $(SDIR)/rconnect.$(ARCH).o \ … … 35 34 $(SDIR)/CheckHost.$(ARCH).o \ 36 35 $(SDIR)/CheckIdleHost.$(ARCH).o \ 36 $(SDIR)/CheckPoint.$(ARCH).o \ 37 37 $(SDIR)/CheckSystem.$(ARCH).o \ 38 38 $(SDIR)/GetJobOutput.$(ARCH).o \ … … 41 41 $(SDIR)/JobOps.$(ARCH).o \ 42 42 $(SDIR)/StackOps.$(ARCH).o \ 43 $(SDIR)/PclientCommand.$(ARCH).o \ 43 44 $(SDIR)/ResetJob.$(ARCH).o \ 44 45 $(SDIR)/StartHost.$(ARCH).o \ -
trunk/Ohana/src/opihi/pcontrol/StackOps.c
r8296 r8424 25 25 ALLOCATE (stack[0].id, int, stack[0].NOBJECT); 26 26 27 // we need to use a mutex of type 28 // stack[0].mutex = PTHREAD_MUTEX_INITIALIZER;29 27 # ifdef THREADED 28 pthread_mutex_init (&stack[0].mutex, NULL); 29 # endif 30 30 return (stack); 31 31 } … … 72 72 void *PullStackByLocation (Stack *stack, int where) { 73 73 74 int i;75 74 void *object; 76 75 … … 96 95 void *PullStackByName (Stack *stack, char *name) { 97 96 98 int i , j;97 int i; 99 98 void *object; 100 99 … … 106 105 /* here is the element of interest */ 107 106 object = stack[0].object[i]; 108 RemoveStackEntry ( i);107 RemoveStackEntry (stack, i); 109 108 UnlockStack (stack); 110 109 return (object); … … 117 116 void *PullStackByID (Stack *stack, int id) { 118 117 119 int i , j;118 int i; 120 119 void *object; 121 120 … … 137 136 /* should only be called if you know where is a valid entry */ 138 137 int RemoveStackEntry (Stack *stack, int where) { 138 139 int i; 139 140 140 141 if (where < 0) abort(); … … 152 153 } 153 154 154 /* should only be called if manually lock the stack */155 intRemoveStackByID (Stack *stack, int id) {156 157 int i , j;155 /* should only be called if you manually lock the stack */ 156 void *RemoveStackByID (Stack *stack, int id) { 157 158 int i; 158 159 void *object; 159 160 … … 205 206 206 207 void LockStack (Stack *stack) { 208 # ifdef THREADED 209 pthread_mutex_lock (&stack[0].mutex); 210 # endif 207 211 return; 208 212 } 209 213 210 214 void UnlockStack (Stack *stack) { 215 # ifdef THREADED 216 pthread_mutex_unlock (&stack[0].mutex); 217 # endif 211 218 return; 212 219 } 213 214 // Safe with PTHREAD_MUTEX_INITIALIZER lock -
trunk/Ohana/src/opihi/pcontrol/StartHost.c
r8296 r8424 13 13 if (VarConfig ("COMMAND", "%s", command) == NULL) strcpy (command, "ssh"); 14 14 if (VarConfig ("SHELL", "%s", shell) == NULL) strcpy (shell, "pclient"); 15 16 gprint (GP_ERR, "starting host within thread %d\n", pthread_self()); 15 17 16 18 pid = rconnect (command, host[0].hostname, shell, stdio); -
trunk/Ohana/src/opihi/pcontrol/StartJob.c
r7917 r8424 1 1 # include "pcontrol.h" 2 2 3 int StartJob (Job *job ) {3 int StartJob (Job *job, Host *host) { 4 4 5 5 int i, Nline, status; 6 6 char *line, *p; 7 Host *host;8 7 IOBuffer buffer; 9 8 … … 11 10 12 11 /* job must have assigned host */ 13 host = (Host *) job[0].host; 14 if (host == NULL) { 15 gprint (GP_ERR, "programming error: no assigned host\n"); 16 exit (1); 17 } 12 ASSERT (host == (Host *) job[0].host, "invalid host"); 13 ASSERT (job == (Job *) host[0].job, "invalid job"); 18 14 19 15 /* construct command line : job arg0 arg1 ... argN\n */ … … 48 44 49 45 default: 50 if (VerboseMode()) gprint (GP_ERR, "unknown status for pclient command: programming error\n"); 51 exit (1); 46 ABORT ("unknown status for pclient command"); 52 47 } 53 48 54 49 /* check on result of pclient command */ 55 50 p = memstr (buffer.buffer, "STATUS", buffer.Nbuffer); 56 if (p == NULL) { 57 gprint (GP_ERR, "programming error: missing STATUS in pclient message\n"); 58 exit (1); 59 } 51 ASSERT (p != NULL, "missing STATUS in pclient message"); 52 60 53 sscanf (p, "%*s %d", &status); 61 54 switch (status) { … … 65 58 66 59 case -2: 67 gprint (GP_ERR, "programming error: syntax error in pclient command\n"); 68 exit (1); 60 ABORT ("syntax error in pclient command"); 69 61 70 62 case -3: 71 gprint (GP_ERR, "programming error: existing child on pclient\n"); 72 exit (1); 63 ABORT ("existing child on pclient"); 73 64 74 65 default: … … 81 72 } 82 73 /* we should never reach here */ 83 gprint (GP_ERR, "programming error: should not reach here (StartJob)\n"); 84 exit (1); 74 ABORT ("should not reach here (StartJob)"); 85 75 86 76 failure: 87 / * unlink host & job */77 // unlink host & job 88 78 job[0].host = NULL; 89 79 host[0].job = NULL; … … 94 84 return (FALSE); 95 85 } 96 97 /** note : host and job popped off stacks : can't use UnlinkJobAndHost **/ -
trunk/Ohana/src/opihi/pcontrol/StopHosts.c
r8296 r8424 19 19 int DownHosts () { 20 20 21 int i, Nobject;22 21 Stack *stack; 23 22 Host *host; … … 68 67 int HarvestHost (int pid) { 69 68 70 int result; 71 int waitstatus; 69 int i, result, waitstatus; 72 70 73 /* I probably should loop a few time with max timeout larger than 10ms... */ 74 usleep (10000); 75 result = waitpid (pid, &waitstatus, WNOHANG); 71 gprint (GP_ERR, "harvesting within thread %d\n", pthread_self()); 72 gprint (GP_ERR, "child process %d is down, wait for exit status\n", pid); 73 74 // Loop a few times waiting for child to exit 75 for (i = 0; i < 50; i++) { 76 result = waitpid (pid, &waitstatus, WNOHANG); 77 if ((result == -1) && (errno == ECHILD)) { 78 usleep (10000); 79 continue; 80 } else { 81 break; 82 } 83 } 76 84 switch (result) { 77 85 case -1: /* error with waitpid */ … … 81 89 gprint (GP_ERR, "did process already exit? programming error?\n"); 82 90 break; 91 case EINTR: 83 92 case EINVAL: 84 gprint (GP_ERR, "error EINVAL (waitpid): programming error\n");85 exit (1);86 case EINTR:87 gprint (GP_ERR, "error EINTR (waitpid): programming error\n");88 exit (1);89 93 default: 90 gprint (GP_ERR, "unknown error for waitpid (%d): programming error\n", errno);91 exit (1);94 perror ("unexpected error"); 95 ABORT ("(HarvestHost)"); 92 96 } 93 97 break; -
trunk/Ohana/src/opihi/pcontrol/check.c
r8296 r8424 1 1 # include "pcontrol.h" 2 3 char jobstate[7][32] = {"PENDING", "BUSY", "HUNG", "DONE", "KILL", "EXIT", "CRASH"};4 char hoststate[5][32] = {"IDLE", "BUSY", "DOWN", "DONE", "OFF"};5 2 6 3 int check (int argc, char **argv) { … … 18 15 if (!strcasecmp (argv[1], "JOB")) { 19 16 JobID = atoi (argv[2]); 17 18 SetCheckPoint (); // ensure the JOB is on one of the stacks 20 19 job = PullJobByID (JobID, &StackID); 21 20 if (job == NULL) { 22 21 gprint (GP_LOG, "job not found\n"); 22 ClearCheckPoint (); 23 23 return (FALSE); 24 24 } 25 gprint (GP_LOG, "STATUS %s\n", jobstate[StackID]);25 gprint (GP_LOG, "STATUS %s\n", GetJobStackName(StackID)); 26 26 gprint (GP_LOG, "EXITST %d\n", job[0].exit_status); 27 27 gprint (GP_LOG, "STDOUT %d\n", job[0].stdout_size); 28 28 gprint (GP_LOG, "STDERR %d\n", job[0].stderr_size); 29 29 PutJob (job, StackID, STACK_BOTTOM); 30 ClearCheckPoint (); 30 31 return (TRUE); 31 32 } … … 33 34 if (!strcasecmp (argv[1], "HOST")) { 34 35 HostID = atoi (argv[2]); 36 37 SetCheckPoint (); // ensure the HOST is on one of the stacks 35 38 host = PullHostByID (HostID, &StackID); 36 39 if (host == NULL) { 37 40 gprint (GP_LOG, "host not found\n"); 41 ClearCheckPoint (); 38 42 return (FALSE); 39 43 } 40 gprint (GP_LOG, "host %s\n", hoststate[StackID]);44 gprint (GP_LOG, "host %s\n", GetHostStackName(StackID)); 41 45 PutHost (host, StackID, STACK_BOTTOM); 46 ClearCheckPoint (); 42 47 return (TRUE); 43 48 } … … 46 51 return (FALSE); 47 52 } 48 49 XXX how do I handle objects which are in flight?? -
trunk/Ohana/src/opihi/pcontrol/host.c
r8296 r8424 1 1 # include "pcontrol.h" 2 2 3 // we use CheckPoints in this function to prevent objects in flight from being missing. 3 4 int host (int argc, char **argv) { 4 5 5 int N, Ns;6 6 int StackID; 7 7 IDtype HostID; … … 21 21 return (FALSE); 22 22 } 23 host[0].markoff = FALSE; 23 24 DownHost (host); 24 25 return (TRUE); 25 26 } 26 27 if (!strcasecmp (argv[1], "RETRY")) { 28 // no need to use a check point [thief: CheckDownHost (DOWN->IDLE)] 27 29 host = PullHostFromStackByName (PCONTROL_HOST_DOWN, argv[2]); 28 30 if (!host) { … … 39 41 } 40 42 if (!strcasecmp (argv[1], "CHECK")) { 43 SetCheckPoint (); // ensure the host is on one of the stacks 41 44 host = PullHostByName (argv[2], &StackID); 42 switch (StackID) { 43 case PCONTROL_HOST_IDLE: 44 gprint (GP_LOG, "host %s is IDLE\n", argv[2]); 45 case PCONTROL_HOST_BUSY: 46 gprint (GP_LOG, "host %s is BUSY\n", argv[2]); 47 case PCONTROL_HOST_DONE: 48 gprint (GP_LOG, "host %s is DONE\n", argv[2]); 49 case PCONTROL_HOST_DOWN: 50 gprint (GP_LOG, "host %s is DOWN\n", argv[2]); 51 case PCONTROL_HOST_OFF: 52 gprint (GP_LOG, "host %s is OFF\n", argv[2]); 53 default: 54 gprint (GP_LOG, "host %s not found\n", argv[2]); 55 return (FALSE); 45 if (host == NULL) { 46 gprint (GP_LOG, "host %s not found\n", argv[2]); 47 ClearCheckPoint (); 48 return (FALSE); 56 49 } 57 50 PutHost (host, StackID, STACK_BOTTOM); 58 return (FALSE); 51 ClearCheckPoint (); 52 53 gprint (GP_LOG, "host %s is %s\n", argv[2], GetHostStackName (StackID)); 54 return (TRUE); 59 55 } 60 56 if (!strcasecmp (argv[1], "OFF")) { 57 SetCheckPoint (); // ensure we can find the specified host 58 host = PullHostByName (argv[2], &StackID); 59 if (host == NULL) { 60 gprint (GP_LOG, "host %s not found\n", argv[2]); 61 ClearCheckPoint (); 62 return (FALSE); 63 } 64 host[0].markoff = TRUE; 65 PutHost (host, StackID, STACK_BOTTOM); 66 ClearCheckPoint (); 67 return (TRUE); 68 } 69 70 # if 0 61 71 host = PullHostFromStackByName (PCONTROL_HOST_IDLE, argv[2]); 62 72 if 
(host) { 63 73 StopHost (host); 64 74 OffHost (host); 75 ClearCheckPoint (); 65 76 return (TRUE); 66 77 } … … 68 79 if (host) { 69 80 OffHost (host); 81 ClearCheckPoint (); 70 82 return (TRUE); 71 83 } 72 /* XXX the 'markoff' flag is not being used */84 /* XXX the 'markoff' flag is not being checked */ 73 85 host = PullHostFromStackByName (PCONTROL_HOST_BUSY, argv[2]); 74 86 if (host) { 75 87 host[0].markoff = TRUE; 76 88 PutHost (host, PCONTROL_HOST_BUSY, STACK_BOTTOM); 89 ClearCheckPoint (); 77 90 return (TRUE); 78 91 } 79 gprint (GP_LOG, "host %s is not BUSY, IDLE, or DOWN\n", argv[2]); 92 /* XXX the 'markoff' flag is not being checked */ 93 host = PullHostFromStackByName (PCONTROL_HOST_DONE, argv[2]); 94 if (host) { 95 host[0].markoff = TRUE; 96 PutHost (host, PCONTROL_HOST_DONE, STACK_BOTTOM); 97 ClearCheckPoint (); 98 return (TRUE); 99 } 100 gprint (GP_LOG, "host %s is not found (already off?)\n", argv[2]); 101 ClearCheckPoint (); 80 102 return (FALSE); 81 103 } 104 # endif 105 82 106 if (!strcasecmp (argv[1], "DELETE")) { 107 // a check point is not required: no possible thief 83 108 host = PullHostFromStackByName (PCONTROL_HOST_OFF, argv[2]); 84 109 if (!host) { -
trunk/Ohana/src/opihi/pcontrol/init.c
r8296 r8424 8 8 int jobstack PROTO((int, char **)); 9 9 int kill_pc PROTO((int, char **)); 10 int pulse PROTO((int, char **));11 10 int status PROTO((int, char **)); 12 11 int run PROTO((int, char **)); … … 16 15 int verbose PROTO((int, char **)); 17 16 int version PROTO((int, char **)); 17 18 // pulse is only available in the un-threaded version 19 int pulse PROTO((int, char **)); 18 20 19 21 static Command cmds[] = { … … 25 27 {"verbose", verbose, "set the verbose mode for job"}, 26 28 {"version", version, "show version information"}, 27 {"pulse", pulse, "set system pulse"},28 29 {"job", job, "add job"}, 29 30 {"jobstack", jobstack, "list jobs for a single stack"}, … … 33 34 {"stderr", stderr_pc, "get stderr buffer for job"}, 34 35 {"stdout", stdout_pc, "get stdout buffer for job"}, 36 # ifndef THREADED 37 {"pulse", pulse, "set system pulse"}, 38 # endif 35 39 }; 36 40 … … 42 46 AddCommand (&cmds[i]); 43 47 } 44 45 /* XXX temporary : put this elsewhere? */46 InitJobStacks ();47 InitHostStacks ();48 48 } -
trunk/Ohana/src/opihi/pcontrol/pcontrol.c
r8179 r8424 10 10 void program_init (int *argc, char **argv) { 11 11 12 # ifdef THREADED 13 pthread_t clientsThread; 14 # endif 15 12 16 auto_break = TRUE; 13 17 … … 18 22 gprintInit (); 19 23 24 InitJobStacks (); 25 InitHostStacks (); 26 20 27 rl_readline_name = opihi_name; 21 28 rl_attempted_completion_function = command_completer; 29 # ifdef THREADED 30 pthread_create (&clientsThread, NULL, &CheckSystem_Threaded, NULL); 31 rl_event_hook = NULL; 32 rl_set_keyboard_input_timeout (1000); 33 # else 22 34 rl_event_hook = CheckSystem; 23 35 rl_set_keyboard_input_timeout (1000); 36 # endif 24 37 25 38 set_str_variable ("HISTORY", opihi_history); -
trunk/Ohana/src/opihi/pcontrol/run.c
r8296 r8424 8 8 } 9 9 10 # ifdef THREADED 11 SetRunSystem (TRUE); 12 # else 10 13 rl_event_hook = CheckSystem; 14 # endif 11 15 12 16 return (TRUE); 13 17 } 14 18 15 int run_threaded (int argc, char **argv) {16 17 if (argc != 1) {18 gprint (GP_ERR, "USAGE: run\n");19 return (FALSE);20 }21 22 // some action23 24 return (TRUE);25 } -
trunk/Ohana/src/opihi/pcontrol/status.c
r8296 r8424 1 1 # include "pcontrol.h" 2 2 3 char jobname[7][32] = {"PENDING", "BUSY", "HUNG", "DONE", "KILL", "EXIT", "CRASH"};4 char hostname[5][32] = {"IDLE", "BUSY", "DOWN", "DONE", "OFF"};3 int PrintJobStack (int Nstack); 4 int PrintHostStack (int Nstack); 5 5 6 6 int status (int argc, char **argv) { … … 31 31 LockStack (stack); 32 32 Nobject = stack[0].Nobject; 33 gprint (GP_LOG, "job stack %s: %d objects\n", jobname[Nstack], Nobject);33 gprint (GP_LOG, "job stack %s: %d objects\n", GetJobStackName(Nstack), Nobject); 34 34 35 35 for (i = 0; i < Nobject; i++) { … … 57 57 LockStack (stack); 58 58 Nobject = stack[0].Nobject; 59 gprint (GP_LOG, "host stack %s: %d objects\n", hostname[Nstack], Nobject);59 gprint (GP_LOG, "host stack %s: %d objects\n", GetHostStackName(Nstack), Nobject); 60 60 61 61 for (i = 0; i < Nobject; i++) { -
trunk/Ohana/src/opihi/pcontrol/stop.c
r7917 r8424 8 8 } 9 9 10 # ifdef THREADED 11 SetRunSystem (FALSE); 12 # else 10 13 rl_event_hook = NULL; 14 # endif 11 15 12 16 return (TRUE);
Note: See TracChangeset for help on using the changeset viewer.
