IPP Software Navigation Tools IPP Links Communication Pan-STARRS Links

Changeset 11898


Ignore:
Timestamp:
Feb 19, 2007, 3:18:27 PM (19 years ago)
Author:
eugene
Message:

removed usleep from job and task threads; increased controller timeout; added verbosity; fixed quit controller; added check points to pcontrol thread

Location:
trunk/Ohana/src/opihi
Files:
1 added
14 edited

Legend:

Unmodified
Added
Removed
  • trunk/Ohana/src/opihi/doc/pantasks.txt

    r8190 r11898  
     1
     2- task spawning speed
     3
     4  I have been examining things which affect the speed of the pantasks
     5  processing. I have learned some interesting things:
     6
     7  * pcontrol was being slammed with requests for status by pantasks.
     8    This may account for Paul's controller hang-ups.  I have added a
     9    long (500ms) sleep to the controller thread to limit the rate at
     10    which controller checks are run.
     11
     12  * adding even a small usleep to the task_thread or job_thread puts
     13    them to sleep for a long time (>> 10ms).  This seems longer than the
     14    Linux scheduler time slice.  I have removed sleeps from the task and job
     15    threads.
     16
     17  * the job submit rate is apparently limited by two things:
     18
     19    * when the job is submitted (SubmitJob) the interaction with
     20      the controller seems to take ~30ms or more.
     21
     22    * some thread (controller thread? main readline thread?) seems to
     23      introduce timeouts which are very long (up to 100ms).  These
     24      introduce big delays when they happen during the task_thread
     25      loop.
    126
    227- updates for queues:
  • trunk/Ohana/src/opihi/include/pantasks.h

    r11388 r11898  
    174174Task *GetActiveTask ();
    175175void SetTaskTimer (struct timeval *timer);
    176 double GetTaskTimer (struct timeval start);
     176double GetTaskTimer (struct timeval start, int verbose);
    177177void InitTaskTimers ();
    178178int TaskHash (char *input);
  • trunk/Ohana/src/opihi/pantasks/CheckJobs.c

    r11542 r11898  
    1111  Queue *queue;
    1212
     13  // int Ncheck;
     14  // Ncheck = 0;
     15
    1316  /** test all jobs: ready to test?  finished? **/
    1417  while ((job = NextJob ()) != NULL) {
     18    // Ncheck ++;
    1519
    1620    task = job[0].task;
    1721
    1822    /* check poll period (ready to ask for status?) */
    19     if (GetTaskTimer(job[0].last) < task[0].poll_period) continue;
     23    if (GetTaskTimer(job[0].last, FALSE) < task[0].poll_period) continue;
    2024
    2125    /* check current status */
     
    135139     */
    136140    if (job[0].mode == JOB_LOCAL) {
    137       if (GetTaskTimer(job[0].start) < task[0].timeout_period) continue;
     141      if (GetTaskTimer(job[0].start, FALSE) < task[0].timeout_period) continue;
    138142      if (VerboseMode()) gprint (GP_LOG, "timeout on %s\n", task[0].name);
    139143
     
    176180    /* reset polling clock */
    177181    SetTaskTimer (&job[0].last);
    178     if (TestElapsedCheck()) return (TRUE);
     182    if (TestElapsedCheck()) {
     183      // fprintf (stderr, "check %d jobs\n", Ncheck);
     184      return (TRUE);
     185    }
    179186  }
     187  // fprintf (stderr, "check %d jobs\n", Ncheck);
    180188  return (TRUE);
    181189}
  • trunk/Ohana/src/opihi/pantasks/CheckTasks.c

    r11324 r11898  
    66  Task *task;
    77  int status;
     8  struct timeval now;
    89
    910  /** test all tasks: ready to test? ready to run? **/
     
    1314
    1415    /* ready to test? : check exec period */
    15     if (GetTaskTimer(task[0].last) < task[0].exec_period) continue;
     16    if (GetTaskTimer(task[0].last, FALSE) < task[0].exec_period) continue;
    1617
    1718    /* need to check if the current time is within valid/invalid periods */
     
    2526    }
    2627    if (task[0].NpendingMax && (task[0].Npending >= task[0].NpendingMax)) {
    27         fprintf (stderr, "npending: %d, max npending: %d\n", task[0].Npending, task[0].NpendingMax);
     28        // fprintf (stderr, "npending: %d, max npending: %d\n", task[0].Npending, task[0].NpendingMax);
    2829        gettimeofday (&task[0].last, (void *) NULL);
    2930        continue;
    3031    }
     32
     33    // gettimeofday (&now, (void *) NULL);
     34    // fprintf (stderr, "t0: %d %6d  - \n", now.tv_sec, now.tv_usec);
    3135
    3236    /* ready to run? : run task.exec macro */
     
    3943    }
    4044
     45    // gettimeofday (&now, (void *) NULL);
     46    // fprintf (stderr, "t1: %d %6d  - \n", now.tv_sec, now.tv_usec);
     47
    4148    /* check if there are errors with this task */
    4249    if (!ValidateTask (task, TRUE)) {
     
    4451        continue;
    4552    }
     53   
     54    // gettimeofday (&now, (void *) NULL);
     55    // fprintf (stderr, "t2: %d %6d  - \n", now.tv_sec, now.tv_usec);
    4656
    4757    /* construct job from task */
    4858    job = CreateJob (task);
    4959
     60    // gettimeofday (&now, (void *) NULL);
     61    // fprintf (stderr, "t3: %d %6d  - \n", now.tv_sec, now.tv_usec);
     62
    5063    /* execute job - XXX add status test */
    5164    SubmitJob (job);
     65
     66    // fprintf (stderr, "nl: %d %6d  - ",
     67    // task[0].last.tv_sec, task[0].last.tv_usec);
    5268
    5369    /* reset timer on task (don't do this if Create/Submit fails) (why not??) */
     
    5571    task[0].Njobs ++;
    5672    task[0].Npending ++;
     73
     74    // fprintf (stderr, "%d %6d\n",
     75    // task[0].last.tv_sec, task[0].last.tv_usec);
    5776
    5877    /* increment Nrun for inclusive ranges with Nmax */
  • trunk/Ohana/src/opihi/pantasks/ControllerOps.c

    r11446 r11898  
    352352  }
    353353 
     354  /* for commands which don't return a prompt, don't look for one */
     355  if (response == NULL) {
     356      return (TRUE);
     357  }
     358
    354359  /* watch for response - wait up to 1 second */
    355360  line = NULL;
     
    456461  sprintf (cmd, "quit");
    457462  InitIOBuffer (&buffer, 0x100);
    458   status = ControllerCommand (cmd, "", &buffer);
    459   FreeIOBuffer (&buffer);
    460 
    461   /* the quit command does not return a prompt, so we always
    462      get an error on the controller here */
     463  status = ControllerCommand (cmd, NULL, &buffer);
     464  FreeIOBuffer (&buffer);
     465
     466  /* the quit command does not return a prompt,
     467     check that the controller exited */
    463468  StopController ();
    464469  return (TRUE);
  • trunk/Ohana/src/opihi/pantasks/LocalJob.c

    r11055 r11898  
    9797        exit (1);
    9898      }
    99       job[0].dtime = GetTaskTimer (job[0].start);
     99      job[0].dtime = GetTaskTimer (job[0].start, FALSE);
    100100      break;
    101101  }
  • trunk/Ohana/src/opihi/pantasks/TaskOps.c

    r11324 r11898  
    547547/*** task timer functions ***/
    548548
    549 double GetTaskTimer (struct timeval start) {
     549double GetTaskTimer (struct timeval start, int verbose) {
    550550
    551551  double dtime;
     
    555555  dtime = DTIME (now, start);
    556556 
     557  if (verbose) {
     558      fprintf (stderr, "tt: %d %6d  - %d %6d : %f\n",
     559               now.tv_sec, now.tv_usec,
     560               start.tv_sec, start.tv_usec, dtime);
     561  }
     562
    557563  return (dtime);
    558564}
  • trunk/Ohana/src/opihi/pantasks/controller_threads.c

    r11084 r11898  
    2828    CheckControllerOutput ();
    2929    SerialThreadUnlock ();
    30     usleep (10000); // allow other threads a chance to run
     30    if (VerboseMode() == 2) fprintf (stderr, "C");
     31    // fprintf (stderr, "**** C ****");
     32    usleep (500000); // allow other threads a chance to run
    3133  }
    3234}
  • trunk/Ohana/src/opihi/pantasks/input_threads.c

    r11084 r11898  
    2727    CheckInputs ();
    2828    SerialThreadUnlock ();
    29     fprintf (stderr, "I");
    30     usleep (10000); // allow other threads a chance to run
     29    if (VerboseMode() == 2) fprintf (stderr, "I");
     30    // usleep (10000); // allow other threads a chance to run
    3131  }
    3232}
  • trunk/Ohana/src/opihi/pantasks/job_threads.c

    r11084 r11898  
    2727    CheckJobs ();
    2828    SerialThreadUnlock ();
    29     usleep (10000); // allow other threads a chance to run
     29    if (VerboseMode() == 2) fprintf (stderr, "J");
     30    // fprintf (stderr, "J");
     31    // usleep (10000); // allow other threads a chance to run
    3032  }
    3133}
  • trunk/Ohana/src/opihi/pantasks/task_threads.c

    r11084 r11898  
    2727    CheckTasks ();
    2828    SerialThreadUnlock ();
    29     usleep (10000); // allow other threads a chance to run
     29    if (VerboseMode() == 2) fprintf (stderr, "T");
     30    // fprintf (stderr, "T");
     31    // usleep (1000); // allow other threads a chance to run
    3032  }
    3133}
  • trunk/Ohana/src/opihi/pantasks/test/sleep.sh

    r11318 r11898  
    66
    77  periods      -poll 0.1
    8   periods      -exec 1.0
     8  periods      -exec 0.2
    99  periods      -timeout 20
    10   npending 2
     10  npending 5
    1111 
    1212  stdout tmp.txt
    1313  stderr tmp.txt
     14
     15  task.exec
     16    echo "create command"
     17  end
    1418
    1519  # success
  • trunk/Ohana/src/opihi/pantasks/verbose.c

    r7917 r11898  
    2323      return (TRUE);
    2424    }
     25    if (!strcasecmp (argv[1], "THREADS")) {
     26      VERBOSE = 2;
     27      return (TRUE);
     28    }
    2529    if (!strcasecmp (argv[1], "TOGGLE")) {
    2630      VERBOSE = ~VERBOSE;
     
    2933  }
    3034
    31   gprint (GP_ERR, "USAGE: verbose (on/off/toggle)\n");
     35  gprint (GP_ERR, "USAGE: verbose (on/off/threads/toggle)\n");
    3236  return (FALSE);
    3337}
  • trunk/Ohana/src/opihi/pcontrol/CheckSystem.c

    r10693 r11898  
    8888
    8989    if ((RunLevel == PCONTROL_RUN_ALL) || (RunLevel == PCONTROL_RUN_REAP)) {
    90       Njobchecks  += CheckBusyJobs(0.020);  /* get job status */
    91       Njobchecks  += CheckDoneJobs(0.020);  /* harvest job stdout/stderr */
    92       Njobchecks  += CheckKillJobs(0.020);  /* harvest job stdout/stderr */
     90      Njobchecks  += CheckBusyJobs(0.020);  /* get job status (PCLIENT) */
     91      TestCheckPoint ();
     92      Njobchecks  += CheckDoneJobs(0.020);  /* harvest job stdout/stderr (!PCLIENT) */
     93      TestCheckPoint ();
     94      Njobchecks  += CheckKillJobs(0.020);  /* harvest job stdout/stderr (PCLIENT) */
    9395      TestCheckPoint ();
    9496    }
     
    9698    if (RunLevel != PCONTROL_RUN_NONE) {
    9799      Nhostchecks += CheckDoneHosts(0.020); /* reset the host */
     100      TestCheckPoint ();
    98101      Nhostchecks += CheckDownHosts(0.100); /* launch the host */
    99102      TestCheckPoint ();
     
    102105    if (RunLevel == PCONTROL_RUN_ALL) {
    103106      // we want to give each block a maximum allowed time
    104       Nhostchecks += CheckIdleHosts(0.020); /* submit a new job */
     107      Nhostchecks += CheckIdleHosts(0.020); /* submit a new job (PCLIENT) */
    105108      TestCheckPoint ();
    106109    }
Note: See TracChangeset for help on using the changeset viewer.