IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Changeset 12404


Ignore:
Timestamp:
Mar 11, 2007, 10:56:27 AM (19 years ago)
Author:
eugene
Message:

adding intelligent timeouts to task and job threads; removing deprecated files

Location:
trunk/Ohana/src/opihi
Files:
5 deleted
7 edited

Legend:

Unmodified
Added
Removed
  • trunk/Ohana/src/opihi/include/pantasks.h

    r11898 r12404  
    196196void FreeJob (Job *job);
    197197
    198 int CheckJobs ();
    199 int CheckTasks ();
     198float CheckJobs ();
     199float CheckTasks ();
    200200int CheckSystem ();
    201201int CheckController ();
     
    225225int KillControllerJob (Job *job);
    226226int CheckControllerStatus ();
    227 int TestElapsedCheck ();
    228227void gotsignal (int signum);
    229228int client_shell (int argc, char **argv);
  • trunk/Ohana/src/opihi/lib.shell/multicommand.c

    r7940 r12404  
    33static int server = 0;
    44
     5// XXX this is rather pantasks-specific...
    56void multicommand_InitServer () {
    67
  • trunk/Ohana/src/opihi/pantasks/CheckController.c

    r7917 r12404  
    3737
    3838  p = buffer.buffer;
    39   for (i = 0; (i < Njobs) && !TestElapsedCheck(); i++) {
     39  for (i = 0; i < Njobs; i++) {
    4040    q = strchr (p, '\n');
    4141    if (q == NULL) {
     
    5656  if (VerboseMode()) gprint (GP_ERR, "clear %d exit jobs %f\n", i, TimerElapsed(TRUE));
    5757
    58   if (TestElapsedCheck()) goto finish;
    59   /* this will prevent us from ever checking crashed jobs... */
    60 
    6158  /*** check CRASH jobs ***/
    6259  FlushIOBuffer (&buffer);
     
    7370
    7471  p = buffer.buffer;
    75   for (i = 0; (i < Njobs) && !TestElapsedCheck(); i++) {
     72  for (i = 0; i < Njobs; i++) {
    7673    q = strchr (p, '\n');
    7774    if (q == NULL) {
  • trunk/Ohana/src/opihi/pantasks/CheckJobs.c

    r11898 r12404  
    11# include "pantasks.h"
    22
    3 int CheckJobs () {
     3float CheckJobs () {
    44
    55  FILE *f;
     
    1010  char varname[64];
    1111  Queue *queue;
     12  float time_running, next_timeout;
    1213
    1314  // int Ncheck;
    1415  // Ncheck = 0;
     16
     17  // actual maximum delay is controlled in job_threads.c
     18  next_timeout = 1.0;
    1519
    1620  /** test all jobs: ready to test?  finished? **/
     
    2125
    2226    /* check poll period (ready to ask for status?) */
    23     if (GetTaskTimer(job[0].last, FALSE) < task[0].poll_period) continue;
     27    time_running = GetTaskTimer(job[0].last, FALSE);
     28    // fprintf (stderr, "next: %f, poll: %f, run: %f\n", next_timeout, task[0].poll_period, time_running);
     29    if (time_running < task[0].poll_period) {
     30      next_timeout = MIN (next_timeout, task[0].poll_period - time_running);
     31      continue;
     32    }
     33    next_timeout = 0.0;
    2434
    2535    /* check current status */
     
    139149     */
    140150    if (job[0].mode == JOB_LOCAL) {
    141       if (GetTaskTimer(job[0].start, FALSE) < task[0].timeout_period) continue;
     151      if (GetTaskTimer(job[0].start, FALSE) < task[0].timeout_period) {
     152        /* reset polling clock */
     153        SetTaskTimer (&job[0].last);
     154        continue;
     155      }
    142156      if (VerboseMode()) gprint (GP_LOG, "timeout on %s\n", task[0].name);
    143157
     
    180194    /* reset polling clock */
    181195    SetTaskTimer (&job[0].last);
    182     if (TestElapsedCheck()) {
    183       // fprintf (stderr, "check %d jobs\n", Ncheck);
    184       return (TRUE);
    185     }
    186196  }
    187197  // fprintf (stderr, "check %d jobs\n", Ncheck);
    188   return (TRUE);
     198  return (next_timeout);
    189199}
    190200
  • trunk/Ohana/src/opihi/pantasks/CheckTasks.c

    r12332 r12404  
    11# include "pantasks.h"
    22
    3 int CheckTasks () {
     3float CheckTasks () {
    44
    55  Job *job;
    66  Task *task;
    77  int status;
     8  float time_running, next_timeout;
    89  // struct timeval now;
     10
     11  // actual maximum delay is controlled in job_threads.c
     12  next_timeout = 1.0;
    913
    1014  /** test all tasks: ready to test? ready to run? **/
    1115  while ((task = NextTask ()) != NULL) {
    1216
    13     if (!task[0].active) continue;
     17    /*** test for all reasons we should skip this task ***/
    1418
    15     /* ready to test? : check exec period */
    16     if (GetTaskTimer(task[0].last, FALSE) < task[0].exec_period) continue;
    17 
    18     /* need to check if the current time is within valid/invalid periods */
     19    /* task has been de-activated by the user */
     20    if (!task[0].active) {
     21      continue;
     22    }
     23    /* current time is not within valid/invalid periods */
    1924    if (!CheckTimeRanges (task[0].ranges, task[0].Nranges)) {
    20         gettimeofday (&task[0].last, (void *) NULL);
    21         continue;
     25      continue;
    2226    }
     27    /* all allowed tasks have been run in this period */
    2328    if (task[0].Nmax && (task[0].Njobs >= task[0].Nmax)) {
    24         gettimeofday (&task[0].last, (void *) NULL);
    25         continue;
     29      continue;
    2630    }
     31    /* too many outstanding jobs */
    2732    if (task[0].NpendingMax && (task[0].Npending >= task[0].NpendingMax)) {
    28         // fprintf (stderr, "npending: %d, max npending: %d\n", task[0].Npending, task[0].NpendingMax);
    29         gettimeofday (&task[0].last, (void *) NULL);
    30         continue;
     33      continue;
    3134    }
    3235
    33     // gettimeofday (&now, (void *) NULL);
    34     // fprintf (stderr, "t0: %d %6d  - \n", now.tv_sec, now.tv_usec);
     36    /* ready to test? : check time since last exec */
     37    time_running = GetTaskTimer(task[0].last, FALSE);
     38    if (time_running < task[0].exec_period) {
     39      // is we are to ready to run, set time to timeout, if shortest of all tasks
     40      next_timeout = MIN (next_timeout, task[0].exec_period - time_running);
     41      continue;
     42    }
     43
     44    /* ready to try running the task : reset the timer */
     45    next_timeout = 0.0;
     46    gettimeofday (&task[0].last, (void *) NULL);
     47    // XXX here is where I should add a fuzz factor (fraction of exec_period)
    3548
    3649    /* ready to run? : run task.exec macro */
     
    3851      status = exec_loop (task[0].exec);
    3952      if (!status) {
    40           gettimeofday (&task[0].last, (void *) NULL);
    41           continue;
     53        continue;
    4254      }
    4355    }
     
    4860    /* check if there are errors with this task */
    4961    if (!ValidateTask (task, TRUE)) {
    50         gettimeofday (&task[0].last, (void *) NULL);
    51         continue;
     62      continue;
    5263    }
    5364   
     
    6778    // task[0].last.tv_sec, task[0].last.tv_usec);
    6879
    69     /* reset timer on task (don't do this if Create/Submit fails) (why not??) */
    70     gettimeofday (&task[0].last, (void *) NULL);
     80    /* increment job counters */
    7181    task[0].Njobs ++;
    7282    task[0].Npending ++;
     
    7787    /* increment Nrun for inclusive ranges with Nmax */
    7888    BumpTimeRanges (task[0].ranges, task[0].Nranges);
    79 
    80     if (TestElapsedCheck()) return (TRUE);
    8189  }
    82   return (TRUE);
     90  return (next_timeout);
    8391}
  • trunk/Ohana/src/opihi/pantasks/job_threads.c

    r11898 r12404  
    1414void *CheckJobsThread (void *data) {
    1515
     16  float next_timeout;
     17
    1618  gprintInit ();  // each thread needs to init the printing system
    1719  while (1) {
     
    2527    // one run of the task checker
    2628    SerialThreadLock ();
    27     CheckJobs ();
     29    next_timeout = CheckJobs ();
    2830    SerialThreadUnlock ();
    2931    if (VerboseMode() == 2) fprintf (stderr, "J");
    30     // fprintf (stderr, "J");
    31     // usleep (10000); // allow other threads a chance to run
     32
     33    next_timeout = MIN (next_timeout, 0.1);
     34    if (next_timeout > 0.001) {
     35      usleep ((int)(1000000*next_timeout)); // allow other threads a chance to run
     36    }     
    3237  }
    3338}
  • trunk/Ohana/src/opihi/pantasks/task_threads.c

    r12401 r12404  
    1414void *CheckTasksThread (void *data) {
    1515
     16  float next_timeout;
     17
    1618  gprintInit ();  // each thread needs to init the printing system
    1719  while (1) {
     
    2527    // one run of the task checker
    2628    SerialThreadLock ();
    27     CheckTasks ();
     29    next_timeout = CheckTasks ();
    2830    SerialThreadUnlock ();
    2931    if (VerboseMode() == 2) fprintf (stderr, "T");
    30     // fprintf (stderr, "T");
    31     usleep (1000); // allow other threads a chance to run
     32
     33    next_timeout = MIN (next_timeout, 0.1);
     34    if (next_timeout > 0.001) {
     35      usleep ((int)(1000000*next_timeout)); // allow other threads a chance to run
     36    }     
    3237  }
    3338}
    3439
    35 /* the threaded version of pantasks does not worry about limiting the
    36    time spent in one of the test loops -- comms with the client are
    37    in a separate thread anyway...
    38 */
    39 
    40 int TestElapsedCheck () {
    41   return (FALSE);
    42 }
     40// I sleep for a small amount of time here, based on how long until the next expected
     41// timeout.  this enforces a certain granularity in the task creation, but prevents the task
     42// thread from driving the load up to silly levels.
Note: See TracChangeset for help on using the changeset viewer.