Changeset 23530
- Timestamp:
- Mar 25, 2009, 11:56:09 AM (17 years ago)
- Location:
- trunk/Ohana/src/opihi
- Files:
-
- 43 edited
- 3 copied
-
include/pantasks.h (modified) (3 diffs)
-
include/shell.h (modified) (1 diff)
-
lib.shell/SocketOps.c (modified) (1 diff)
-
lib.shell/gprint.c (modified) (3 diffs)
-
pantasks/CheckController.c (modified) (9 diffs)
-
pantasks/CheckJobs.c (modified) (11 diffs)
-
pantasks/CheckPassword.c (modified) (1 diff)
-
pantasks/CheckTasks.c (modified) (3 diffs)
-
pantasks/ControllerOps.c (modified) (10 diffs)
-
pantasks/InputQueue.c (modified) (6 diffs)
-
pantasks/JobOps.c (modified) (4 diffs)
-
pantasks/ListenClients.c (modified) (13 diffs)
-
pantasks/LocalJob.c (modified) (1 diff)
-
pantasks/Makefile (modified) (4 diffs)
-
pantasks/controller.c (modified) (1 diff)
-
pantasks/controller_host.c (modified) (4 diffs)
-
pantasks/controller_jobstack.c (modified) (1 diff)
-
pantasks/controller_status.c (modified) (1 diff)
-
pantasks/controller_threads.c (modified) (1 diff)
-
pantasks/controller_verbose.c (modified) (1 diff)
-
pantasks/delete.c (modified) (5 diffs)
-
pantasks/flush.c (modified) (1 diff)
-
pantasks/init.c (modified) (2 diffs)
-
pantasks/init_server.c (modified) (4 diffs)
-
pantasks/jobs_and_tasks_thread.c (copied) (copied from branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/jobs_and_tasks_thread.c )
-
pantasks/kill.c (modified) (4 diffs)
-
pantasks/notes.txt (modified) (1 diff)
-
pantasks/pantasks.c.in (modified) (2 diffs)
-
pantasks/pantasks_server.c.in (modified) (2 diffs)
-
pantasks/server.c (modified) (2 diffs)
-
pantasks/server_run.c (modified) (3 diffs)
-
pantasks/showtask.c (modified) (1 diff)
-
pantasks/status_server.c (modified) (7 diffs)
-
pantasks/task.c (modified) (4 diffs)
-
pantasks/task_active.c (modified) (2 diffs)
-
pantasks/task_command.c (modified) (3 diffs)
-
pantasks/task_host.c (modified) (2 diffs)
-
pantasks/task_macros.c (modified) (3 diffs)
-
pantasks/task_nmax.c (modified) (2 diffs)
-
pantasks/task_options.c (modified) (3 diffs)
-
pantasks/task_periods.c (modified) (3 diffs)
-
pantasks/task_stdout.c (modified) (6 diffs)
-
pantasks/task_trange.c (modified) (5 diffs)
-
pantasks/test/threadload.sh (copied) (copied from branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/test/threadload.sh )
-
pantasks/test/threadload2.sh (copied) (copied from branches/eam_branches/eam_branch_20090322/Ohana/src/opihi/pantasks/test/threadload2.sh )
-
pantasks/thread_locks.c (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
trunk/Ohana/src/opihi/include/pantasks.h
r21153 r23530 244 244 int CheckControllerOutput (void); 245 245 int PrintControllerOutput (void); 246 void PrintControllerBusyJobs (); 246 247 247 248 int AddHost (char *hostname, int max_threads); … … 260 261 261 262 // functions related to the server threads 263 void *CheckJobsAndTasksThread (void *data); 264 262 265 void CheckTasksSetState (int state); 263 266 int CheckTasksGetState (void); 264 void *CheckTasksThread (void *data);265 267 266 268 void CheckJobsSetState (int state); 267 269 int CheckJobsGetState (void); 268 void *CheckJobsThread (void *data); 270 271 // void *CheckTasksThread (void *data); 272 // void *CheckJobsThread (void *data); 269 273 270 274 void CheckControllerSetState (int state); … … 283 287 void CheckInputs (void); 284 288 285 void SerialThreadLock (void); 286 void SerialThreadUnlock (void); 289 void ClientThreadLock (void); 290 void ClientThreadUnlock (void); 291 void CommandThreadLock (void); 292 void CommandThreadUnlock (void); 293 void ControlThreadLock (void); 294 void ControlThreadUnlock (void); 295 void JobTaskThreadLock (void); 296 void JobTaskThreadUnlock (void); 287 297 288 298 int InitPassword (void); -
trunk/Ohana/src/opihi/include/shell.h
r21153 r23530 166 166 int gprint PROTO((gpDest dest, char *format, ...)); 167 167 int gwrite PROTO((char *buffer, int size, int N, gpDest dest)); 168 int gprint_syserror PROTO((gpDest dest, int myError, char *format, ...)); 169 int gprintv PROTO((gpDest dest, char *format, va_list argp)); 168 170 169 171 /* socket functions */ -
trunk/Ohana/src/opihi/lib.shell/SocketOps.c
r21041 r23530 5 5 # define DEBUG 0 6 6 7 // these three static variables are only modified in the setup command before 8 // the threads are started. Thread-safety is not a problem for these. 7 9 static int NVALID; 8 10 static int Nvalid; -
trunk/Ohana/src/opihi/lib.shell/gprint.c
r22423 r23530 287 287 288 288 int status; 289 gpStream *stream;290 289 va_list argp; 291 290 291 va_start (argp, format); 292 status = gprintv (dest, format, argp); 293 va_end (argp); 294 return (status); 295 } 296 297 int gprintv (gpDest dest, char *format, va_list argp) { 298 299 int status; 300 gpStream *stream; 301 292 302 // this thread only writes to its own stream 293 303 stream = gprintGetStream (dest); 294 295 va_start (argp, format);296 304 297 305 if (stream[0].mode == GP_FILE) { … … 303 311 vPrintIOBuffer (stream[0].buffer, format, argp); 304 312 } 305 va_end (argp);306 313 return (TRUE); 307 314 } … … 332 339 } 333 340 334 /* I'm going to need to have different output targets for different threads 335 we can do this with these functions: 336 337 pthread_t pthread_self(void); 338 int pthread_equal(pthread_t thread1, pthread_t thread2); 339 // returns TRUE if equal, FALSE if not 340 341 */ 342 341 # define MAX_ERROR_LENGTH 256 // Maximum length string for error messages 342 343 // print an error (based on errno values) to gprint destination 344 int gprint_syserror (gpDest dest, int myError, char *format, ...) { 345 346 char errorBuf[MAX_ERROR_LENGTH]; 347 char *errorMsg; 348 va_list argp; 349 350 // there are two strerror_r implementations; choose the right one: 351 #if ((_POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600) && ! _GNU_SOURCE) 352 errorMsg = strerror_r (myError, errorBuf, MAX_ERROR_LENGTH); 353 #else 354 strerror_r (myError, errorBuf, MAX_ERROR_LENGTH); 355 errorMsg = errorBuf; 356 #endif 357 358 va_start (argp, format); 359 gprintv (dest, format, argp); 360 va_end (argp); 361 362 gprintv (dest, "%s\n", errorMsg); 363 return TRUE; 364 } -
trunk/Ohana/src/opihi/pantasks/CheckController.c
r14590 r23530 1 1 # include "pantasks.h" 2 2 3 static struct timeval start; 4 void TimerMark (); 5 float TimerElapsed (int reset); 3 void TimerMark (struct timeval *start); 4 float TimerElapsed (struct timeval *start, int reset); 6 5 7 6 int CheckController () { … … 11 10 Job *job; 12 11 IOBuffer buffer; 12 struct timeval start; 13 13 14 14 /* get the list of completed jobs (exit / crash), update the job status */ … … 17 17 /*** check EXIT jobs ***/ 18 18 InitIOBuffer (&buffer, 0x100); 19 // TimerMark ();20 // status = ControllerCommand ("stop", CONTROLLER_PROMPT, &buffer);21 // if (VerboseMode()) gprint (GP_ERR, "stop controller %f\n", TimerElapsed(TRUE));22 19 23 TimerMark ( );20 TimerMark (&start); 24 21 FlushIOBuffer (&buffer); 22 25 23 status = ControllerCommand ("jobstack exit", CONTROLLER_PROMPT, &buffer); 26 if (VerboseMode()) gprint (GP_ERR, "check exit stack %f\n", TimerElapsed( TRUE));24 if (VerboseMode()) gprint (GP_ERR, "check exit stack %f\n", TimerElapsed(&start, TRUE)); 27 25 if (!status) goto escape; 28 26 … … 34 32 status = sscanf (buffer.buffer, "%*s %d", &Njobs); 35 33 if (status != 1) goto escape; 36 if (VerboseMode()) gprint (GP_ERR, "parse %d jobs on stack %f\n", Njobs, TimerElapsed( TRUE));34 if (VerboseMode()) gprint (GP_ERR, "parse %d jobs on stack %f\n", Njobs, TimerElapsed(&start, TRUE)); 37 35 38 36 p = buffer.buffer; … … 46 44 status = sscanf (p, "%d", &JobID); 47 45 46 // the operations within this locked block only interact with the controller or 47 // modify the properties of the selected job 48 JobTaskLock(); 48 49 job = FindControllerJob (JobID); 49 50 if (job == NULL) { 50 51 gprint (GP_ERR, "misplaced job? 
%d not in EXIT job list\n", JobID); 52 JobTaskUnlock(); 51 53 continue; 52 54 } 53 55 /* this checks the individual job status, grabs stdout/stderr */ 54 56 CheckControllerJob (job); 57 JobTaskUnlock(); 55 58 } 56 if (VerboseMode()) gprint (GP_ERR, "clear %d exit jobs %f\n", i, TimerElapsed( TRUE));59 if (VerboseMode()) gprint (GP_ERR, "clear %d exit jobs %f\n", i, TimerElapsed(&start, TRUE)); 57 60 58 61 /*** check CRASH jobs ***/ … … 67 70 status = sscanf (buffer.buffer, "%*s %d", &Njobs); 68 71 if (status != 1) goto escape; 69 if (VerboseMode()) gprint (GP_ERR, "check crash stack %f\n", TimerElapsed( TRUE));72 if (VerboseMode()) gprint (GP_ERR, "check crash stack %f\n", TimerElapsed(&start, TRUE)); 70 73 71 74 p = buffer.buffer; … … 77 80 } 78 81 p = q + 1; 79 80 82 status = sscanf (p, "%d", &JobID); 83 84 // the operations within this locked block only interact with the controller or 85 // modify the properties of the selected job 86 JobTaskLock(); 81 87 job = FindControllerJob (JobID); 82 88 if (job == NULL) { 83 89 gprint (GP_ERR, "misplaced job? 
%d not in CRASH job list\n", JobID); 90 JobTaskUnlock(); 84 91 continue; 85 92 } 86 93 /* this checks the individual job status, grabs stdout/stderr */ 87 94 CheckControllerJob (job); 95 JobTaskUnlock(); 88 96 } 89 if (VerboseMode()) gprint (GP_ERR, "clear %d crash jobs %f\n", i, TimerElapsed( TRUE));97 if (VerboseMode()) gprint (GP_ERR, "clear %d crash jobs %f\n", i, TimerElapsed(&start, TRUE)); 90 98 91 99 FlushIOBuffer (&buffer); 92 // status = ControllerCommand ("run", CONTROLLER_PROMPT, &buffer);93 100 FreeIOBuffer (&buffer); 94 101 return (TRUE); … … 96 103 escape: 97 104 FlushIOBuffer (&buffer); 98 // status = ControllerCommand ("run", CONTROLLER_PROMPT, &buffer);99 105 FreeIOBuffer (&buffer); 100 106 return (FALSE); 101 107 } 102 108 103 void TimerMark ( ) {104 gettimeofday ( &start, (void *) NULL);109 void TimerMark (struct timeval *start) { 110 gettimeofday (start, (void *) NULL); 105 111 } 106 112 107 float TimerElapsed ( int reset) {113 float TimerElapsed (struct timeval *start, int reset) { 108 114 109 115 float dtime; … … 111 117 112 118 gettimeofday (&stop, (void *) NULL); 113 dtime = DTIME (stop, start );114 if (reset) gettimeofday ( &start, (void *) NULL);119 dtime = DTIME (stop, start[0]); 120 if (reset) gettimeofday (start, (void *) NULL); 115 121 return (dtime); 116 122 } -
trunk/Ohana/src/opihi/pantasks/CheckJobs.c
r15871 r23530 18 18 next_timeout = 1.0; 19 19 20 JobTaskLock(); 20 21 /** test all jobs: ready to test? finished? **/ 21 22 while ((job = NextJob ()) != NULL) { … … 77 78 /* set taskarg variables */ 78 79 for (i = 0; i < job[0].argc; i++) { 79 sprintf (varname, "taskarg:%d", i);80 set_str_variable (varname, job[0].argv[i]);80 sprintf (varname, "taskarg:%d", i); 81 set_str_variable (varname, job[0].argv[i]); 81 82 } 82 83 set_int_variable ("taskarg:n", job[0].argc); … … 84 85 /* set options variables */ 85 86 for (i = 0; i < job[0].optc; i++) { 86 sprintf (varname, "options:%d", i);87 set_str_variable (varname, job[0].optv[i]);87 sprintf (varname, "options:%d", i); 88 set_str_variable (varname, job[0].optv[i]); 88 89 } 89 90 set_int_variable ("options:n", job[0].optc); … … 109 110 if (VerboseMode()) gprint (GP_LOG, "job %s (%d) crash\n", task[0].name, job[0].JobID); 110 111 if (task[0].crash != NULL) { 112 // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread 113 JobTaskUnlock(); 114 CommandLock(); 111 115 exec_loop (task[0].crash); 116 CommandUnlock(); 117 JobTaskLock(); 112 118 } 113 119 } … … 133 139 } 134 140 } 135 if (macro != NULL) exec_loop (macro); 141 if (macro != NULL) { 142 // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread 143 JobTaskUnlock(); 144 CommandLock(); 145 exec_loop (macro); 146 CommandUnlock(); 147 JobTaskLock(); 148 } 136 149 } 137 150 … … 146 159 DeleteJob (job); 147 160 continue; 148 break;149 161 150 162 default: … … 156 168 /* check for timeout - (local jobs only) 157 169 we only check timeout after a poll (forces at least one poll) 158 */170 */ 159 171 if (job[0].mode == JOB_LOCAL) { 160 172 if (GetTaskTimer(job[0].start, FALSE) < task[0].timeout_period) { … … 179 191 /* set taskarg variables */ 180 192 for (i = 0; i < job[0].argc; i++) { 181 sprintf (varname, "taskarg:%d", i);182 set_str_variable (varname, job[0].argv[i]);193 sprintf (varname, 
"taskarg:%d", i); 194 set_str_variable (varname, job[0].argv[i]); 183 195 } 184 196 set_int_variable ("taskarg:n", job[0].argc); … … 193 205 /* run task[0].timeout macro, if it exists */ 194 206 if (task[0].timeout != NULL) { 207 // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread 208 JobTaskUnlock(); 209 CommandLock(); 195 210 exec_loop (task[0].timeout); 211 CommandUnlock(); 212 JobTaskLock(); 196 213 } 197 214 … … 205 222 } 206 223 // fprintf (stderr, "check %d jobs\n", Ncheck); 224 JobTaskUnlock(); 207 225 return (next_timeout); 208 226 } … … 210 228 /* 211 229 212 job / task timeline:213 214 task:215 0 exec216 start create217 task clock new job218 219 job:220 0 1xpoll 2xpoll 3xpoll221 start check check check222 job clock status status status223 224 . . . timeout225 run226 timeout227 228 must be at least one poll before timeout229 (timeout >= poll)230 job / task timeline: 231 232 task: 233 0 exec 234 start create 235 task clock new job 236 237 job: 238 0 1xpoll 2xpoll 3xpoll 239 start check check check 240 job clock status status status 241 242 . . . timeout 243 run 244 timeout 245 246 must be at least one poll before timeout 247 (timeout >= poll) 230 248 */ -
trunk/Ohana/src/opihi/pantasks/CheckPassword.c
r8548 r23530 2 2 # define DEBUG 0 3 3 4 // this static var is only used by InitPassword and CheckPassword below. 5 // Both functions are only called by the main thread. 4 6 static char PASSWORD[256]; 5 7 -
trunk/Ohana/src/opihi/pantasks/CheckTasks.c
r23329 r23530 7 7 int status; 8 8 float time_running, next_timeout, fuzz; 9 // struct timeval now;10 9 11 10 // actual maximum delay is controlled in job_threads.c 12 11 next_timeout = 1.0; 13 12 13 JobTaskLock(); 14 14 /** test all tasks: ready to test? ready to run? **/ 15 15 while ((task = NextTask ()) != NULL) { … … 54 54 /* ready to run? : run task.exec macro */ 55 55 if (task[0].exec != NULL) { 56 // we need to unlock JobTask since JobTaskLock is called inside CommandLock in the client thread 57 JobTaskUnlock(); 58 CommandLock(); 56 59 status = exec_loop (task[0].exec); 60 CommandUnlock(); 61 JobTaskLock(); 57 62 if (!status) { 58 63 continue; 59 64 } 60 65 } 61 62 // gettimeofday (&now, (void *) NULL);63 // fprintf (stderr, "t1: %d %6d - \n", now.tv_sec, now.tv_usec);64 66 65 67 /* check if there are errors with this task */ … … 68 70 } 69 71 70 // gettimeofday (&now, (void *) NULL);71 // fprintf (stderr, "t2: %d %6d - \n", now.tv_sec, now.tv_usec);72 73 72 /* construct job from task */ 74 73 job = CreateJob (task); 75 74 if (DEBUG) fprintf (stderr, "create job: (%zx) %d of %d\n", (size_t) job[0].stdout_buff.buffer, job[0].stdout_buff.Nbuffer, job[0].stdout_buff.Nalloc); 76 75 77 // gettimeofday (&now, (void *) NULL); 78 // fprintf (stderr, "t3: %d %6d - \n", now.tv_sec, now.tv_usec); 79 80 /* execute job - XXX add status test */ 81 SubmitJob (job); 82 83 // fprintf (stderr, "nl: %d %6d - ", 84 // task[0].last.tv_sec, task[0].last.tv_usec); 85 86 /* increment job counters */ 87 task[0].Njobs ++; 88 task[0].Npending ++; 89 90 // fprintf (stderr, "%d %6d\n", 91 // task[0].last.tv_sec, task[0].last.tv_usec); 76 /* execute job */ 77 if (!SubmitJob (job)) { 78 DeleteJob (job); 79 continue; 80 } 81 task[0].Njobs ++; // number of jobs successfully submitted 92 82 93 83 /* increment Nrun for inclusive ranges with Nmax */ 94 84 BumpTimeRanges (task[0].ranges, task[0].Nranges); 95 85 } 86 JobTaskUnlock(); 96 87 return (next_timeout); 97 88 } -
trunk/Ohana/src/opihi/pantasks/ControllerOps.c
r20032 r23530 12 12 13 13 /* local static variables to track the controller host properties */ 14 /* these are used by AddHost and DeleteHost, and are only called by controller_host.c (clientThread) */ 15 /* or by RestartController : lock between these two? */ 14 16 static Host *hosts = NULL; 15 17 static int Nhosts = 0; … … 31 33 } 32 34 35 // XXX possible race condition problem: if we delete a host while the controller 36 // is being restarted. to fix this, we need to keep the deletion in controller_thread, 37 // but perhaps mark it in the client_thread? 33 38 int DeleteHost (char *hostname) { 34 39 … … 261 266 } 262 267 268 // This function is called by the JobTaskThread via SubmitJob. We need to unlock the 269 // JobTaskLock to avoid a dead lock with the JobTaskLock called in CheckController 270 JobTaskUnlock(); 271 ControlLock(__func__); 263 272 InitIOBuffer (&buffer, 0x100); 264 273 status = ControllerCommand (cmd, CONTROLLER_PROMPT, &buffer); 265 274 free (cmd); 275 ControlUnlock(__func__); 276 JobTaskLock(); 277 266 278 267 279 /* extract the job PID from the controller response */ … … 411 423 if ((status == -1) && (errno == EPIPE)) { 412 424 StopController (); 413 gprint (GP_ERR, "controller is down (pipe closed), restarting\n");425 fprintf (stderr, "controller is down (pipe closed), restarting\n"); 414 426 if (!RestartController ()) { 415 427 return (FALSE); … … 434 446 if (status == 0) { 435 447 StopController (); 436 gprint (GP_ERR, "controller is down (EOF), restarting\n");448 fprintf (stderr, "controller is down (EOF), restarting\n"); 437 449 if (!RestartController ()) { 438 450 return (FALSE); … … 441 453 } 442 454 if (status == -1) { 443 gprint (GP_ERR, "controller is not responding (%d tries)\n", j);455 fprintf (stderr, "controller is not responding (%d tries)\n", j); 444 456 gwrite (buffer[0].buffer, 1, buffer[0].Nbuffer, GP_ERR); 445 457 } 446 458 } 447 459 if (status == -1) { 448 gprint (GP_ERR, "controller still not responding, giving 
up\n");460 fprintf (stderr, "controller still not responding, giving up\n"); 449 461 return (FALSE); 450 462 } … … 456 468 bzero (buffer[0].buffer + buffer[0].Nbuffer, buffer[0].Nalloc - buffer[0].Nbuffer); 457 469 } 458 /* if (VerboseMode()) gprint (GP_ERR, "message received, %d cycles\n", i); */470 if (VerboseMode()) fprintf (stderr, "message received, %d cycles\n", i); 459 471 return (TRUE); 460 472 } … … 492 504 } 493 505 506 void PrintControllerBusyJobs () { 507 508 int status; 509 char command[1024]; 510 IOBuffer buffer; 511 512 /* check if controller is running */ 513 status = CheckControllerStatus (); 514 if (!status) { 515 return; 516 } 517 518 sprintf (command, "jobstack busy"); 519 InitIOBuffer (&buffer, 0x100); 520 521 status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer); 522 523 if (status) { 524 gprint (GP_LOG, " jobs currently running remotely:\n"); 525 gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG); 526 } else { 527 gprint (GP_LOG, "controller is not responding\n"); 528 } 529 FreeIOBuffer (&buffer); 530 return; 531 } 532 494 533 int PrintControllerOutput () { 495 534 … … 581 620 InitIOBuffer (&buffer, 0x100); 582 621 583 // XXX lock the host table? SerialThreadLock ();622 // XXX lock the host table? no: that would risk a dead lock between client and controller threads: 584 623 gprint (GP_ERR, "pcontrol restarted, reloading hosts\n"); 585 624 for (i = 0; i < Nhosts; i++) { … … 588 627 status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer); 589 628 } 590 // SerialThreadUnlock ();591 629 592 630 if (status) gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG); -
trunk/Ohana/src/opihi/pantasks/InputQueue.c
r16449 r23530 1 1 # include "pantasks.h" 2 // we use fprintf for DEBUG statements to avoid deadlocking issues 2 3 3 4 static int Ninputs = 0; … … 23 24 void AddNewInput (char *input) { 24 25 26 // XXX define the InputMutex 25 27 SerialThreadLock (); 26 if (DEBUG) gprint (GP_LOG, "adding a new input (%s)\n", input); 28 29 if (DEBUG) fprintf (stderr, "adding a new input (%s)\n", input); 27 30 inputs[Ninputs] = input; 28 31 Ninputs ++; … … 31 34 REALLOCATE (inputs, char *, NINPUTS); 32 35 } 33 if (DEBUG) gprint (GP_LOG, "done new input (%s)\n", input);36 if (DEBUG) fprintf (stderr, "done new input (%s)\n", input); 34 37 SerialThreadUnlock (); 35 38 } … … 40 43 int i, j; 41 44 42 if (DEBUG) gprint (GP_LOG, "deleting an input (%s)\n", input); 45 if (DEBUG) fprintf (stderr, "deleting an input (%s)\n", input); 46 47 // XXX lock here 43 48 for (i = 0; i < Ninputs; i++) { 44 49 if (inputs[i] == input) { … … 52 57 REALLOCATE (inputs, char *, NINPUTS); 53 58 } 54 if (DEBUG) gprint (GP_LOG, "deleted an input\n"); 59 // XXX unlock here 60 if (DEBUG) fprintf (stderr, "deleted an input\n"); 55 61 return TRUE; 56 62 } 57 63 } 58 64 // did not find the input 65 // XXX unlock here 59 66 return FALSE; 60 67 } … … 66 73 char *input, *line, *outline, tmp; 67 74 68 if (Ninputs < 1) return; 75 // XXX lock here 76 if (Ninputs < 1) { 77 // XXX unlock here 78 return; 79 } 69 80 70 81 input = inputs[0]; 71 if (DEBUG) gprint (GP_LOG, "got an input (%s)\n", input);82 if (DEBUG) fprintf (stderr, "got an input (%s)\n", input); 72 83 73 84 Nbytes = snprintf (&tmp, 0, "input %s", input); 74 85 ALLOCATE (line, char, Nbytes + 1); 75 86 snprintf (line, Nbytes + 1, "input %s", input); 87 // XXX unlock here 76 88 77 89 status = command (line, &outline, TRUE); -
trunk/Ohana/src/opihi/pantasks/JobOps.c
r18160 r23530 88 88 } 89 89 90 /* check if controller is running */91 status = CheckControllerStatus ();92 if (!status) {93 return;94 }95 96 sprintf (command, "jobstack busy");97 InitIOBuffer (&buffer, 0x100);98 99 SerialThreadLock ();100 status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer);101 SerialThreadUnlock ();102 103 if (status) {104 gprint (GP_LOG, " jobs currently running remotely:\n");105 gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG);106 } else {107 gprint (GP_LOG, "controller is not responding\n");108 }109 FreeIOBuffer (&buffer);110 90 return; 111 91 } … … 169 149 REALLOCATE (jobs, Job *, NJOBS); 170 150 } 151 152 /* increment job counters */ 153 task[0].Npending ++; 154 171 155 return (jobs[Njobs-1]); 172 156 } … … 265 249 int SubmitJob (Job *job) { 266 250 251 int status; 252 267 253 if (job[0].mode == JOB_LOCAL) { 268 254 if (DEBUG) fprintf (stderr, "submit job: (%zx) %d of %d\n", (size_t) job[0].stdout_buff.buffer, job[0].stdout_buff.Nbuffer, job[0].stdout_buff.Nalloc); 269 SubmitLocalJob (job);255 status = SubmitLocalJob (job); 270 256 } else { 271 SubmitControllerJob (job); 257 status = SubmitControllerJob (job); 258 } 259 if (!status) { 260 return FALSE; 272 261 } 273 262 … … 285 274 if (job[0].mode == JOB_LOCAL) { 286 275 CheckLocalJob (job); 287 } else { 288 /* controller jobs are now checked en masse by CheckController */ 289 /* CheckControllerJob (job); */ 276 /* controller jobs are checked en masse by CheckController */ 290 277 } 291 278 return (job[0].state); -
trunk/Ohana/src/opihi/pantasks/ListenClients.c
r16905 r23530 2 2 # define DEBUG 0 3 3 4 // XXX make the calling functions thread-safe 4 5 static int NCLIENTS; 5 6 static int Nclients; … … 9 10 void InitClients () { 10 11 12 ClientLock(); 11 13 Nclients = 0; 12 14 NCLIENTS = 10; 13 15 ALLOCATE (clients, int, NCLIENTS); 14 16 ALLOCATE (buffers, IOBuffer *, NCLIENTS); 17 ClientUnlock(); 15 18 } 16 19 … … 18 21 void AddNewClient (int client) { 19 22 20 if (DEBUG) gprint (GP_LOG, "adding a new client (%d)\n", client); 23 ClientLock(); 24 if (DEBUG) fprintf (stderr, "adding a new client (%d)\n", client); 21 25 clients[Nclients] = client; 22 26 ALLOCATE (buffers[Nclients], IOBuffer, 1); … … 28 32 REALLOCATE (buffers, IOBuffer *, NCLIENTS); 29 33 } 34 ClientUnlock(); 30 35 } 31 36 … … 35 40 int i, j; 36 41 37 if (DEBUG) gprint (GP_LOG, "deleting a client (%d)\n", client); 42 ClientLock(); 43 if (DEBUG) fprintf (stderr, "deleting a client (%d)\n", client); 38 44 for (i = 0; i < Nclients; i++) { 39 45 if (clients[i] == client) { … … 51 57 REALLOCATE (buffers, IOBuffer *, NCLIENTS); 52 58 } 59 ClientUnlock(); 53 60 return TRUE; 54 61 } 55 62 } 56 63 // did not find the client 64 ClientUnlock(); 57 65 return FALSE; 58 66 } … … 70 78 gprintInit (); // each thread needs to init the printing system 71 79 72 // define server output log files 73 if (VarConfig ("PANTASKS_SERVER_STDOUT", "%s", log_stdout) != NULL) { 74 gprintSetFileThisThread (GP_LOG, log_stdout); 75 } else { 76 strcpy (log_stdout, "stdout"); 77 } 78 if (VarConfig ("PANTASKS_SERVER_STDERR", "%s", log_stderr) != NULL) { 79 gprintSetFileThisThread (GP_ERR, log_stderr); 80 } else { 81 strcpy (log_stderr, "stderr"); 82 } 80 /* set buffers for the output for this client */ 81 gprintSetBuffer (GP_LOG); 82 gprintSetBuffer (GP_ERR); 83 83 84 84 while (1) { … … 90 90 91 91 /* place all of the clients in the fdSet */ 92 Ncurrent = Nclients;93 92 Nmax = 0; 94 93 FD_ZERO (&fdSet); 94 ClientLock(); 95 Ncurrent = Nclients; 96 ClientUnlock(); 95 97 for (i = 0; i < Ncurrent; 
i++) { 96 98 Nmax = MAX (Nmax, clients[i]); … … 100 102 101 103 /* block until we have some data on the pipes (or timeout) */ 102 if (DEBUG) gprint (GP_ERR, "listening to %d clients\n", Ncurrent);104 if (DEBUG) fprintf (stderr, "listening to %d clients\n", Ncurrent); 103 105 status = select (Nmax, &fdSet, NULL, NULL, &timeout); 104 106 if (status == -1) { … … 122 124 if ((Nread == 0) || (Nread == -2)) { 123 125 /* error: do something */ 124 if (DEBUG && (Nread == 0)) gprint (GP_ERR, "socket is closed\n");125 if (DEBUG && (Nread == -2)) gprint (GP_ERR, "error reading from socket\n");126 if (DEBUG && (Nread == 0)) fprintf (stderr, "socket is closed\n"); 127 if (DEBUG && (Nread == -2)) fprintf (stderr, "error reading from socket\n"); 126 128 DeleteClient (clients[i]); 127 continue;129 break; // the other thread could also have modified the list; restart with new Ncurrent 128 130 } 129 131 130 if (DEBUG) gprint (GP_ERR, "read %d total bytes\n", buffers[i][0].Nbuffer);132 if (DEBUG) fprintf (stderr, "read %d total bytes\n", buffers[i][0].Nbuffer); 131 133 132 134 /* see if we have a complete message waiting; if not, keep waiting for messages */ … … 141 143 if (*line) { 142 144 143 /* set buffers for the output for this client */144 gprintSetBuffer (GP_LOG);145 gprintSetBuffer (GP_ERR);146 147 145 /* run the command, return the exit status */ 146 CommandLock(); 148 147 status = multicommand (line); 148 CommandUnlock(); 149 149 SendMessage (clients[i], "STATUS %d", status); 150 150 … … 156 156 SendMessageFixed (clients[i], 0, ""); 157 157 } 158 FlushIOBuffer (outbuffer); 158 159 159 160 // return the stdout messages first … … 164 165 SendMessageFixed (clients[i], 0, ""); 165 166 } 166 167 /* clear and reset the output buffers to their last output file names */ 168 gprintSetFileAllThreads (GP_LOG, NULL); 169 gprintSetFileAllThreads (GP_ERR, NULL); 167 FlushIOBuffer (outbuffer); 170 168 } 171 169 free (line); -
trunk/Ohana/src/opihi/pantasks/LocalJob.c
r18161 r23530 127 127 128 128 pid = fork (); 129 if (pid == -1) { 130 gprint_syserror (GP_ERR, errno, "error starting local job: "); 131 goto pipe_error; 132 } 133 129 134 if (!pid) { /* must be child process */ 130 135 if (VerboseMode()) gprint (GP_ERR, "starting local job\n"); -
trunk/Ohana/src/opihi/pantasks/Makefile
r18085 r23530 34 34 $(SRC)/pantasks.$(ARCH).o \ 35 35 $(SRC)/thread_locks.$(ARCH).o \ 36 $(SRC)/job_threads.$(ARCH).o \ 37 $(SRC)/task_threads.$(ARCH).o \ 36 $(SRC)/jobs_and_tasks_thread.$(ARCH).o \ 38 37 $(SRC)/controller_threads.$(ARCH).o 39 38 … … 41 40 $(SRC)/pantasks_server.$(ARCH).o \ 42 41 $(SRC)/server_run.$(ARCH).o \ 43 $(SRC)/server_load.$(ARCH).o \44 $(SRC)/InputQueue.$(ARCH).o \45 42 $(SRC)/ListenClients.$(ARCH).o \ 46 43 $(SRC)/server.$(ARCH).o \ … … 49 46 $(SRC)/CheckPassword.$(ARCH).o \ 50 47 $(SRC)/thread_locks.$(ARCH).o \ 51 $(SRC)/job_threads.$(ARCH).o \ 52 $(SRC)/task_threads.$(ARCH).o \ 53 $(SRC)/controller_threads.$(ARCH).o \ 54 $(SRC)/input_threads.$(ARCH).o 48 $(SRC)/jobs_and_tasks_thread.$(ARCH).o \ 49 $(SRC)/controller_threads.$(ARCH).o 55 50 56 51 funcs = \ … … 66 61 67 62 cmds = \ 68 $(SRC)/pulse.$(ARCH).o \69 63 $(SRC)/status.$(ARCH).o \ 70 64 $(SRC)/flush.$(ARCH).o \ -
trunk/Ohana/src/opihi/pantasks/controller.c
r18085 r23530 50 50 } 51 51 52 ControlLock(__func__); 52 53 status = (*func)(argc - 1, argv + 1); 54 ControlUnlock(__func__); 53 55 return (status); 54 56 } -
trunk/Ohana/src/opihi/pantasks/controller_host.c
r19125 r23530 15 15 16 16 if (argc != 3) goto usage; 17 if (max_threads && strcasecmp (argv[1], "ADD")) goto usage; 17 18 18 19 /* start controller connection (if needed) */ … … 27 28 if (!strcasecmp (argv[1], "ADD")) { 28 29 AddHost (argv[2], max_threads); 29 } else { 30 if (max_threads) goto usage; 31 } 30 } 32 31 33 32 if (!strcasecmp (argv[1], "DELETE")) { … … 42 41 InitIOBuffer (&buffer, 0x100); 43 42 44 SerialThreadLock ();45 43 status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer); 46 SerialThreadUnlock ();47 44 48 45 if (status) gwrite (buffer.buffer, 1, buffer.Nbuffer, GP_LOG); … … 57 54 return (FALSE); 58 55 } 59 60 /* should I keep an internal host table so I can reload the61 hosts if the controller exits?62 63 alternatively, that could be a user-level choice64 */ -
trunk/Ohana/src/opihi/pantasks/controller_jobstack.c
r18085 r23530 24 24 InitIOBuffer (&buffer, 0x100); 25 25 26 SerialThreadLock ();27 26 status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer); 28 SerialThreadUnlock ();29 27 30 28 if (status) { -
trunk/Ohana/src/opihi/pantasks/controller_status.c
r8548 r23530 23 23 InitIOBuffer (&buffer, 0x100); 24 24 25 SerialThreadLock ();26 25 status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer); 27 SerialThreadUnlock ();28 26 29 27 if (status) { -
trunk/Ohana/src/opihi/pantasks/controller_threads.c
r15791 r23530 34 34 35 35 // one run of the task checker 36 SerialThreadLock ();36 ControlLock(__func__); 37 37 CheckController (); 38 ControlUnlock(__func__); 39 40 ControlLock(__func__); 38 41 CheckControllerOutput (); 39 SerialThreadUnlock (); 42 ControlUnlock(__func__); 43 40 44 if (VerboseMode() == 2) fprintf (stderr, "C"); 41 45 // fprintf (stderr, "**** C ****"); -
trunk/Ohana/src/opihi/pantasks/controller_verbose.c
r18085 r23530 24 24 InitIOBuffer (&buffer, 0x100); 25 25 26 SerialThreadLock ();27 26 status = ControllerCommand (command, CONTROLLER_PROMPT, &buffer); 28 SerialThreadUnlock ();29 27 30 28 if (status) { -
trunk/Ohana/src/opihi/pantasks/delete.c
r13542 r23530 11 11 JobID = atoi (argv[2]); 12 12 13 JobTaskLock(); 13 14 job = FindJob (JobID); 14 15 if (job == NULL) { 15 16 gprint (GP_LOG, "job not found\n"); 17 JobTaskUnlock(); 16 18 return (TRUE); 17 19 } … … 21 23 job[0].state = JOB_HUNG; 22 24 if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid); 25 JobTaskUnlock(); 23 26 return (FALSE); 24 27 } … … 27 30 job[0].state = JOB_HUNG; 28 31 if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid); 32 JobTaskUnlock(); 29 33 return (FALSE); 30 34 } … … 32 36 DeleteJob (job); 33 37 gprint (GP_LOG, "job removed\n"); 38 JobTaskUnlock(); 34 39 return (TRUE); 35 40 } … … 41 46 42 47 usage: 43 gprint (GP_ERR, "USAGE: delete job (JobID)\n");44 gprint (GP_ERR, "USAGE: delete task (TaskName)\n");45 return (FALSE);48 gprint (GP_ERR, "USAGE: delete job (JobID)\n"); 49 gprint (GP_ERR, "USAGE: delete task (TaskName)\n"); 50 return (FALSE); 46 51 } 47 52 -
trunk/Ohana/src/opihi/pantasks/flush.c
r14590 r23530 6 6 7 7 if (!strcasecmp (argv[1], "jobs")) { 8 JobTaskLock(); 8 9 FlushJobs (); 10 JobTaskUnlock(); 9 11 return (TRUE); 10 12 } -
trunk/Ohana/src/opihi/pantasks/init.c
r16453 r23530 18 18 int halt PROTO((int, char **)); 19 19 int flush_jobs PROTO((int, char **)); 20 int pulse PROTO((int, char **));21 20 int showtask PROTO((int, char **)); 22 21 int status_sys PROTO((int, char **)); … … 40 39 {1, "options", task_options, "define optional variables associated with the job task"}, 41 40 {1, "periods", task_periods, "define time scales for a task"}, 42 {1, "pulse", pulse, "set the scheduler update period"},43 41 {1, "run", run, "run the scheduler"}, 44 42 {1, "flush", flush_jobs, "flush all jobs from the queue"}, -
trunk/Ohana/src/opihi/pantasks/init_server.c
r16903 r23530 14 14 int task_stdout PROTO((int, char **)); 15 15 int task_stderr PROTO((int, char **)); 16 int pulse PROTO((int, char **));17 16 int flush_jobs PROTO((int, char **)); 18 17 int status_server PROTO((int, char **)); … … 41 40 {1, "npending", task_npending, "define maximum number of outstanding jobs for a task"}, 42 41 {1, "periods", task_periods, "define time scales for a task"}, 43 {1, "pulse", pulse, "set the scheduler update period"},44 42 {1, "flush", flush_jobs, "flush all jobs from the queue"}, 45 43 {1, "server", server, "server-specific commands"}, … … 67 65 InitJobs (); 68 66 InitJobIDs (); 69 InitInputs ();70 67 71 68 for (i = 0; i < sizeof (cmds) / sizeof (Command); i++) { … … 78 75 FreeJobs (); 79 76 FreeJobIDs (); 80 FreeInputs ();81 77 } -
trunk/Ohana/src/opihi/pantasks/kill.c
r13542 r23530 12 12 JobID = atoi (argv[1]); 13 13 14 JobTaskLock(); 14 15 job = FindJob (JobID); 15 16 if (job == NULL) { 16 17 gprint (GP_LOG, "job not found\n"); 18 JobTaskUnlock(); 17 19 return (TRUE); 18 20 } … … 22 24 job[0].state = JOB_HUNG; 23 25 if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid); 26 JobTaskUnlock(); 24 27 return (FALSE); 25 28 } … … 28 31 job[0].state = JOB_HUNG; 29 32 if (VerboseMode()) gprint (GP_LOG, "child process %d is hung, cannot kill\n", job[0].pid); 33 JobTaskUnlock(); 30 34 return (FALSE); 31 35 } … … 33 37 DeleteJob (job); 34 38 gprint (GP_LOG, "job removed\n"); 39 JobTaskUnlock(); 35 40 return (TRUE); 36 41 } -
trunk/Ohana/src/opihi/pantasks/notes.txt
r7892 r23530 1 2 stdout_cntl: 3 GetJobOutout 4 CheckControllerJob 5 CheckController 6 controller_threads (controlThread) 7 StartController 8 ControllerCommand 9 CheckController 10 controller_threads (controlThread) 11 controller_check (clientThread) 12 controller_host (clientThread) 13 controller_jobstack (clientThread) 14 controller_run (clientThread) 15 controller_status (clientThread) 16 controller_verbose (clientThread) 17 DeleteControllerJob 18 CheckControllerJob 19 CheckController 20 controller_threads (controlThread) 21 CheckControllerJobStatus 22 CheckControllerJob 23 CheckController 24 controller_threads (controlThread) 25 SubmitControllerJob 26 SubmitJob 27 CheckTasks (JobTaskThread) 28 PrintControllerBusyJobs 29 KillControllerJob 30 QuitController 31 RestartController 32 CheckControllerOutput 33 controller_output (clientThread) 34 controller_threads (controlThread) 35 36 37 38 39 40 41 42 20090322 43 44 We've been surviving with a slightly broken threading / locking 45 model. I would like to clean it up. Previously, the threads where 46 blocking at a very coarse level, and there were certain interactions 47 that were not thread-safe. Here are my notes on re-working this: 48 49 * we have 6 threads: 50 51 * main (top-level parent) : after it spawns the 5 child threads, 52 this thread waits for new clients and adds them to the list of 53 active clients with 'AddNewClient' 54 55 * clientsThread (ListenClients): this thread is listening for 56 commands from the clients; when it receives a complete command, 57 it executes the command using 'multicommand'. 
58 59 * tasksThread 60 CheckTasks 61 NextTask 62 63 64 65 * jobsThread 66 67 * controllerThread 68 CheckControllerStatus : race condition is irrelevant 69 70 * inputsThread 71 AddNewInput called by clientThread 72 (not sure this is really being used: 73 server input (filename) calls 'input' 74 server module (filename) calls 'module' 75 76 * functions which need to be thread-safe: 77 * AddNewClient 78 * gprint and supporting functions must be thread safe (extensively 79 used...) 80 81 82 * race condition is irrelevant for this variable 83 static int ControllerStatus = FALSE; 84 85 static int stdin_cntl, stdout_cntl, stderr_cntl; 86 GetJobOutput -> CheckControllerJob -> CheckController -> controllerThread 87 ControllerCommand -> controllerThread / clientThread 88 CheckControllerOutput -> controllerThread / clientThread 89 StartController -> controllerThread / clientThread 90 StopController 91 92 93 94 static IOBuffer stdout_buffer; 95 static IOBuffer stderr_buffer; 96 static int ControllerPID = 0; 97 98 /* local static variables to track the controller host properties */ 99 static Host *hosts = NULL; 100 static int Nhosts = 0; 101 static int NHOSTS = 0; 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 1 125 2 126 PanTasks Client / Server design -
trunk/Ohana/src/opihi/pantasks/pantasks.c.in
r16453 r23530 10 10 void program_init (int *argc, char **argv) { 11 11 12 pthread_t jobsThread; 13 pthread_t tasksThread; 12 pthread_t JobsAndTasksThread; 14 13 pthread_t controllerThread; 15 14 … … 50 49 51 50 /* start up the background threads here */ 52 pthread_create (&tasksThread, NULL, &CheckTasksThread, NULL); 53 pthread_create (&jobsThread, NULL, &CheckJobsThread, NULL); 54 pthread_create (&controllerThread, NULL, &CheckControllerThread, NULL); 51 // pthread_create (&tasksThread, NULL, &CheckTasksThread, NULL); 52 // pthread_create (&jobsThread, NULL, &CheckJobsThread, NULL); 53 pthread_create (&JobsAndTasksThread, NULL, &CheckJobsAndTasksThread, NULL); 54 pthread_create (&controllerThread, NULL, &CheckControllerThread, NULL); 55 55 return; 56 56 } -
trunk/Ohana/src/opihi/pantasks/pantasks_server.c.in
r16453 r23530 17 17 18 18 char log_stdout[128], log_stderr[128]; 19 pthread_t jobsThread; 20 pthread_t tasksThread; 21 pthread_t inputsThread; 19 pthread_t JobsAndTasksThread; 22 20 pthread_t clientsThread; 23 21 pthread_t controllerThread; … … 63 61 64 62 /* start up the background threads here */ 65 pthread_create (&clientsThread, NULL, &ListenClients, NULL); 66 pthread_create (&tasksThread, NULL, &CheckTasksThread, NULL); 67 pthread_create (&jobsThread, NULL, &CheckJobsThread, NULL); 68 pthread_create (&controllerThread, NULL, &CheckControllerThread, NULL); 69 pthread_create (&inputsThread, NULL, &CheckInputsThread, NULL); 63 pthread_create (&clientsThread, NULL, &ListenClients, NULL); 64 pthread_create (&JobsAndTasksThread, NULL, &CheckJobsAndTasksThread, NULL); 65 pthread_create (&controllerThread, NULL, &CheckControllerThread, NULL); 70 66 71 67 /* in this loop, we listen for incoming connections, validate, and -
trunk/Ohana/src/opihi/pantasks/server.c
r16463 r23530 5 5 int input PROTO((int, char **)); 6 6 int module PROTO((int, char **)); 7 int server_load PROTO((int, char **));8 7 int server_run PROTO((int, char **)); 9 8 int server_stop PROTO((int, char **)); … … 20 19 {1, "input", input, "load input file on server"}, 21 20 {1, "module", module, "load module file on server"}, 22 {1, "load", server_load, "load input file on server"},23 21 {1, "run", server_run, "run scheduler"}, 24 22 {1, "stop", server_stop, "stop scheduler"}, -
trunk/Ohana/src/opihi/pantasks/server_run.c
r11084 r23530 11 11 CheckJobsSetState (TRUE); 12 12 CheckControllerSetState (TRUE); 13 CheckInputsSetState (TRUE);14 13 return (TRUE); 15 14 } … … 23 22 24 23 CheckTasksSetState (FALSE); 25 // CheckJobsSetState (FALSE);26 // CheckControllerSetState (FALSE);27 CheckInputsSetState (FALSE);28 24 return (TRUE); 29 25 } … … 39 35 CheckJobsSetState (FALSE); 40 36 CheckControllerSetState (FALSE); 41 CheckInputsSetState (FALSE);42 37 return (TRUE); 43 38 } -
trunk/Ohana/src/opihi/pantasks/showtask.c
r12468 r23530 8 8 } 9 9 10 JobTaskLock(); 10 11 ShowTask (argv[1]); 12 JobTaskUnlock(); 13 11 14 return (TRUE); 12 15 } -
trunk/Ohana/src/opihi/pantasks/status_server.c
r16012 r23530 11 11 if ((N = get_argument (argc, argv, "-tasks"))) { 12 12 remove_argument (N, &argc, argv); 13 JobTaskLock(); 13 14 ListTasks (FALSE); 15 JobTaskUnlock(); 14 16 return (TRUE); 15 17 } … … 17 19 if ((N = get_argument (argc, argv, "-taskinfo"))) { 18 20 remove_argument (N, &argc, argv); 21 JobTaskLock(); 19 22 ListTasks (TRUE); 23 JobTaskUnlock(); 20 24 return (TRUE); 21 25 } … … 23 27 if ((N = get_argument (argc, argv, "-taskstats"))) { 24 28 remove_argument (N, &argc, argv); 29 JobTaskLock(); 25 30 if (argc == 2) { 26 31 ListTaskStats (argv[N]); … … 28 33 ListTaskStats (NULL); 29 34 } 35 JobTaskUnlock(); 30 36 return (TRUE); 31 37 } … … 33 39 if ((N = get_argument (argc, argv, "-taskstatsreset"))) { 34 40 remove_argument (N, &argc, argv); 41 JobTaskLock(); 35 42 if (argc == 2) { 36 43 ResetTaskStats (argv[N]); … … 38 45 ResetTaskStats (NULL); 39 46 } 47 JobTaskUnlock(); 40 48 return (TRUE); 41 49 } … … 56 64 gprint (GP_LOG, " Controller is stopped\n"); 57 65 } 66 67 JobTaskLock(); 58 68 ListTasks (FALSE); 59 69 ListJobs (); 70 JobTaskUnlock(); 71 72 ControlLock(__func__); 73 PrintControllerBusyJobs(); 74 ControlUnlock(__func__); 75 60 76 return (TRUE); 61 77 -
trunk/Ohana/src/opihi/pantasks/task.c
r14590 r23530 11 11 if (argc != 2) goto usage; 12 12 13 JobTaskLock(); 13 14 task = FindTask (argv[1]); 14 15 if (task == NULL) { /**** new task ****/ … … 18 19 SetNewTask (task); 19 20 } 21 JobTaskUnlock(); 22 20 23 /* While a task is being defined, it is removed from the task list. The new task is added to the task list 21 24 when the definition process is complete. … … 55 58 56 59 case TASK_END: 57 /* I need to add in a test here to see if all task elements58 have been defined. delete the task if not */59 60 free (input); 60 61 /* validate the new task: all mandatory elements defined? */ … … 63 64 return (FALSE); 64 65 } 66 JobTaskLock(); 65 67 RegisterNewTask (); 68 JobTaskUnlock(); 66 69 return (TRUE); 67 70 break; -
trunk/Ohana/src/opihi/pantasks/task_active.c
r10734 r23530 7 7 if (argc != 2) goto usage; 8 8 9 JobTaskLock(); 9 10 task = GetNewTask (); 10 11 if (task == NULL) { 11 12 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 13 JobTaskUnlock(); 12 14 return (FALSE); 13 15 } … … 15 17 if (!strcasecmp (argv[1], "true")) { 16 18 task[0].active = TRUE; 19 JobTaskUnlock(); 17 20 return (TRUE); 18 21 } 19 22 if (!strcasecmp (argv[1], "false")) { 20 23 task[0].active = FALSE; 24 JobTaskUnlock(); 21 25 return (TRUE); 22 26 } 23 27 24 28 gprint (GP_ERR, "ERROR: invalid option: %s\n", argv[1]); 29 JobTaskUnlock(); 25 30 return (FALSE); 26 31 -
trunk/Ohana/src/opihi/pantasks/task_command.c
r7917 r23530 12 12 } 13 13 14 JobTaskLock(); 14 15 task = GetNewTask (); 15 16 if (task == NULL) { … … 17 18 if (task == NULL) { 18 19 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 20 JobTaskUnlock(); 19 21 return (FALSE); 20 22 } … … 35 37 task[0].argv[i] = strcreate (argv[i+1]); 36 38 } 39 JobTaskUnlock(); 37 40 return (TRUE); 38 41 } -
trunk/Ohana/src/opihi/pantasks/task_host.c
r7917 r23530 22 22 23 23 task = GetNewTask (); 24 JobTaskLock(); 24 25 if (task == NULL) { 25 26 task = GetActiveTask (); 26 27 if (task == NULL) { 27 28 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 29 JobTaskUnlock(); 28 30 return (FALSE); 29 31 } … … 34 36 task[0].host = NULL; 35 37 36 if (!strcasecmp (argv[1], "LOCAL")) return (TRUE); 38 if (!strcasecmp (argv[1], "LOCAL")) { 39 JobTaskUnlock(); 40 return (TRUE); 41 } 37 42 38 43 task[0].host = strcreate (argv[1]); 44 JobTaskUnlock(); 39 45 return (TRUE); 40 46 } -
trunk/Ohana/src/opihi/pantasks/task_macros.c
r10647 r23530 22 22 } 23 23 24 JobTaskLock(); 24 25 task = GetNewTask (); 25 26 if (task == NULL) { 26 27 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 28 JobTaskUnlock(); 27 29 return (FALSE); 28 30 } … … 135 137 free (input); 136 138 REALLOCATE (macro[0].line, char *, MAX (1, macro[0].Nlines)); 139 JobTaskUnlock(); 137 140 return (TRUE); 138 141 } … … 148 151 } 149 152 } 153 JobTaskUnlock(); 150 154 return (TRUE); 151 155 } -
trunk/Ohana/src/opihi/pantasks/task_nmax.c
r11055 r23530 7 7 if (argc != 2) goto usage; 8 8 9 JobTaskLock(); 9 10 task = GetNewTask (); 10 11 if (task == NULL) { 11 12 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 13 JobTaskUnlock(); 12 14 return (FALSE); 13 15 } 14 16 15 17 task[0].Nmax = atoi (argv[1]); 18 JobTaskUnlock(); 16 19 return (TRUE); 17 20 … … 27 30 if (argc != 2) goto usage; 28 31 32 JobTaskLock(); 29 33 task = GetNewTask (); 30 34 if (task == NULL) { 31 35 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 36 JobTaskUnlock(); 32 37 return (FALSE); 33 38 } 34 39 35 40 task[0].NpendingMax = atoi (argv[1]); 41 JobTaskUnlock(); 36 42 return (TRUE); 37 43 -
trunk/Ohana/src/opihi/pantasks/task_options.c
r8129 r23530 12 12 } 13 13 14 JobTaskLock(); 14 15 task = GetNewTask (); 15 16 if (task == NULL) { … … 17 18 if (task == NULL) { 18 19 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 20 JobTaskUnlock(); 19 21 return (FALSE); 20 22 } … … 35 37 task[0].optv[i] = strcreate (argv[i+1]); 36 38 } 39 JobTaskUnlock(); 37 40 return (TRUE); 38 41 } -
trunk/Ohana/src/opihi/pantasks/task_periods.c
r7917 r23530 40 40 } 41 41 42 JobTaskLock(); 42 43 task = GetNewTask (); 43 44 if (task == NULL) { … … 45 46 if (task == NULL) { 46 47 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 48 JobTaskUnlock(); 47 49 return (FALSE); 48 50 } … … 53 55 if (Timeout) task[0].timeout_period = TimeoutValue; 54 56 57 JobTaskUnlock(); 55 58 return (TRUE); 56 59 } -
trunk/Ohana/src/opihi/pantasks/task_stdout.c
r12332 r23530 11 11 } 12 12 13 JobTaskLock(); 13 14 task = GetNewTask (); 14 15 if (task == NULL) { … … 16 17 if (task == NULL) { 17 18 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 19 JobTaskUnlock(); 18 20 return (FALSE); 19 21 } … … 21 23 if (task[0].stdout_dump != NULL) free (task[0].stdout_dump); 22 24 task[0].stdout_dump = strcreate (argv[1]); 25 JobTaskUnlock(); 23 26 return (TRUE); 24 27 } … … 34 37 } 35 38 39 JobTaskLock(); 36 40 task = GetNewTask (); 37 41 if (task == NULL) { … … 39 43 if (task == NULL) { 40 44 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 45 JobTaskUnlock(); 41 46 return (FALSE); 42 47 } … … 44 49 if (task[0].stderr_dump != NULL) free (task[0].stderr_dump); 45 50 task[0].stderr_dump = strcreate (argv[1]); 51 JobTaskUnlock(); 46 52 return (TRUE); 47 53 } -
trunk/Ohana/src/opihi/pantasks/task_trange.c
r15487 r23530 12 12 if (argc != 1) goto usage; 13 13 14 JobTaskLock(); 14 15 task = GetNewTask (); 15 16 if (task == NULL) { 16 17 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 18 JobTaskUnlock(); 17 19 return (FALSE); 18 20 } … … 20 22 task[0].Nranges = 0; 21 23 REALLOCATE (task[0].ranges, TimeRange, 1); 24 JobTaskUnlock(); 22 25 return (TRUE); 23 26 } … … 40 43 41 44 if (argc != 3) goto usage; 42 43 task = GetNewTask ();44 if (task == NULL) {45 gprint (GP_ERR, "ERROR: not defining or running a task\n");46 return (FALSE);47 }48 45 49 46 /* test for Mon[@HH:MM:SS] - both must match */ … … 83 80 84 81 valid: 82 JobTaskLock(); 83 task = GetNewTask (); 84 if (task == NULL) { 85 gprint (GP_ERR, "ERROR: not defining or running a task\n"); 86 JobTaskUnlock(); 87 return (FALSE); 88 } 89 85 90 N = task[0].Nranges; 86 91 task[0].Nranges ++; … … 88 93 89 94 task[0].ranges[N] = range; 95 JobTaskUnlock(); 90 96 return (TRUE); 91 97 -
trunk/Ohana/src/opihi/pantasks/thread_locks.c
r11084 r23530 1 1 # include "pantasks.h" 2 2 3 /* this mutex is used by the serialized threads */ 3 /* mutex to lock Client table operations */ 4 static pthread_mutex_t ClientMutex = PTHREAD_MUTEX_INITIALIZER; 4 5 5 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 6 7 void SerialThreadLock () { 8 pthread_mutex_lock (&mutex); 6 void ClientLock () { 7 pthread_mutex_lock (&ClientMutex); 9 8 } 10 9 11 void SerialThreadUnlock () {12 pthread_mutex_unlock (&mutex);10 void ClientUnlock () { 11 pthread_mutex_unlock (&ClientMutex); 13 12 } 14 13 14 /* mutex to lock Command / Multicommand operations */ 15 static pthread_mutex_t CommandMutex = PTHREAD_MUTEX_INITIALIZER; 16 17 void CommandLock () { 18 // fprintf (stderr, "command lock\n"); 19 pthread_mutex_lock (&CommandMutex); 20 } 21 22 void CommandUnlock () { 23 // fprintf (stderr, "command unlock\n"); 24 pthread_mutex_unlock (&CommandMutex); 25 } 26 27 /* mutex to lock Control operations */ 28 static pthread_mutex_t ControlMutex = PTHREAD_MUTEX_INITIALIZER; 29 30 void ControlLock (char *func) { 31 // fprintf (stderr, "control lock %s\n", func); 32 pthread_mutex_lock (&ControlMutex); 33 } 34 35 void ControlUnlock (char *func) { 36 // fprintf (stderr, "control unlock %s\n", func); 37 pthread_mutex_unlock (&ControlMutex); 38 } 39 40 /* mutex to lock Job / Task operations */ 41 static pthread_mutex_t JobTaskMutex = PTHREAD_MUTEX_INITIALIZER; 42 43 void JobTaskLock () { 44 // fprintf (stderr, "jobtask lock\n"); 45 pthread_mutex_lock (&JobTaskMutex); 46 } 47 48 void JobTaskUnlock () { 49 // fprintf (stderr, "jobtask unlock\n"); 50 pthread_mutex_unlock (&JobTaskMutex); 51 } 52
Note: See TracChangeset for help on using the changeset viewer.
