1.1 --- a/src/Pure/Concurrent/future.ML Wed Nov 04 11:37:06 2009 +0100
1.2 +++ b/src/Pure/Concurrent/future.ML Wed Nov 04 11:58:29 2009 +0100
1.3 @@ -99,18 +99,6 @@
1.4
1.5 (** scheduling **)
1.6
1.7 -(* global state *)
1.8 -
1.9 -val queue = Unsynchronized.ref Task_Queue.empty;
1.10 -val next = Unsynchronized.ref 0;
1.11 -val max_workers = Unsynchronized.ref 0;
1.12 -val max_active = Unsynchronized.ref 0;
1.13 -val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list);
1.14 -val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
1.15 -val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
1.16 -val do_shutdown = Unsynchronized.ref false;
1.17 -
1.18 -
1.19 (* synchronization *)
1.20
1.21 val scheduler_event = ConditionVar.conditionVar ();
1.22 @@ -142,6 +130,23 @@
1.23 end;
1.24
1.25
1.26 +(* global state *)
1.27 +
1.28 +val queue = Unsynchronized.ref Task_Queue.empty;
1.29 +val next = Unsynchronized.ref 0;
1.30 +val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
1.31 +val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
1.32 +val do_shutdown = Unsynchronized.ref false;
1.33 +val max_workers = Unsynchronized.ref 0;
1.34 +val max_active = Unsynchronized.ref 0;
1.35 +
1.36 +datatype worker_state = Working | Waiting | Sleeping;
1.37 +val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
1.38 +
1.39 +fun count_workers state = (*requires SYNCHRONIZED*)
1.40 + fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
1.41 +
1.42 +
1.43 (* execute future jobs *)
1.44
1.45 fun future_job group (e: unit -> 'a) =
1.46 @@ -184,25 +189,17 @@
1.47 in () end;
1.48
1.49
1.50 -(* worker activity *)
1.51 -
1.52 -fun count_active () = (*requires SYNCHRONIZED*)
1.53 - fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0;
1.54 -
1.55 -fun find_active () = (*requires SYNCHRONIZED*)
1.56 - (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
1.57 - SOME active => active
1.58 - | NONE => raise Fail "Unregistered worker thread");
1.59 -
1.60 -
1.61 (* worker threads *)
1.62
1.63 -fun worker_wait cond = (*requires SYNCHRONIZED*)
1.64 +fun worker_wait active cond = (*requires SYNCHRONIZED*)
1.65 let
1.66 - val active = find_active ();
1.67 - val _ = active := false;
1.68 + val state =
1.69 + (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
1.70 + SOME state => state
1.71 + | NONE => raise Fail "Unregistered worker thread");
1.72 + val _ = state := (if active then Waiting else Sleeping);
1.73 val _ = wait cond;
1.74 - val _ = active := true;
1.75 + val _ = state := Working;
1.76 in () end;
1.77
1.78 fun worker_next have_work = (*requires SYNCHRONIZED*)
1.79 @@ -211,13 +208,13 @@
1.80 if have_work then signal work_available else ();
1.81 broadcast scheduler_event;
1.82 NONE)
1.83 - else if count_active () > ! max_active then
1.84 + else if count_workers Working > ! max_active then
1.85 (if have_work then signal work_available else ();
1.86 - worker_wait scheduler_event;
1.87 + worker_wait false scheduler_event;
1.88 worker_next false)
1.89 else
1.90 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
1.91 - NONE => (worker_wait work_available; worker_next true)
1.92 + NONE => (worker_wait true work_available; worker_next true)
1.93 | some => some);
1.94
1.95 fun worker_loop name =
1.96 @@ -227,7 +224,7 @@
1.97
1.98 fun worker_start name = (*requires SYNCHRONIZED*)
1.99 Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
1.100 - Unsynchronized.ref true));
1.101 + Unsynchronized.ref Working));
1.102
1.103
1.104 (* scheduler *)
1.105 @@ -252,14 +249,16 @@
1.106 let
1.107 val {ready, pending, running} = Task_Queue.status (! queue);
1.108 val total = length (! workers);
1.109 - val active = count_active ();
1.110 + val active = count_workers Working;
1.111 + val waiting = count_workers Waiting;
1.112 in
1.113 "SCHEDULE " ^ Time.toString now ^ ": " ^
1.114 string_of_int ready ^ " ready, " ^
1.115 string_of_int pending ^ " pending, " ^
1.116 string_of_int running ^ " running; " ^
1.117 string_of_int total ^ " workers, " ^
1.118 - string_of_int active ^ " active "
1.119 + string_of_int active ^ " active, " ^
1.120 + string_of_int waiting ^ " waiting "
1.121 end)
1.122 else ();
1.123
1.124 @@ -373,7 +372,7 @@
1.125 else
1.126 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
1.127 (NONE, []) => NONE
1.128 - | (NONE, deps') => (worker_wait work_finished; join_next deps')
1.129 + | (NONE, deps') => (worker_wait true work_finished; join_next deps')
1.130 | (SOME work, deps') => SOME (work, deps'));
1.131
1.132 fun execute_work NONE = ()