eliminated old-style sys_error/SYS_ERROR in favour of exception Fail -- after careful checking that there is no overlap with existing handling of that;
tuned some error messages;
1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
9 * Futures are similar to delayed evaluation, i.e. delay/force is
10 generalized to fork/join (and variants). The idea is to model
11 parallel value-oriented computations, but *not* communicating
14 * Futures are grouped; failure of one group member causes the whole
15 group to be interrupted eventually. Groups are block-structured.
17 * Forked futures are evaluated spontaneously by a farm of worker
18 threads in the background; join resynchronizes the computation and
19 delivers results (values or exceptions).
21 * The pool of worker threads is limited, usually in correlation with
22 the number of physical cores on the machine. Note that allocation
23 of runtime resources is distorted either if workers yield CPU time
24 (e.g. via system sleep or wait operations), or if non-worker
25 threads contend for significant runtime resources independently.
27 * Promised futures are fulfilled by external means. There is no
28 associated evaluation task, but other futures can depend on them
34 type task = Task_Queue.task
35 type group = Task_Queue.group
36 val is_worker: unit -> bool
37 val worker_task: unit -> Task_Queue.task option
38 val worker_group: unit -> Task_Queue.group option
40 val task_of: 'a future -> task
41 val group_of: 'a future -> group
42 val peek: 'a future -> 'a Exn.result option
43 val is_finished: 'a future -> bool
44 val fork_group: group -> (unit -> 'a) -> 'a future
45 val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
46 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
47 val fork_pri: int -> (unit -> 'a) -> 'a future
48 val fork: (unit -> 'a) -> 'a future
49 val join_results: 'a future list -> 'a Exn.result list
50 val join_result: 'a future -> 'a Exn.result
51 val join: 'a future -> 'a
52 val value: 'a -> 'a future
53 val map: ('a -> 'b) -> 'a future -> 'b future
54 val promise_group: group -> 'a future
55 val promise: unit -> 'a future
56 val fulfill_result: 'a future -> 'a Exn.result -> unit
57 val fulfill: 'a future -> 'a -> unit
58 val interruptible_task: ('a -> 'b) -> 'a -> 'b
59 val cancel_group: group -> unit
60 val cancel: 'a future -> unit
61 val shutdown: unit -> unit
62 val report: (unit -> 'a) -> 'a
65 structure Future: FUTURE =
72 type task = Task_Queue.task;
73 type group = Task_Queue.group;
76 val tag = Universal.tag () : (task * group) option Universal.tag;
78 fun thread_data () = the_default NONE (Thread.getLocal tag);
79 fun setmp_thread_data data f x =
80 Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
83 val is_worker = is_some o thread_data;
84 val worker_task = Option.map #1 o thread_data;
85 val worker_group = Option.map #2 o thread_data;
87 fun new_group () = Task_Queue.new_group (worker_group ());
92 type 'a result = 'a Exn.result Single_Assignment.var;
94 datatype 'a future = Future of
100 fun task_of (Future {task, ...}) = task;
101 fun group_of (Future {group, ...}) = group;
102 fun result_of (Future {result, ...}) = result;
104 fun peek x = Single_Assignment.peek (result_of x);
105 fun is_finished x = is_some (peek x);
107 fun assign_result group result res =
109 val _ = Single_Assignment.assign result res;
112 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
113 | Exn.Result _ => true);
120 (* synchronization *)
122 val scheduler_event = ConditionVar.conditionVar ();
123 val work_available = ConditionVar.conditionVar ();
124 val work_finished = ConditionVar.conditionVar ();
127 val lock = Mutex.mutex ();
130 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
132 fun wait cond = (*requires SYNCHRONIZED*)
133 Multithreading.sync_wait NONE NONE cond lock;
135 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
136 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
138 fun signal cond = (*requires SYNCHRONIZED*)
139 ConditionVar.signal cond;
141 fun broadcast cond = (*requires SYNCHRONIZED*)
142 ConditionVar.broadcast cond;
144 fun broadcast_work () = (*requires SYNCHRONIZED*)
145 (ConditionVar.broadcast work_available;
146 ConditionVar.broadcast work_finished);
153 val queue = Unsynchronized.ref Task_Queue.empty;
154 val next = Unsynchronized.ref 0;
155 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
156 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
157 val do_shutdown = Unsynchronized.ref false;
158 val max_workers = Unsynchronized.ref 0;
159 val max_active = Unsynchronized.ref 0;
160 val worker_trend = Unsynchronized.ref 0;
162 datatype worker_state = Working | Waiting | Sleeping;
163 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
165 fun count_workers state = (*requires SYNCHRONIZED*)
166 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
169 (* execute future jobs *)
171 fun future_job group (e: unit -> 'a) =
173 val result = Single_Assignment.var "future" : 'a result;
174 val pos = Position.thread_data ();
179 Exn.capture (fn () =>
180 Multithreading.with_attributes Multithreading.private_interrupts
181 (fn _ => Position.setmp_thread_data pos e ())) ()
182 else Exn.Exn Exn.Interrupt;
183 in assign_result group result res end;
184 in (result, job) end;
186 fun cancel_now group = (*requires SYNCHRONIZED*)
187 Task_Queue.cancel (! queue) group;
189 fun cancel_later group = (*requires SYNCHRONIZED*)
190 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
191 broadcast scheduler_event);
193 fun execute (task, group, jobs) =
195 val valid = not (Task_Queue.is_canceled group);
196 val ok = setmp_thread_data (task, group) (fn () =>
197 fold (fn job => fn ok => job valid andalso ok) jobs true) ();
198 val _ = SYNCHRONIZED "finish" (fn () =>
200 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
203 else if cancel_now group then ()
204 else cancel_later group;
205 val _ = broadcast work_finished;
206 val _ = if maximal then () else signal work_available;
213 fun worker_wait active cond = (*requires SYNCHRONIZED*)
216 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
218 | NONE => raise Fail "Unregistered worker thread");
219 val _ = state := (if active then Waiting else Sleeping);
221 val _ = state := Working;
224 fun worker_next () = (*requires SYNCHRONIZED*)
225 if length (! workers) > ! max_workers then
226 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
227 signal work_available;
229 else if count_workers Working > ! max_active then
230 (worker_wait false work_available; worker_next ())
232 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
233 NONE => (worker_wait false work_available; worker_next ())
234 | some => (signal work_available; some));
236 fun worker_loop name =
237 (case SYNCHRONIZED name (fn () => worker_next ()) of
239 | SOME work => (execute work; worker_loop name));
241 fun worker_start name = (*requires SYNCHRONIZED*)
242 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
243 Unsynchronized.ref Working));
248 val status_ticks = Unsynchronized.ref 0;
250 val last_round = Unsynchronized.ref Time.zeroTime;
251 val next_round = Time.fromMilliseconds 50;
253 fun scheduler_next () = (*requires SYNCHRONIZED*)
255 val now = Time.now ();
256 val tick = Time.<= (Time.+ (! last_round, next_round), now);
257 val _ = if tick then last_round := now else ();
260 (* queue and worker status *)
263 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
265 if tick andalso ! status_ticks = 0 then
266 Multithreading.tracing 1 (fn () =>
268 val {ready, pending, running, passive} = Task_Queue.status (! queue);
269 val total = length (! workers);
270 val active = count_workers Working;
271 val waiting = count_workers Waiting;
273 "SCHEDULE " ^ Time.toString now ^ ": " ^
274 string_of_int ready ^ " ready, " ^
275 string_of_int pending ^ " pending, " ^
276 string_of_int running ^ " running, " ^
277 string_of_int passive ^ " passive; " ^
278 string_of_int total ^ " workers, " ^
279 string_of_int active ^ " active, " ^
280 string_of_int waiting ^ " waiting "
285 if forall (Thread.isActive o #1) (! workers) then ()
288 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
289 val _ = workers := alive;
291 Multithreading.tracing 0 (fn () =>
292 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
296 (* worker pool adjustments *)
298 val max_active0 = ! max_active;
299 val max_workers0 = ! max_workers;
301 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
302 val _ = max_active := m;
305 if ! do_shutdown then 0
306 else if m = 9999 then 1
307 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
309 if tick andalso mm > ! max_workers then
310 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
311 else if tick andalso mm < ! max_workers then
312 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
315 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
317 else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
318 max_workers := Int.min (mm, 2 * m)
321 val missing = ! max_workers - length (! workers);
324 funpow missing (fn () =>
325 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
329 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
330 else signal work_available;
333 (* canceled groups *)
336 if null (! canceled) then ()
338 (Multithreading.tracing 1 (fn () =>
339 string_of_int (length (! canceled)) ^ " canceled groups");
340 Unsynchronized.change canceled (filter_out cancel_now);
346 val _ = Exn.release (wait_timeout next_round scheduler_event);
351 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
352 val continue = not (! do_shutdown andalso null (! workers));
353 val _ = if continue then () else scheduler := NONE;
355 val _ = broadcast scheduler_event;
357 handle Exn.Interrupt =>
358 (Multithreading.tracing 1 (fn () => "Interrupt");
359 List.app cancel_later (Task_Queue.cancel_all (! queue));
360 broadcast_work (); true);
362 fun scheduler_loop () =
364 Multithreading.with_attributes
365 (Multithreading.sync_interrupts Multithreading.public_interrupts)
366 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
369 fun scheduler_active () = (*requires SYNCHRONIZED*)
370 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
372 fun scheduler_check () = (*requires SYNCHRONIZED*)
373 (do_shutdown := false;
374 if scheduler_active () then ()
375 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
383 fun fork_future opt_group deps pri e =
388 | SOME group => group);
389 val (result, job) = future_job group e;
390 val task = SYNCHRONIZED "enqueue" (fn () =>
392 val (task, minimal) =
393 Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
394 val _ = if minimal then signal work_available else ();
395 val _ = scheduler_check ();
397 in Future {promised = false, task = task, group = group, result = result} end;
399 fun fork_group group e = fork_future (SOME group) [] 0 e;
400 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
401 fun fork_deps deps e = fork_deps_pri deps 0 e;
402 fun fork_pri pri e = fork_deps_pri [] pri e;
403 fun fork e = fork_deps [] e;
412 NONE => Exn.Exn (Fail "Unfinished future")
413 | SOME (exn as Exn.Exn Exn.Interrupt) =>
414 (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
416 | exns => Exn.Exn (Exn.EXCEPTIONS exns))
419 fun join_next deps = (*requires SYNCHRONIZED*)
420 if null deps then NONE
422 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
424 | (NONE, deps') => (worker_wait true work_finished; join_next deps')
425 | (SOME work, deps') => SOME (work, deps'));
427 fun execute_work NONE = ()
428 | execute_work (SOME (work, deps')) = (execute work; join_work deps')
430 execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
432 fun join_depend task deps =
433 execute_work (SYNCHRONIZED "join" (fn () =>
434 (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
438 fun join_results xs =
439 if forall is_finished xs then map get_result xs
440 else if Multithreading.self_critical () then
441 error "Cannot join future values within critical section"
443 (case worker_task () of
444 SOME task => join_depend task (map task_of xs)
445 | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
450 fun join_result x = singleton join_results x;
451 fun join x = Exn.release (join_result x);
454 (* fast-path versions -- bypassing full task management *)
458 val group = Task_Queue.new_group NONE;
459 val result = Single_Assignment.var "value" : 'a result;
460 val _ = assign_result group result (Exn.Result x);
461 in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
465 val task = task_of x;
466 val group = Task_Queue.new_group (SOME (group_of x));
467 val (result, job) = future_job group (fn () => f (join x));
469 val extended = SYNCHRONIZED "extend" (fn () =>
470 (case Task_Queue.extend task job (! queue) of
471 SOME queue' => (queue := queue'; true)
474 if extended then Future {promised = false, task = task, group = group, result = result}
475 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
479 (* promised futures -- fulfilled by external means *)
481 fun promise_group group : 'a future =
483 val result = Single_Assignment.var "promise" : 'a result;
484 val task = SYNCHRONIZED "enqueue" (fn () =>
485 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group));
486 in Future {promised = true, task = task, group = group, result = result} end;
488 fun promise () = promise_group (new_group ());
490 fun fulfill_result (Future {promised, task, group, result}) res =
492 val _ = promised orelse raise Fail "Not a promised future";
493 fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);
494 val _ = execute (task, group, [job]);
497 fun fulfill x res = fulfill_result x (Exn.Result res);
502 fun interruptible_task f x =
503 if Multithreading.available then
504 Multithreading.with_attributes
506 then Multithreading.private_interrupts
507 else Multithreading.public_interrupts)
509 else interruptible f x;
511 (*cancel: present and future group members will be interrupted eventually*)
512 fun cancel_group group =
513 SYNCHRONIZED "cancel" (fn () => if cancel_now group then () else cancel_later group);
514 fun cancel x = cancel_group (group_of x);
520 if Multithreading.available then
521 SYNCHRONIZED "shutdown" (fn () =>
522 while scheduler_active () do
523 (wait scheduler_event; broadcast_work ()))
531 val _ = Output.status (Markup.markup Markup.forked "");
532 val x = e (); (*sic -- report "joined" only for success*)
533 val _ = Output.status (Markup.markup Markup.joined "");
537 (*final declarations of this structure!*)
538 val map = map_future;
542 type 'a future = 'a Future.future;