worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
authorwenzelm
Wed, 04 Nov 2009 00:29:58 +0100
changeset 334281427333220bc
parent 33427 1ddcb8472bd2
child 33429 a69ddd7dce95
worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
worker_start: back to non-self version;
scheduler: status output based on ticks;
src/Pure/Concurrent/future.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Tue Nov 03 19:52:09 2009 +0100
     1.2 +++ b/src/Pure/Concurrent/future.ML	Wed Nov 04 00:29:58 2009 +0100
     1.3 @@ -205,58 +205,63 @@
     1.4      val _ = active := true;
     1.5    in () end;
     1.6  
     1.7 -fun worker_next () = (*requires SYNCHRONIZED*)
     1.8 +fun worker_next has_work = (*requires SYNCHRONIZED*)
     1.9    if length (! workers) > ! max_workers then
    1.10      (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
    1.11       broadcast scheduler_event;
    1.12 +     if has_work then signal work_available else ();
    1.13       NONE)
    1.14    else if count_active () > ! max_active then
    1.15 -    (worker_wait scheduler_event; worker_next ())
    1.16 +    (if has_work then signal work_available else ();
    1.17 +     worker_wait scheduler_event;
    1.18 +     worker_next false)
    1.19    else
    1.20      (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
    1.21 -      NONE => (worker_wait work_available; worker_next ())
    1.22 +      NONE => (worker_wait work_available; worker_next true)
    1.23      | some => some);
    1.24  
    1.25  fun worker_loop name =
    1.26 -  (case SYNCHRONIZED name (fn () => worker_next ()) of
    1.27 +  (case SYNCHRONIZED name (fn () => worker_next false) of
    1.28      NONE => ()
    1.29    | SOME work => (execute name work; worker_loop name));
    1.30  
    1.31 -fun worker_start name =
    1.32 -  SimpleThread.fork false (fn () =>
    1.33 -   (SYNCHRONIZED name (fn () =>
    1.34 -      Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true)));
    1.35 -    broadcast scheduler_event;
    1.36 -    worker_loop name));
    1.37 +fun worker_start name = (*requires SYNCHRONIZED*)
    1.38 +  Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
    1.39 +    Unsynchronized.ref true));
    1.40  
    1.41  
    1.42  (* scheduler *)
    1.43  
    1.44 -val last_status = Unsynchronized.ref Time.zeroTime;
    1.45 -val next_status = Time.fromMilliseconds 500;
    1.46 +val status_ticks = Unsynchronized.ref 0;
    1.47 +
    1.48 +val last_round = Unsynchronized.ref Time.zeroTime;
    1.49  val next_round = Time.fromMilliseconds 50;
    1.50  
    1.51  fun scheduler_next () = (*requires SYNCHRONIZED*)
    1.52    let
    1.53 +    val now = Time.now ();
    1.54 +    val tick = Time.<= (Time.+ (! last_round, next_round), now);
    1.55 +    val _ = if tick then last_round := now else ();
    1.56 +
    1.57      (*queue and worker status*)
    1.58      val _ =
    1.59 -      let val now = Time.now () in
    1.60 -        if Time.> (Time.+ (! last_status, next_status), now) then ()
    1.61 -        else
    1.62 -         (last_status := now; Multithreading.tracing 1 (fn () =>
    1.63 -            let
    1.64 -              val {ready, pending, running} = Task_Queue.status (! queue);
    1.65 -              val total = length (! workers);
    1.66 -              val active = count_active ();
    1.67 -            in
    1.68 -              "SCHEDULE " ^ Time.toString now ^ ": " ^
    1.69 -                string_of_int ready ^ " ready, " ^
    1.70 -                string_of_int pending ^ " pending, " ^
    1.71 -                string_of_int running ^ " running; " ^
    1.72 -                string_of_int total ^ " workers, " ^
    1.73 -                string_of_int active ^ " active"
    1.74 -            end))
    1.75 -      end;
    1.76 +      if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
    1.77 +    val _ =
    1.78 +      if tick andalso ! status_ticks = 0 then
    1.79 +        Multithreading.tracing 1 (fn () =>
    1.80 +          let
    1.81 +            val {ready, pending, running} = Task_Queue.status (! queue);
    1.82 +            val total = length (! workers);
    1.83 +            val active = count_active ();
    1.84 +          in
    1.85 +            "SCHEDULE " ^ Time.toString now ^ ": " ^
    1.86 +              string_of_int ready ^ " ready, " ^
    1.87 +              string_of_int pending ^ " pending, " ^
    1.88 +              string_of_int running ^ " running; " ^
    1.89 +              string_of_int total ^ " workers, " ^
    1.90 +              string_of_int active ^ " active "
    1.91 +          end)
    1.92 +      else ();
    1.93  
    1.94      (*worker threads*)
    1.95      val _ =
    1.96 @@ -274,11 +279,12 @@
    1.97      val mm = if m = 9999 then 1 else m * 2;
    1.98      val _ = max_workers := mm;
    1.99  
   1.100 -    val l = length (! workers);
   1.101 +    val missing = ! max_workers - length (! workers);
   1.102      val _ =
   1.103 -      if mm > l then
   1.104 -        funpow (mm - l) (fn () =>
   1.105 -          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   1.106 +      if missing > 0 then
   1.107 +       (funpow missing (fn () =>
   1.108 +          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ();
   1.109 +        broadcast scheduler_event)
   1.110        else ();
   1.111  
   1.112      (*canceled groups*)