worker_next: ensure that work_available is passed on before sleeping (was occasionally lost when worker configuration changed, causing scheduler deadlock);
worker_start: back to non-self version;
scheduler: status output based on ticks;
1.1 --- a/src/Pure/Concurrent/future.ML Tue Nov 03 19:52:09 2009 +0100
1.2 +++ b/src/Pure/Concurrent/future.ML Wed Nov 04 00:29:58 2009 +0100
1.3 @@ -205,58 +205,63 @@
1.4 val _ = active := true;
1.5 in () end;
1.6
1.7 -fun worker_next () = (*requires SYNCHRONIZED*)
1.8 +fun worker_next has_work = (*requires SYNCHRONIZED*)
1.9 if length (! workers) > ! max_workers then
1.10 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
1.11 broadcast scheduler_event;
1.12 + if has_work then signal work_available else ();
1.13 NONE)
1.14 else if count_active () > ! max_active then
1.15 - (worker_wait scheduler_event; worker_next ())
1.16 + (if has_work then signal work_available else ();
1.17 + worker_wait scheduler_event;
1.18 + worker_next false)
1.19 else
1.20 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
1.21 - NONE => (worker_wait work_available; worker_next ())
1.22 + NONE => (worker_wait work_available; worker_next true)
1.23 | some => some);
1.24
1.25 fun worker_loop name =
1.26 - (case SYNCHRONIZED name (fn () => worker_next ()) of
1.27 + (case SYNCHRONIZED name (fn () => worker_next false) of
1.28 NONE => ()
1.29 | SOME work => (execute name work; worker_loop name));
1.30
1.31 -fun worker_start name =
1.32 - SimpleThread.fork false (fn () =>
1.33 - (SYNCHRONIZED name (fn () =>
1.34 - Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true)));
1.35 - broadcast scheduler_event;
1.36 - worker_loop name));
1.37 +fun worker_start name = (*requires SYNCHRONIZED*)
1.38 + Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
1.39 + Unsynchronized.ref true));
1.40
1.41
1.42 (* scheduler *)
1.43
1.44 -val last_status = Unsynchronized.ref Time.zeroTime;
1.45 -val next_status = Time.fromMilliseconds 500;
1.46 +val status_ticks = Unsynchronized.ref 0;
1.47 +
1.48 +val last_round = Unsynchronized.ref Time.zeroTime;
1.49 val next_round = Time.fromMilliseconds 50;
1.50
1.51 fun scheduler_next () = (*requires SYNCHRONIZED*)
1.52 let
1.53 + val now = Time.now ();
1.54 + val tick = Time.<= (Time.+ (! last_round, next_round), now);
1.55 + val _ = if tick then last_round := now else ();
1.56 +
1.57 (*queue and worker status*)
1.58 val _ =
1.59 - let val now = Time.now () in
1.60 - if Time.> (Time.+ (! last_status, next_status), now) then ()
1.61 - else
1.62 - (last_status := now; Multithreading.tracing 1 (fn () =>
1.63 - let
1.64 - val {ready, pending, running} = Task_Queue.status (! queue);
1.65 - val total = length (! workers);
1.66 - val active = count_active ();
1.67 - in
1.68 - "SCHEDULE " ^ Time.toString now ^ ": " ^
1.69 - string_of_int ready ^ " ready, " ^
1.70 - string_of_int pending ^ " pending, " ^
1.71 - string_of_int running ^ " running; " ^
1.72 - string_of_int total ^ " workers, " ^
1.73 - string_of_int active ^ " active"
1.74 - end))
1.75 - end;
1.76 + if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
1.77 + val _ =
1.78 + if tick andalso ! status_ticks = 0 then
1.79 + Multithreading.tracing 1 (fn () =>
1.80 + let
1.81 + val {ready, pending, running} = Task_Queue.status (! queue);
1.82 + val total = length (! workers);
1.83 + val active = count_active ();
1.84 + in
1.85 + "SCHEDULE " ^ Time.toString now ^ ": " ^
1.86 + string_of_int ready ^ " ready, " ^
1.87 + string_of_int pending ^ " pending, " ^
1.88 + string_of_int running ^ " running; " ^
1.89 + string_of_int total ^ " workers, " ^
1.90 + string_of_int active ^ " active "
1.91 + end)
1.92 + else ();
1.93
1.94 (*worker threads*)
1.95 val _ =
1.96 @@ -274,11 +279,12 @@
1.97 val mm = if m = 9999 then 1 else m * 2;
1.98 val _ = max_workers := mm;
1.99
1.100 - val l = length (! workers);
1.101 + val missing = ! max_workers - length (! workers);
1.102 val _ =
1.103 - if mm > l then
1.104 - funpow (mm - l) (fn () =>
1.105 - ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
1.106 + if missing > 0 then
1.107 + (funpow missing (fn () =>
1.108 + ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ();
1.109 + broadcast scheduler_event)
1.110 else ();
1.111
1.112 (*canceled groups*)