IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Changeset 11055


Ignore:
Timestamp:
Jan 11, 2007, 4:51:41 PM (19 years ago)
Author:
eugene
Message:

added timing stats, npending limit

Location:
trunk/Ohana/src/opihi
Files:
10 edited

Legend:

Unmodified
Added
Removed
  • trunk/Ohana/src/opihi/include/pantasks.h

    r10694 r11055  
    2828
    2929enum {RANGE_ABS, RANGE_DAY, RANGE_WEEK};
     30enum {TIMER_ALLJOBS, TIMER_SUCCESS, TIMER_FAILURE};
    3031
    3132enum {TASK_NONE,
     
    4243      TASK_OPTIONS,
    4344      TASK_PERIODS,
     45      TASK_NPENDING,
    4446      TASK_EXIT,
    4547      TASK_EXEC
     
    8688  int     Njobs;
    8789
     90  int     Npending;  // number of currently pending jobs
     91  int     NpendingMax;  // max number of pending jobs allowed
     92
    8893  float   poll_period;
    8994  float   exec_period;
     
    95100  int Nfailure;
    96101  int Ntimeout;
     102
     103  double dtimeAve_alljobs, dtimeMin_alljobs, dtimeMax_alljobs;
     104  double dtimeAve_success, dtimeMin_success, dtimeMax_success;
     105  double dtimeAve_failure, dtimeMin_failure, dtimeMax_failure;
    97106
    98107  int active;
     
    140149  int         stdout_fd;                /* stdout pipe (local only) */
    141150  int         stderr_fd;                /* stderr pipe (local only) */
     151
     152  double dtime;
    142153} Job;
    143154
     
    167178int RemoveTask (Task *task);
    168179Task *SetNewTask (Task *task);
     180void ListTaskStats ();
     181void UpdateTaskTimerStats (Task *task, int mode, double dtime);
    169182
    170183int NextJobID ();
  • trunk/Ohana/src/opihi/include/shell.h

    r10846 r11055  
    102102void          ListMacros                PROTO(());
    103103void          FreeMacro                 PROTO((Macro *macro));
     104CommandF     *find_macro_command        PROTO((char *name));
    104105
    105106int           exec_loop                 PROTO((Macro *loop));
  • trunk/Ohana/src/opihi/pantasks/CheckJobs.c

    r9278 r11055  
    55  FILE *f;
    66  Job *job;
     7  Task *task;
    78  Macro *macro;
    89  int i, status;
     
    1314  while ((job = NextJob ()) != NULL) {
    1415
     16    task = job[0].task;
     17
    1518    /* check poll period (ready to ask for status?) */
    16     if (GetTaskTimer(job[0].last) < job[0].task[0].poll_period) continue;
     19    if (GetTaskTimer(job[0].last) < task[0].poll_period) continue;
    1720
    1821    /* check current status */
     
    2023    switch (status) {
    2124      case JOB_PENDING:
    22         /* if (VerboseMode()) gprint (GP_LOG, "job %s (%d) pending\n", job[0].task[0].name, job[0].JobID); */
     25        /* if (VerboseMode()) gprint (GP_LOG, "job %s (%d) pending\n", task[0].name, job[0].JobID); */
    2326        break;
    2427
    2528      case JOB_BUSY:
    26         /* if (VerboseMode()) gprint (GP_LOG, "job %s (%d) busy\n", job[0].task[0].name, job[0].JobID); */
     29        /* if (VerboseMode()) gprint (GP_LOG, "job %s (%d) busy\n", task[0].name, job[0].JobID); */
    2730        break;
    2831
     
    3538
    3639        /* save the stdout and stderr if desired */
    37         if (job[0].stdout_dump != NULL) {
     40        if ((job[0].stdout_dump != NULL) && strcasecmp(job[0].stdout_dump, "NULL")) {
    3841          f = fopen (job[0].stdout_dump, "a");
    3942          if (f == NULL) {
     
    4447          }
    4548        }
    46         if (job[0].stderr_dump != NULL) {
     49        if ((job[0].stderr_dump != NULL) && strcasecmp(job[0].stderr_dump, "NULL")) {
    4750          f = fopen (job[0].stderr_dump, "a");
    4851          if (f == NULL) {
     
    6871        set_int_variable ("options:n", job[0].optc);
    6972
     73        set_variable ("JOB_DTIME", job[0].dtime);
     74
    7075        if (status == JOB_CRASH) {
    7176          /* XXX add an Ncrash element? */
    72           job[0].task[0].Nfailure ++;
     77          task[0].Nfailure ++;
     78          UpdateTaskTimerStats (task, TIMER_FAILURE, job[0].dtime);
    7379
    7480          /* run task[0].crash macro, if it exists */
    7581          /* perhaps define PushNamedQueueBuffer */
    7682
    77           if (VerboseMode()) gprint (GP_LOG, "job %s (%d) crash\n", job[0].task[0].name, job[0].JobID);
    78           if (job[0].task[0].crash != NULL) {
    79             exec_loop (job[0].task[0].crash);
     83          set_str_variable ("JOB_STATUS", "CRASH");
     84
     85          if (VerboseMode()) gprint (GP_LOG, "job %s (%d) crash\n", task[0].name, job[0].JobID);
     86          if (task[0].crash != NULL) {
     87            exec_loop (task[0].crash);
    8088          }
    8189        }
     
    8391          /* update the exit status counters */
    8492          if (job[0].exit_status) {
    85             job[0].task[0].Nfailure ++;
     93            task[0].Nfailure ++;
     94            UpdateTaskTimerStats (task, TIMER_FAILURE, job[0].dtime);
    8695          } else {
    87             job[0].task[0].Nsuccess ++;
    88           }
     96            task[0].Nsuccess ++;
     97            UpdateTaskTimerStats (task, TIMER_SUCCESS, job[0].dtime);
     98          }
     99
     100          set_int_variable ("JOB_STATUS", job[0].exit_status);
    89101
    90102          /* run corresponding task[0].exit macro, if it exists */
    91           if (VerboseMode()) gprint (GP_LOG, "job %s (%d) exit\n", job[0].task[0].name, job[0].JobID);
    92           macro = job[0].task[0].defexit;
    93           for (i = 0; i < job[0].task[0].Nexit; i++) {
    94             if (job[0].exit_status == atoi(job[0].task[0].exit[i][0].name)) {
    95               macro = job[0].task[0].exit[i];
     103          if (VerboseMode()) gprint (GP_LOG, "job %s (%d) exit\n", task[0].name, job[0].JobID);
     104          macro = task[0].defexit;
     105          for (i = 0; i < task[0].Nexit; i++) {
     106            if (job[0].exit_status == atoi(task[0].exit[i][0].name)) {
     107              macro = task[0].exit[i];
    96108              break;
    97109            }
     
    106118        if (queue) InitQueue (queue);
    107119
     120        UpdateTaskTimerStats (task, TIMER_ALLJOBS, job[0].dtime);
     121
     122        task[0].Npending --;
    108123        DeleteJob (job);
    109124        continue;
     
    120135     */
    121136    if (job[0].mode == JOB_LOCAL) {
    122       if (GetTaskTimer(job[0].start) < job[0].task[0].timeout_period) continue;
    123       if (VerboseMode()) gprint (GP_LOG, "timeout on %s\n", job[0].task[0].name);
     137      if (GetTaskTimer(job[0].start) < task[0].timeout_period) continue;
     138      if (VerboseMode()) gprint (GP_LOG, "timeout on %s\n", task[0].name);
    124139
    125140      // XXX harvest STDERR and STDOUT from timeout job (should be available...)
     
    127142
    128143      /* update the timeout counter */
    129       job[0].task[0].Ntimeout ++;
     144      task[0].Ntimeout ++;
    130145
    131146      if (!KillLocalJob (job)) {
     
    150165
    151166      /* run task[0].timeout macro, if it exists */
    152       if (job[0].task[0].timeout != NULL) {
    153         exec_loop (job[0].task[0].timeout);
     167      if (task[0].timeout != NULL) {
     168        exec_loop (task[0].timeout);
    154169      }
    155170      DeleteJob (job);
  • trunk/Ohana/src/opihi/pantasks/CheckTasks.c

    r10694 r11055  
    1818    if (!CheckTimeRanges (task[0].ranges, task[0].Nranges)) continue;
    1919    if (task[0].Nmax && (task[0].Njobs >= task[0].Nmax)) continue;
     20    if (task[0].NpendingMax && (task[0].Npending >= task[0].NpendingMax)) continue;
    2021
    2122    /* ready to run? : run task.exec macro */
     
    3536    gettimeofday (&task[0].last, (void *) NULL);
    3637    task[0].Njobs ++;
     38    task[0].Npending ++;
    3739
    3840    /* increment Nrun for inclusive ranges with Nmax */
  • trunk/Ohana/src/opihi/pantasks/ControllerOps.c

    r10654 r11055  
    9595  p = memstr (buffer.buffer, "STDERR", buffer.Nbuffer);
    9696  sscanf (p, "%*s %d", &job[0].stderr_size);
     97  p = memstr (buffer.buffer, "DTIME",  buffer.Nbuffer);
     98  sscanf (p, "%*s %lf", &job[0].dtime);
    9799  FreeIOBuffer (&buffer);
    98100
  • trunk/Ohana/src/opihi/pantasks/LocalJob.c

    r10647 r11055  
    9797        exit (1);
    9898      }
     99      job[0].dtime = GetTaskTimer (job[0].start);
     100      break;
    99101  }
    100102  return (FALSE);
  • trunk/Ohana/src/opihi/pantasks/TaskOps.c

    r10733 r11055  
    5252void ListTasks (int verbose) {
    5353
    54   int i, j, valid;
     54  int i, j, valid, nameLength, cmdLength;
    5555  char *start, *stop;
     56  char format[128];
    5657
    5758  gprint (GP_LOG, "\n");
     
    6162  }
    6263
     64  /* find string lengths */
     65  nameLength = cmdLength = 0;
     66  for (i = 0; i < Ntasks; i++) {
     67    nameLength = MAX (nameLength, strlen(tasks[i][0].name));
     68    if (tasks[i][0].argv == NULL) {
     69      cmdLength = MAX (nameLength, strlen("(dynamic)"));
     70    } else {
     71      cmdLength = MAX (nameLength, strlen(tasks[i][0].argv[0]));
     72    }
     73  }
     74
    6375  gprint (GP_LOG, " Task Status\n");
    64   gprint (GP_LOG, "  * Name            Njobs  Command\n");
     76
     77  snprintf (format, 128, "  AV %%-%ds %5s %%-%ds\n", nameLength, "Njobs", cmdLength);
     78  gprint (GP_LOG, format, "Name", "Command");
     79
     80  snprintf (format, 128, "%%-%ds %%5d %%-%ds\n", nameLength, cmdLength);
    6581  for (i = 0; i < Ntasks; i++) {
    6682    valid = CheckTimeRanges (tasks[i][0].ranges, tasks[i][0].Nranges);
     
    7793    }
    7894    if (tasks[i][0].argv == NULL) {
    79       gprint (GP_LOG, "%-15s %5d  %-20s\n", tasks[i][0].name, tasks[i][0].Njobs, "(dynamic)");
    80     } else {
    81       gprint (GP_LOG, "%-15s %5d  %-20s\n", tasks[i][0].name, tasks[i][0].Njobs, tasks[i][0].argv[0]);
     95      gprint (GP_LOG, format, tasks[i][0].name, tasks[i][0].Njobs, "(dynamic)");
     96    } else {
     97      gprint (GP_LOG, format, tasks[i][0].name, tasks[i][0].Njobs, tasks[i][0].argv[0]);
    8298    }
    8399    if (verbose) {
     
    129145}
    130146
     147/* list known tasks */
     148void ListTaskStats () {
     149
     150  int i, j, valid, nameLength;
     151  char *start, *stop;
     152  char format[128];
     153
     154  gprint (GP_LOG, "\n");
     155  if (Ntasks == 0) {
     156    gprint (GP_LOG, " no defined tasks\n");
     157    return;
     158  }
     159
     160  /* find string lengths */
     161  nameLength = 0;
     162  for (i = 0; i < Ntasks; i++) {
     163    nameLength = MAX (nameLength, strlen(tasks[i][0].name));
     164  }
     165
     166  gprint (GP_LOG, " Task Statistics\n");
     167
     168  snprintf (format, 128, "     %%-%ds |           alljobs          |           success          |           failure          |\n", nameLength);
     169  gprint (GP_LOG, format, "");
     170  snprintf (format, 128, "  AV %%-%ds | Njobs   Tmin   Tave   Tmax | Njobs   Tmin   Tave   Tmax | Njobs   Tmin   Tave   Tmax |\n", nameLength);
     171  gprint (GP_LOG, format, "Name");
     172
     173  snprintf (format, 128, "%%-%ds", nameLength);
     174  for (i = 0; i < Ntasks; i++) {
     175    valid = CheckTimeRanges (tasks[i][0].ranges, tasks[i][0].Nranges);
     176    if (tasks[i][0].active) {
     177      gprint (GP_LOG, "  +");
     178    } else {
     179      gprint (GP_LOG, "  -");
     180    }
     181    if (valid) {
     182      gprint (GP_LOG, "+ ");
     183    } else {
     184      gprint (GP_LOG, "- ");
     185    }
     186    if (tasks[i][0].argv == NULL) {
     187      gprint (GP_LOG, format, tasks[i][0].name);
     188    } else {
     189      gprint (GP_LOG, format, tasks[i][0].name);
     190    }
     191    if (tasks[i][0].dtimeMin_alljobs < 0) {
     192      gprint (GP_LOG, " | %5d %6s %6.2f %6.2f",     tasks[i][0].Njobs,    "NONE",                       tasks[i][0].dtimeAve_alljobs, tasks[i][0].dtimeMax_alljobs);
     193    } else {
     194      gprint (GP_LOG, " | %5d %6.2f %6.2f %6.2f",   tasks[i][0].Njobs,    tasks[i][0].dtimeMin_alljobs, tasks[i][0].dtimeAve_alljobs, tasks[i][0].dtimeMax_alljobs);
     195    }     
     196    if (tasks[i][0].dtimeMin_success < 0) {
     197      gprint (GP_LOG, " | %5d %6s %6.2f %6.2f",     tasks[i][0].Nsuccess, "NONE",                       tasks[i][0].dtimeAve_success, tasks[i][0].dtimeMax_success);
     198    } else {
     199      gprint (GP_LOG, " | %5d %6.2f %6.2f %6.2f",   tasks[i][0].Nsuccess, tasks[i][0].dtimeMin_success, tasks[i][0].dtimeAve_success, tasks[i][0].dtimeMax_success);
     200    }
     201    if (tasks[i][0].dtimeMin_failure < 0) {
     202      gprint (GP_LOG, " | %5d %6s %6.2f %6.2f |\n",   tasks[i][0].Nfailure, "NONE",                       tasks[i][0].dtimeAve_failure, tasks[i][0].dtimeMax_failure);
     203    } else {
     204      gprint (GP_LOG, " | %5d %6.2f %6.2f %6.2f |\n", tasks[i][0].Nfailure, tasks[i][0].dtimeMin_failure, tasks[i][0].dtimeAve_failure, tasks[i][0].dtimeMax_failure);
     205    }
     206  }
     207  return;
     208}
     209
    131210/* show details of a task */
    132211int ShowTask (char *name) {
     
    311390  NewTask[0].Nmax = 0;  /* default value means 'no limit' */
    312391
     392  NewTask[0].NpendingMax = 0;  /* default value means 'no limit' */
     393
    313394  NewTask[0].Njobs = 0;
    314395  NewTask[0].Nsuccess = 0;
    315396  NewTask[0].Nfailure = 0;
    316397  NewTask[0].Ntimeout = 0;
     398
     399  /* jobs timing statistics */
     400  NewTask[0].dtimeAve_alljobs =  0.0;
     401  NewTask[0].dtimeMin_alljobs = -1.0;
     402  NewTask[0].dtimeMax_alljobs =  0.0;
     403
     404  NewTask[0].dtimeAve_success =  0.0;
     405  NewTask[0].dtimeMin_success = -1.0;
     406  NewTask[0].dtimeMax_success =  0.0;
     407
     408  NewTask[0].dtimeAve_failure =  0.0;
     409  NewTask[0].dtimeMin_failure = -1.0;
     410  NewTask[0].dtimeMax_failure =  0.0;
    317411
    318412  NewTask[0].active = TRUE;
     
    433527  if (!strcasecmp (command, "OPTIONS"))   hash = TASK_OPTIONS;
    434528  if (!strcasecmp (command, "PERIODS"))   hash = TASK_PERIODS;
     529  if (!strcasecmp (command, "NPENDING"))  hash = TASK_NPENDING;
    435530  if (!strcasecmp (command, "TASK.EXIT")) hash = TASK_EXIT;
    436531  if (!strcasecmp (command, "TASK.EXEC")) hash = TASK_EXEC;
     
    461556
    462557  Task *task;
     558  double isec, fsec;
    463559
    464560  while ((task = NextTask ()) != NULL) {
    465561    gettimeofday (&task[0].last, (void *) NULL);
     562    fsec = modf (task[0].exec_period, &isec);
     563    task[0].last.tv_sec -= isec;
     564    task[0].last.tv_usec -= 1e6*fsec;
    466565 }
    467566}
     567
     568/* must call this after updating the corresponding counter */
     569void UpdateTaskTimerStats (Task *task, int mode, double dtime) {
     570
     571  double total;
     572
     573  switch (mode) {
     574    case TIMER_ALLJOBS:
     575      total = task[0].dtimeAve_alljobs * (task[0].Njobs - 1);
     576      total += dtime;
     577      task[0].dtimeAve_alljobs = total / (float) task[0].Njobs;
     578      if (task[0].dtimeMin_alljobs < 0) {
     579        task[0].dtimeMin_alljobs = dtime;
     580      } else {
     581        task[0].dtimeMin_alljobs = MIN (task[0].dtimeMin_alljobs, dtime);
     582      }
     583      task[0].dtimeMax_alljobs = MAX (task[0].dtimeMax_alljobs, dtime);
     584      break;
     585    case TIMER_SUCCESS:
     586      total = task[0].dtimeAve_success * (task[0].Nsuccess - 1);
     587      total += dtime;
     588      task[0].dtimeAve_success = total / (float) task[0].Nsuccess;
     589      if (task[0].dtimeMin_success < 0) {
     590        task[0].dtimeMin_success = dtime;
     591      } else {
     592        task[0].dtimeMin_success = MIN (task[0].dtimeMin_success, dtime);
     593      }
     594      task[0].dtimeMax_success = MAX (task[0].dtimeMax_success, dtime);
     595      break;
     596    case TIMER_FAILURE:
     597      total = task[0].dtimeAve_failure * (task[0].Nfailure - 1);
     598      total += dtime;
     599      task[0].dtimeAve_failure = total / (float) task[0].Nfailure;
     600      if (task[0].dtimeMin_failure < 0) {
     601        task[0].dtimeMin_failure = dtime;
     602      } else {
     603        task[0].dtimeMin_failure = MIN (task[0].dtimeMin_failure, dtime);
     604      }
     605      task[0].dtimeMax_failure = MAX (task[0].dtimeMax_failure, dtime);
     606      break;
     607    default:
     608      abort();
     609  }
     610}
  • trunk/Ohana/src/opihi/pantasks/ipptool2book.c

    r10997 r11055  
    2222  Nkeys = 0;
    2323  keys = NULL;
    24   if ((N = get_argument (argc, argv, "-id"))) {
     24  if ((N = get_argument (argc, argv, "-key"))) {
    2525    remove_argument (N, &argc, argv);
    2626
     
    4141
    4242  if (argc != 3) {
    43     gprint (GP_ERR, "USAGE: ipptool2book (queue) (book) [-uniq] [-id key[:key..]]\n");
     43    gprint (GP_ERR, "USAGE: ipptool2book (queue) (book) [-uniq] [-key key[:key..]]\n");
    4444    FREEKEYS;
    4545    return (FALSE);
  • trunk/Ohana/src/opihi/pantasks/task.c

    r10694 r11055  
    1818    remove_argument (N, &argc, argv);
    1919    ListTasks (TRUE);
     20    return (TRUE);
     21  }
     22
     23  if ((N = get_argument (argc, argv, "-stats"))) {
     24    remove_argument (N, &argc, argv);
     25    ListTaskStats ();
    2026    return (TRUE);
    2127  }
     
    96102      case TASK_OPTIONS:
    97103      case TASK_PERIODS:
     104      case TASK_NPENDING:
    98105      case TASK_ACTIVE:
    99106        status = command (input, &outline, TRUE);
  • trunk/Ohana/src/opihi/pantasks/task_nmax.c

    r7917 r11055  
    2020  return (FALSE);
    2121}
     22
     23int task_npending (int argc, char **argv) {
     24
     25  Task *task;
     26
     27  if (argc != 2) goto usage;
     28
     29  task = GetNewTask ();
     30  if (task == NULL) {
     31    gprint (GP_ERR, "ERROR: not defining or running a task\n");
     32    return (FALSE);
     33  }
     34
     35  task[0].NpendingMax = atoi (argv[1]);
     36  return (TRUE);
     37
     38usage:
     39  gprint (GP_ERR, "USAGE: npending N\n");
     40  return (FALSE);
     41}
Note: See TracChangeset for help on using the changeset viewer.