more specific conditions: scheduler_event, work_available, work_finished -- considerably reduces overhead with many threads;
more specific signal vs. broadcast;
execute/finish: more careful notification based on minimal/maximal status;
tuned shutdown;
1 (* Title: Pure/Concurrent/future.ML
8 * Futures are similar to delayed evaluation, i.e. delay/force is
9 generalized to fork/join (and variants). The idea is to model
10 parallel value-oriented computations, but *not* communicating
13 * Futures are grouped; failure of one group member causes the whole
14 group to be interrupted eventually.
16 * Forked futures are evaluated spontaneously by a farm of worker
17 threads in the background; join resynchronizes the computation and
18 delivers results (values or exceptions).
20 * The pool of worker threads is limited, usually in correlation with
21 the number of physical cores on the machine. Note that allocation
22 of runtime resources is distorted either if workers yield CPU time
23 (e.g. via system sleep or wait operations), or if non-worker
24 threads contend for significant runtime resources independently.
29 val enabled: unit -> bool
30 type task = Task_Queue.task
31 type group = Task_Queue.group
32 val is_worker: unit -> bool
33 val worker_group: unit -> Task_Queue.group option
35 val task_of: 'a future -> task
36 val group_of: 'a future -> group
37 val peek: 'a future -> 'a Exn.result option
38 val is_finished: 'a future -> bool
39 val value: 'a -> 'a future
40 val fork: (unit -> 'a) -> 'a future
41 val fork_group: group -> (unit -> 'a) -> 'a future
42 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
43 val fork_pri: int -> (unit -> 'a) -> 'a future
44 val join_results: 'a future list -> 'a Exn.result list
45 val join_result: 'a future -> 'a Exn.result
46 val join: 'a future -> 'a
47 val map: ('a -> 'b) -> 'a future -> 'b future
48 val interruptible_task: ('a -> 'b) -> 'a -> 'b
49 val interrupt_task: string -> unit
50 val cancel_group: group -> unit
51 val cancel: 'a future -> unit
52 val shutdown: unit -> unit
55 structure Future: FUTURE =
61 Multithreading.enabled () andalso
62 not (Multithreading.self_critical ());
(*local abbreviations for the task and group types of Task_Queue*)
67 type task = Task_Queue.task;
68 type group = Task_Queue.group;
(*key for the thread-local slot read by thread_data: an optional
  (string, task, group) triple; execute installs (name, task, group) there*)
71 val tag = Universal.tag () : (string * task * group) option Universal.tag;
(*data of the current thread as stored under tag; NONE is supplied as the
  default for the case that Thread.getLocal yields nothing*)
73 fun thread_data () = the_default NONE (Thread.getLocal tag);
(*apply f to x through Library.setmp_thread_data, passing the current thread
  data as the old value and SOME data as the new one; presumably the old
  value is restored afterwards -- confirm against Library.setmp_thread_data*)
74 fun setmp_thread_data data f x =
75 Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
(*true iff the current thread carries thread data, i.e. thread_data () is SOME*)
78 val is_worker = is_some o thread_data;
(*group component (third field) of the current thread data, if any*)
79 val worker_group = Option.map #3 o thread_data;
84 datatype 'a future = Future of
87 result: 'a Exn.result option ref};
(*task component of a future*)
89 fun task_of (Future {task = t, ...}) = t;
(*group component of a future*)
90 fun group_of (Future {group = g, ...}) = g;
(*current content of the result cell of a future, read without waiting*)
92 fun peek (Future {result = cell, ...}) = ! cell;
(*true iff peek already yields some result*)
93 fun is_finished x = (case peek x of SOME _ => true | NONE => false);
96 {task = Task_Queue.new_task 0,
97 group = Task_Queue.new_group NONE,
98 result = ref (SOME (Exn.Result x))};
(*the shared task queue; updated through Task_Queue operations*)
106 val queue = ref Task_Queue.empty;
(*worker threads, each paired with the activity flag counted by count_active*)
108 val workers = ref ([]: (Thread.thread * bool) list);
(*the scheduler thread, if any; set by scheduler_check, reset to NONE by
  scheduler_next when it decides to stop*)
109 val scheduler = ref (NONE: Thread.thread option);
(*assigned by scheduler_next as the number of workers minus 3/2 of the thread
  limit; worker_next tests it for > 0*)
110 val excessive = ref 0;
(*groups recorded by do_cancel; scheduler_next drops those for which
  Task_Queue.cancel returns true*)
111 val canceled = ref ([]: Task_Queue.group list);
(*shutdown flag consulted by scheduler_next and scheduler_check; reset to
  false when scheduler_check starts a scheduler thread*)
112 val do_shutdown = ref false;
115 (* synchronization *)
(*condition on which the scheduler, overloaded workers and shutdown wait;
  broadcast e.g. when a worker changes its activity flag*)
117 val scheduler_event = ConditionVar.conditionVar ();
(*condition on which workers wait when the queue yields no work*)
118 val work_available = ConditionVar.conditionVar ();
(*broadcast by execute once a task has been finished; join_wait waits on it*)
119 val work_finished = ConditionVar.conditionVar ();
(*the one mutex behind SYNCHRONIZED and the condition variable waits below*)
122 val lock = Mutex.mutex ();
(*run a function through SimpleThread.synchronized with the global lock;
  name identifies the call site (presumably for tracing -- confirm)*)
125 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
(*block on cond, with the global lock as the associated mutex*)
127 fun wait cond = (*requires SYNCHRONIZED*)
128 ConditionVar.wait (cond, lock);
(*block on cond until at most Time.now () + timeout; the result of
  ConditionVar.waitUntil is discarded*)
130 fun wait_timeout cond timeout = (*requires SYNCHRONIZED*)
131 ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));
(*ConditionVar.signal on cond: notifies a single waiting thread, if any*)
133 fun signal cond = (*requires SYNCHRONIZED*)
134 ConditionVar.signal cond;
(*ConditionVar.broadcast on cond: notifies all threads waiting on it*)
136 fun broadcast cond = (*requires SYNCHRONIZED*)
137 ConditionVar.broadcast cond;
142 (* worker activity *)
(*number of entries in ws whose activity flag is true*)
144 fun count_active ws =
145 length (List.filter (fn (_, busy) => busy) ws);
147 fun trace_active () = Multithreading.tracing 6 (fn () =>
150 val m = string_of_int (length ws);
151 val n = string_of_int (count_active ws);
152 in "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active" end);
(*set the activity flag of the current thread in workers to the given value;
  the entry is keyed by Thread.self () and compared with Thread.equal*)
154 fun change_active active = (*requires SYNCHRONIZED*)
155 change workers (AList.update Thread.equal (Thread.self (), active));
158 count_active (! workers) > Multithreading.max_threads_value ();
161 (* execute future jobs *)
163 fun future_job group (e: unit -> 'a) =
165 val result = ref (NONE: 'a Exn.result option);
171 (Multithreading.with_attributes Multithreading.restricted_interrupts
172 (fn _ => fn () => e ())) ()
173 else Exn.Exn Exn.Interrupt;
174 val _ = result := SOME res;
177 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
178 | Exn.Result _ => true)
180 in (result, job) end;
(*record group in canceled; insert is given Task_Queue.eq_group as the
  equality on groups (presumably avoiding duplicate entries -- confirm)*)
182 fun do_cancel group = (*requires SYNCHRONIZED*)
183 change canceled (insert Task_Queue.eq_group group);
185 fun execute name (task, group, jobs) =
187 val _ = trace_active ();
188 val valid = not (Task_Queue.is_canceled group);
189 val ok = setmp_thread_data (name, task, group) (fn () =>
190 fold (fn job => fn ok => job valid andalso ok) jobs true) ();
191 val _ = SYNCHRONIZED "execute" (fn () =>
193 val maximal = change_result queue (Task_Queue.finish task);
196 else if Task_Queue.cancel (! queue) group then ()
197 else do_cancel group;
198 val _ = broadcast work_finished;
199 val _ = if maximal then () else broadcast work_available;
206 fun worker_wait cond = (*requires SYNCHRONIZED*)
207 (change_active false; broadcast scheduler_event;
209 change_active true; broadcast scheduler_event);
211 fun worker_next () = (*requires SYNCHRONIZED*)
212 if ! excessive > 0 then
214 change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
215 broadcast scheduler_event;
217 else if overloaded () then (worker_wait scheduler_event; worker_next ())
219 (case change_result queue Task_Queue.dequeue of
220 NONE => (worker_wait work_available; worker_next ())
223 fun worker_loop name =
224 (case SYNCHRONIZED name (fn () => worker_next ()) of
226 | SOME work => (execute name work; worker_loop name));
(*fork a new thread running worker_loop name and register it at the head of
  workers, with its activity flag initially true*)
228 fun worker_start name = (*requires SYNCHRONIZED*)
229 change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true));
234 fun scheduler_next () = (*requires SYNCHRONIZED*)
237 val _ = Multithreading.tracing 6 (fn () =>
238 let val {ready, pending, running} = Task_Queue.status (! queue) in
240 string_of_int ready ^ " ready, " ^
241 string_of_int pending ^ " pending, " ^
242 string_of_int running ^ " running"
247 if forall (Thread.isActive o #1) (! workers) then ()
249 (case List.partition (Thread.isActive o #1) (! workers) of
251 | (active, inactive) =>
252 (workers := active; Multithreading.tracing 0 (fn () =>
253 "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
254 val _ = trace_active ();
256 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
257 val mm = (m * 3) div 2;
258 val l = length (! workers);
259 val _ = excessive := l - mm;
262 (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
263 broadcast scheduler_event)
267 val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
270 Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
271 val _ = interruptible (fn () => wait_timeout scheduler_event timeout) ()
272 handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
275 val continue = not (! do_shutdown andalso null (! workers));
276 val _ = if continue then () else scheduler := NONE;
277 val _ = broadcast scheduler_event;
(*repeat scheduler_next, each round under the lock, until it returns false*)
280 fun scheduler_loop () =
281 if SYNCHRONIZED "scheduler" scheduler_next then scheduler_loop () else ();
(*true iff a scheduler thread is registered and Thread.isActive holds for it*)
283 fun scheduler_active () = (*requires SYNCHRONIZED*)
284 Option.getOpt (Option.map Thread.isActive (! scheduler), false);
286 fun scheduler_check name = SYNCHRONIZED name (fn () =>
287 if not (scheduler_active ()) then
288 (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop);
289 broadcast scheduler_event)
290 else if ! do_shutdown then error "Scheduler shutdown in progress"
299 fun fork_future opt_group deps pri e =
301 val _ = scheduler_check "future check";
306 | NONE => Task_Queue.new_group (worker_group ()));
307 val (result, job) = future_job group e;
308 val task = SYNCHRONIZED "future" (fn () =>
310 val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
311 val _ = if minimal then signal work_available else ();
313 in Future {task = task, group = group, result = result} end;
(*fork e with no explicit group, no dependencies and priority 0*)
315 fun fork e = fork_future NONE [] 0 e;
(*fork e within the given group, with no dependencies and priority 0*)
316 fun fork_group group e = fork_future (SOME group) [] 0 e;
(*fork e with the tasks of the given futures as dependencies; no explicit
  group, priority 0*)
317 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
(*fork e with priority pri; no explicit group, no dependencies*)
318 fun fork_pri pri e = fork_future NONE [] pri e;
327 NONE => Exn.Exn (SYS_ERROR "unfinished future")
328 | SOME (Exn.Exn Exn.Interrupt) =>
329 Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
(*next piece of work for a joining thread: as long as overloaded () holds,
  block via worker_wait on scheduler_event and retry; otherwise take work
  from the queue via Task_Queue.dequeue_towards deps*)
332 fun join_next deps = (*requires SYNCHRONIZED*)
333 if overloaded () then (worker_wait scheduler_event; join_next deps)
334 else change_result queue (Task_Queue.dequeue_towards deps);
337 (case SYNCHRONIZED "join" (fn () => join_next deps) of
339 | SOME (work, deps') => (execute "join" work; join_deps deps'));
343 fun join_results xs =
344 if forall is_finished xs then map get_result xs
345 else uninterruptible (fn _ => fn () =>
347 val _ = scheduler_check "join check";
348 val _ = Multithreading.self_critical () andalso
349 error "Cannot join future values within critical section";
351 val worker = is_worker ();
352 val _ = if worker then join_deps (map task_of xs) else ();
355 if SYNCHRONIZED "join_wait" (fn () =>
356 is_finished x orelse ((if worker then worker_wait else wait) work_finished; false))
357 then () else join_wait x;
359 val _ = xs |> List.app (fn x =>
360 let val time = Multithreading.real_time join_wait x in
361 Multithreading.tracing_time true time
362 (fn () => "joined after " ^ Time.toString time)
364 in map get_result xs end) ();
(*result of a single future, obtained from join_results via singleton*)
368 fun join_result x = singleton join_results x;
(*join_result followed by Exn.release; presumably yields the value or
  re-raises the stored exception -- confirm against Exn.release*)
369 fun join x = Exn.release (join_result x);
376 val _ = scheduler_check "map_future check";
378 val task = task_of x;
379 val group = Task_Queue.new_group (SOME (group_of x));
380 val (result, job) = future_job group (fn () => f (join x));
382 val extended = SYNCHRONIZED "map_future" (fn () =>
383 (case Task_Queue.extend task job (! queue) of
384 SOME queue' => (queue := queue'; true)
387 if extended then Future {task = task, group = group, result = result}
388 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
394 fun interruptible_task f x =
395 if Multithreading.available then
396 Multithreading.with_attributes
398 then Multithreading.restricted_interrupts
399 else Multithreading.regular_interrupts)
401 else interruptible f x;
403 (*interrupt: permissive signal, may get ignored*)
(*applies Task_Queue.interrupt_external to the current queue and id while
  holding the global lock*)
404 fun interrupt_task id = SYNCHRONIZED "interrupt"
405 (fn () => Task_Queue.interrupt_external (! queue) id);
407 (*cancel: present and future group members will be interrupted eventually*)
(*scheduler_check first makes sure a scheduler thread is running; then, under
  the lock, the group is recorded via do_cancel and scheduler_event is
  broadcast, which is the condition the scheduler waits on*)
408 fun cancel_group group =
409 (scheduler_check "cancel check";
410 SYNCHRONIZED "cancel" (fn () => (do_cancel group; broadcast scheduler_event)));
(*cancel the group that the given future belongs to*)
412 fun cancel x = cancel_group (group_of x);
415 (** global join and shutdown **)
418 if Multithreading.available then
419 (scheduler_check "shutdown check";
420 SYNCHRONIZED "shutdown" (fn () =>
421 (while not (scheduler_active ()) do wait scheduler_event;
422 while not (Task_Queue.is_empty (! queue)) do wait scheduler_event;
424 while scheduler_active () do
425 (broadcast work_available;
426 broadcast scheduler_event;
427 wait scheduler_event))))
431 (*final declarations of this structure!*)
(*map is bound to map_future only at this point, so earlier uses of map in
  this structure (e.g. in fork_deps) are unaffected by this binding*)
432 val map = map_future;
(*top-level abbreviation for the future type of structure Future*)
436 type 'a future = 'a Future.future;