1.1 --- a/src/Pure/Concurrent/future.ML Mon Jul 27 12:00:02 2009 +0200
1.2 +++ b/src/Pure/Concurrent/future.ML Mon Jul 27 12:11:18 2009 +0200
1.3 @@ -114,20 +114,26 @@
1.4
1.5 (* synchronization *)
1.6
1.7 +val scheduler_event = ConditionVar.conditionVar ();
1.8 +val work_available = ConditionVar.conditionVar ();
1.9 +val work_finished = ConditionVar.conditionVar ();
1.10 +
1.11 local
1.12 val lock = Mutex.mutex ();
1.13 - val cond = ConditionVar.conditionVar ();
1.14 in
1.15
1.16 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
1.17
1.18 -fun wait () = (*requires SYNCHRONIZED*)
1.19 +fun wait cond = (*requires SYNCHRONIZED*)
1.20 ConditionVar.wait (cond, lock);
1.21
1.22 -fun wait_timeout timeout = (*requires SYNCHRONIZED*)
1.23 +fun wait_timeout cond timeout = (*requires SYNCHRONIZED*)
1.24 ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));
1.25
1.26 -fun notify_all () = (*requires SYNCHRONIZED*)
1.27 +fun signal cond = (*requires SYNCHRONIZED*)
1.28 + ConditionVar.signal cond;
1.29 +
1.30 +fun broadcast cond = (*requires SYNCHRONIZED*)
1.31 ConditionVar.broadcast cond;
1.32
1.33 end;
1.34 @@ -183,29 +189,35 @@
1.35 val ok = setmp_thread_data (name, task, group) (fn () =>
1.36 fold (fn job => fn ok => job valid andalso ok) jobs true) ();
1.37 val _ = SYNCHRONIZED "execute" (fn () =>
1.38 - (change queue (Task_Queue.finish task);
1.39 - if ok then ()
1.40 - else if Task_Queue.cancel (! queue) group then ()
1.41 - else do_cancel group;
1.42 - notify_all ()));
1.43 + let
1.44 + val maximal = change_result queue (Task_Queue.finish task);
1.45 + val _ =
1.46 + if ok then ()
1.47 + else if Task_Queue.cancel (! queue) group then ()
1.48 + else do_cancel group;
1.49 + val _ = broadcast work_finished;
1.50 + val _ = if maximal then () else broadcast work_available;
1.51 + in () end);
1.52 in () end;
1.53
1.54
1.55 (* worker threads *)
1.56
1.57 -fun worker_wait () = (*requires SYNCHRONIZED*)
1.58 - (change_active false; wait (); change_active true);
1.59 +fun worker_wait cond = (*requires SYNCHRONIZED*)
1.60 + (change_active false; broadcast scheduler_event;
1.61 + wait cond;
1.62 + change_active true; broadcast scheduler_event);
1.63
1.64 fun worker_next () = (*requires SYNCHRONIZED*)
1.65 if ! excessive > 0 then
1.66 (dec excessive;
1.67 change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
1.68 - notify_all ();
1.69 + broadcast scheduler_event;
1.70 NONE)
1.71 - else if overloaded () then (worker_wait (); worker_next ())
1.72 + else if overloaded () then (worker_wait scheduler_event; worker_next ())
1.73 else
1.74 (case change_result queue Task_Queue.dequeue of
1.75 - NONE => (worker_wait (); worker_next ())
1.76 + NONE => (worker_wait work_available; worker_next ())
1.77 | some => some);
1.78
1.79 fun worker_loop name =
1.80 @@ -231,11 +243,10 @@
1.81 end);
1.82
1.83 (*worker threads*)
1.84 - val ws = ! workers;
1.85 val _ =
1.86 - if forall (Thread.isActive o #1) ws then ()
1.87 + if forall (Thread.isActive o #1) (! workers) then ()
1.88 else
1.89 - (case List.partition (Thread.isActive o #1) ws of
1.90 + (case List.partition (Thread.isActive o #1) (! workers) of
1.91 (_, []) => ()
1.92 | (active, inactive) =>
1.93 (workers := active; Multithreading.tracing 0 (fn () =>
1.94 @@ -244,24 +255,26 @@
1.95
1.96 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
1.97 val mm = (m * 3) div 2;
1.98 - val l = length ws;
1.99 + val l = length (! workers);
1.100 val _ = excessive := l - mm;
1.101 val _ =
1.102 if mm > l then
1.103 - funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
1.104 + (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
1.105 + broadcast scheduler_event)
1.106 else ();
1.107
1.108 (*canceled groups*)
1.109 val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
1.110
1.111 + val timeout =
1.112 + Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
1.113 + val _ = interruptible (fn () => wait_timeout scheduler_event timeout) ()
1.114 + handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
1.115 +
1.116 (*shutdown*)
1.117 - val continue = not (! do_shutdown andalso null ws);
1.118 + val continue = not (! do_shutdown andalso null (! workers));
1.119 val _ = if continue then () else scheduler := NONE;
1.120 -
1.121 - val _ = notify_all ();
1.122 - val _ = interruptible (fn () =>
1.123 - wait_timeout (Time.fromMilliseconds (if null (! canceled) then 1000 else 50))) ()
1.124 - handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
1.125 + val _ = broadcast scheduler_event;
1.126 in continue end;
1.127
1.128 fun scheduler_loop () =
1.129 @@ -272,7 +285,8 @@
1.130
1.131 fun scheduler_check name = SYNCHRONIZED name (fn () =>
1.132 if not (scheduler_active ()) then
1.133 - (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
1.134 + (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop);
1.135 + broadcast scheduler_event)
1.136 else if ! do_shutdown then error "Scheduler shutdown in progress"
1.137 else ());
1.138
1.139 @@ -292,7 +306,10 @@
1.140 | NONE => Task_Queue.new_group (worker_group ()));
1.141 val (result, job) = future_job group e;
1.142 val task = SYNCHRONIZED "future" (fn () =>
1.143 - change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
1.144 + let
1.145 + val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
1.146 + val _ = if minimal then signal work_available else ();
1.147 + in task end);
1.148 in Future {task = task, group = group, result = result} end;
1.149
1.150 fun fork e = fork_future NONE [] 0 e;
1.151 @@ -313,7 +330,7 @@
1.152 | SOME res => res);
1.153
1.154 fun join_next deps = (*requires SYNCHRONIZED*)
1.155 - if overloaded () then (worker_wait (); join_next deps)
1.156 + if overloaded () then (worker_wait scheduler_event; join_next deps)
1.157 else change_result queue (Task_Queue.dequeue_towards deps);
1.158
1.159 fun join_deps deps =
1.160 @@ -336,7 +353,7 @@
1.161
1.162 fun join_wait x =
1.163 if SYNCHRONIZED "join_wait" (fn () =>
1.164 - is_finished x orelse (if worker then worker_wait () else wait (); false))
1.165 + is_finished x orelse ((if worker then worker_wait else wait) work_finished; false))
1.166 then () else join_wait x;
1.167
1.168 val _ = xs |> List.app (fn x =>
1.169 @@ -390,7 +407,7 @@
1.170 (*cancel: present and future group members will be interrupted eventually*)
1.171 fun cancel_group group =
1.172 (scheduler_check "cancel check";
1.173 - SYNCHRONIZED "cancel" (fn () => (do_cancel group; notify_all ())));
1.174 + SYNCHRONIZED "cancel" (fn () => (do_cancel group; broadcast scheduler_event)));
1.175
1.176 fun cancel x = cancel_group (group_of x);
1.177
1.178 @@ -401,13 +418,13 @@
1.179 if Multithreading.available then
1.180 (scheduler_check "shutdown check";
1.181 SYNCHRONIZED "shutdown" (fn () =>
1.182 - (while not (scheduler_active ()) do wait ();
1.183 - while not (Task_Queue.is_empty (! queue)) do wait ();
1.184 + (while not (scheduler_active ()) do wait scheduler_event;
1.185 + while not (Task_Queue.is_empty (! queue)) do wait scheduler_event;
1.186 do_shutdown := true;
1.187 - notify_all ();
1.188 - while not (null (! workers)) do wait ();
1.189 - while scheduler_active () do wait ();
1.190 - OS.Process.sleep (Time.fromMilliseconds 300))))
1.191 + while scheduler_active () do
1.192 + (broadcast work_available;
1.193 + broadcast scheduler_event;
1.194 + wait scheduler_event))))
1.195 else ();
1.196
1.197