Changeset 12404
- Timestamp:
- Mar 11, 2007, 10:56:27 AM (19 years ago)
- Location:
- trunk/Ohana/src/opihi
- Files:
-
- 5 deleted
- 7 edited
-
include/pantasks.h (modified) (2 diffs)
-
lib.shell/multicommand.c (modified) (1 diff)
-
pantasks/CheckController.c (modified) (3 diffs)
-
pantasks/CheckJobs.c (modified) (5 diffs)
-
pantasks/CheckSystem.c (deleted)
-
pantasks/CheckTasks.c (modified) (5 diffs)
-
pantasks/RunScheduler.c (deleted)
-
pantasks/connect_to_server.c (deleted)
-
pantasks/job_threads.c (modified) (2 diffs)
-
pantasks/scheduler.c (deleted)
-
pantasks/server_threads.c (deleted)
-
pantasks/task_threads.c (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/Ohana/src/opihi/include/pantasks.h
r11898 r12404 196 196 void FreeJob (Job *job); 197 197 198 int CheckJobs ();199 int CheckTasks ();198 float CheckJobs (); 199 float CheckTasks (); 200 200 int CheckSystem (); 201 201 int CheckController (); … … 225 225 int KillControllerJob (Job *job); 226 226 int CheckControllerStatus (); 227 int TestElapsedCheck ();228 227 void gotsignal (int signum); 229 228 int client_shell (int argc, char **argv); -
trunk/Ohana/src/opihi/lib.shell/multicommand.c
r7940 r12404 3 3 static int server = 0; 4 4 5 // XXX this is rather pantasks-specific... 5 6 void multicommand_InitServer () { 6 7 -
trunk/Ohana/src/opihi/pantasks/CheckController.c
r7917 r12404 37 37 38 38 p = buffer.buffer; 39 for (i = 0; (i < Njobs) && !TestElapsedCheck(); i++) {39 for (i = 0; i < Njobs; i++) { 40 40 q = strchr (p, '\n'); 41 41 if (q == NULL) { … … 56 56 if (VerboseMode()) gprint (GP_ERR, "clear %d exit jobs %f\n", i, TimerElapsed(TRUE)); 57 57 58 if (TestElapsedCheck()) goto finish;59 /* this will prevent us from ever checking crashed jobs... */60 61 58 /*** check CRASH jobs ***/ 62 59 FlushIOBuffer (&buffer); … … 73 70 74 71 p = buffer.buffer; 75 for (i = 0; (i < Njobs) && !TestElapsedCheck(); i++) {72 for (i = 0; i < Njobs; i++) { 76 73 q = strchr (p, '\n'); 77 74 if (q == NULL) { -
trunk/Ohana/src/opihi/pantasks/CheckJobs.c
r11898 r12404 1 1 # include "pantasks.h" 2 2 3 int CheckJobs () {3 float CheckJobs () { 4 4 5 5 FILE *f; … … 10 10 char varname[64]; 11 11 Queue *queue; 12 float time_running, next_timeout; 12 13 13 14 // int Ncheck; 14 15 // Ncheck = 0; 16 17 // actual maximum delay is controlled in job_threads.c 18 next_timeout = 1.0; 15 19 16 20 /** test all jobs: ready to test? finished? **/ … … 21 25 22 26 /* check poll period (ready to ask for status?) */ 23 if (GetTaskTimer(job[0].last, FALSE) < task[0].poll_period) continue; 27 time_running = GetTaskTimer(job[0].last, FALSE); 28 // fprintf (stderr, "next: %f, poll: %f, run: %f\n", next_timeout, task[0].poll_period, time_running); 29 if (time_running < task[0].poll_period) { 30 next_timeout = MIN (next_timeout, task[0].poll_period - time_running); 31 continue; 32 } 33 next_timeout = 0.0; 24 34 25 35 /* check current status */ … … 139 149 */ 140 150 if (job[0].mode == JOB_LOCAL) { 141 if (GetTaskTimer(job[0].start, FALSE) < task[0].timeout_period) continue; 151 if (GetTaskTimer(job[0].start, FALSE) < task[0].timeout_period) { 152 /* reset polling clock */ 153 SetTaskTimer (&job[0].last); 154 continue; 155 } 142 156 if (VerboseMode()) gprint (GP_LOG, "timeout on %s\n", task[0].name); 143 157 … … 180 194 /* reset polling clock */ 181 195 SetTaskTimer (&job[0].last); 182 if (TestElapsedCheck()) {183 // fprintf (stderr, "check %d jobs\n", Ncheck);184 return (TRUE);185 }186 196 } 187 197 // fprintf (stderr, "check %d jobs\n", Ncheck); 188 return ( TRUE);198 return (next_timeout); 189 199 } 190 200 -
trunk/Ohana/src/opihi/pantasks/CheckTasks.c
r12332 r12404 1 1 # include "pantasks.h" 2 2 3 int CheckTasks () {3 float CheckTasks () { 4 4 5 5 Job *job; 6 6 Task *task; 7 7 int status; 8 float time_running, next_timeout; 8 9 // struct timeval now; 10 11 // actual maximum delay is controlled in job_threads.c 12 next_timeout = 1.0; 9 13 10 14 /** test all tasks: ready to test? ready to run? **/ 11 15 while ((task = NextTask ()) != NULL) { 12 16 13 if (!task[0].active) continue;17 /*** test for all reasons we should skip this task ***/ 14 18 15 /* ready to test? : check exec period */ 16 if (GetTaskTimer(task[0].last, FALSE) < task[0].exec_period) continue; 17 18 /* need to check if the current time is within valid/invalid periods */ 19 /* task has been de-activated by the user */ 20 if (!task[0].active) { 21 continue; 22 } 23 /* current time is not within valid/invalid periods */ 19 24 if (!CheckTimeRanges (task[0].ranges, task[0].Nranges)) { 20 gettimeofday (&task[0].last, (void *) NULL); 21 continue; 25 continue; 22 26 } 27 /* all allowed tasks have been run in this period */ 23 28 if (task[0].Nmax && (task[0].Njobs >= task[0].Nmax)) { 24 gettimeofday (&task[0].last, (void *) NULL); 25 continue; 29 continue; 26 30 } 31 /* too many outstanding jobs */ 27 32 if (task[0].NpendingMax && (task[0].Npending >= task[0].NpendingMax)) { 28 // fprintf (stderr, "npending: %d, max npending: %d\n", task[0].Npending, task[0].NpendingMax); 29 gettimeofday (&task[0].last, (void *) NULL); 30 continue; 33 continue; 31 34 } 32 35 33 // gettimeofday (&now, (void *) NULL); 34 // fprintf (stderr, "t0: %d %6d - \n", now.tv_sec, now.tv_usec); 36 /* ready to test? : check time since last exec */ 37 time_running = GetTaskTimer(task[0].last, FALSE); 38 if (time_running < task[0].exec_period) { 39 // is we are to ready to run, set time to timeout, if shortest of all tasks 40 next_timeout = MIN (next_timeout, task[0].exec_period - time_running); 41 continue; 42 } 43 44 /* ready to try running the task : reset the timer */ 45 next_timeout = 0.0; 46 gettimeofday (&task[0].last, (void *) NULL); 47 // XXX here is where I should add a fuzz factor (fraction of exec_period) 35 48 36 49 /* ready to run? : run task.exec macro */ … … 38 51 status = exec_loop (task[0].exec); 39 52 if (!status) { 40 gettimeofday (&task[0].last, (void *) NULL); 41 continue; 53 continue; 42 54 } 43 55 } … … 48 60 /* check if there are errors with this task */ 49 61 if (!ValidateTask (task, TRUE)) { 50 gettimeofday (&task[0].last, (void *) NULL); 51 continue; 62 continue; 52 63 } 53 64 … … 67 78 // task[0].last.tv_sec, task[0].last.tv_usec); 68 79 69 /* reset timer on task (don't do this if Create/Submit fails) (why not??) */ 70 gettimeofday (&task[0].last, (void *) NULL); 80 /* increment job counters */ 71 81 task[0].Njobs ++; 72 82 task[0].Npending ++; … … 77 87 /* increment Nrun for inclusive ranges with Nmax */ 78 88 BumpTimeRanges (task[0].ranges, task[0].Nranges); 79 80 if (TestElapsedCheck()) return (TRUE);81 89 } 82 return ( TRUE);90 return (next_timeout); 83 91 } -
trunk/Ohana/src/opihi/pantasks/job_threads.c
r11898 r12404 14 14 void *CheckJobsThread (void *data) { 15 15 16 float next_timeout; 17 16 18 gprintInit (); // each thread needs to init the printing system 17 19 while (1) { … … 25 27 // one run of the task checker 26 28 SerialThreadLock (); 27 CheckJobs ();29 next_timeout = CheckJobs (); 28 30 SerialThreadUnlock (); 29 31 if (VerboseMode() == 2) fprintf (stderr, "J"); 30 // fprintf (stderr, "J"); 31 // usleep (10000); // allow other threads a chance to run 32 33 next_timeout = MIN (next_timeout, 0.1); 34 if (next_timeout > 0.001) { 35 usleep ((int)(1000000*next_timeout)); // allow other threads a chance to run 36 } 32 37 } 33 38 } -
trunk/Ohana/src/opihi/pantasks/task_threads.c
r12401 r12404 14 14 void *CheckTasksThread (void *data) { 15 15 16 float next_timeout; 17 16 18 gprintInit (); // each thread needs to init the printing system 17 19 while (1) { … … 25 27 // one run of the task checker 26 28 SerialThreadLock (); 27 CheckTasks ();29 next_timeout = CheckTasks (); 28 30 SerialThreadUnlock (); 29 31 if (VerboseMode() == 2) fprintf (stderr, "T"); 30 // fprintf (stderr, "T"); 31 usleep (1000); // allow other threads a chance to run 32 33 next_timeout = MIN (next_timeout, 0.1); 34 if (next_timeout > 0.001) { 35 usleep ((int)(1000000*next_timeout)); // allow other threads a chance to run 36 } 32 37 } 33 38 } 34 39 35 /* the threaded version of pantasks does not worry about limiting the 36 time spent in one of the test loops -- comms with the client are 37 in a separate thread anyway... 38 */ 39 40 int TestElapsedCheck () { 41 return (FALSE); 42 } 40 // I sleep for a small amount of time here, based on how long until the next expected 41 // timeout. this enforces a certain granularity in the task creation, but prevents the task 42 // thread from driving the load up to silly levels.
Note:
See TracChangeset
for help on using the changeset viewer.
