1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
9 * Futures are similar to delayed evaluation, i.e. delay/force is
10 generalized to fork/join (and variants). The idea is to model
11 parallel value-oriented computations, but *not* communicating
14 * Futures are grouped; failure of one group member causes the whole
15 group to be interrupted eventually. Groups are block-structured.
17 * Forked futures are evaluated spontaneously by a farm of worker
18 threads in the background; join resynchronizes the computation and
19 delivers results (values or exceptions).
21 * The pool of worker threads is limited, usually in correlation with
22 the number of physical cores on the machine. Note that allocation
23 of runtime resources is distorted either if workers yield CPU time
24 (e.g. via system sleep or wait operations), or if non-worker
25 threads contend for significant runtime resources independently.
30 type task = Task_Queue.task
31 type group = Task_Queue.group
32 val is_worker: unit -> bool
33 val worker_task: unit -> Task_Queue.task option
34 val worker_group: unit -> Task_Queue.group option
36 val task_of: 'a future -> task
37 val group_of: 'a future -> group
38 val peek: 'a future -> 'a Exn.result option
39 val is_finished: 'a future -> bool
40 val value: 'a -> 'a future
41 val fork_group: group -> (unit -> 'a) -> 'a future
42 val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
43 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
44 val fork_pri: int -> (unit -> 'a) -> 'a future
45 val fork: (unit -> 'a) -> 'a future
46 val join_results: 'a future list -> 'a Exn.result list
47 val join_result: 'a future -> 'a Exn.result
48 val join: 'a future -> 'a
49 val map: ('a -> 'b) -> 'a future -> 'b future
50 val interruptible_task: ('a -> 'b) -> 'a -> 'b
51 val cancel_group: group -> unit
52 val cancel: 'a future -> unit
53 val shutdown: unit -> unit
56 structure Future: FUTURE =
63 type task = Task_Queue.task;
64 type group = Task_Queue.group;
67 val tag = Universal.tag () : (task * group) option Universal.tag;
69 fun thread_data () = the_default NONE (Thread.getLocal tag);
70 fun setmp_thread_data data f x =
71 Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
74 val is_worker = is_some o thread_data;
75 val worker_task = Option.map #1 o thread_data;
76 val worker_group = Option.map #2 o thread_data;
81 datatype 'a future = Future of
84 result: 'a Exn.result option Synchronized.var};
86 fun task_of (Future {task, ...}) = task;
87 fun group_of (Future {group, ...}) = group;
88 fun result_of (Future {result, ...}) = result;
90 fun peek x = Synchronized.value (result_of x);
91 fun is_finished x = is_some (peek x);
94 {task = Task_Queue.new_task 0,
95 group = Task_Queue.new_group NONE,
96 result = Synchronized.var "future" (SOME (Exn.Result x))};
104 val queue = Unsynchronized.ref Task_Queue.empty;
105 val next = Unsynchronized.ref 0;
106 val max_workers = Unsynchronized.ref 0;
107 val max_active = Unsynchronized.ref 0;
108 val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list);
109 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
110 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
111 val do_shutdown = Unsynchronized.ref false;
114 (* synchronization *)
116 val scheduler_event = ConditionVar.conditionVar ();
117 val work_available = ConditionVar.conditionVar ();
118 val work_finished = ConditionVar.conditionVar ();
121 val lock = Mutex.mutex ();
124 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
126 fun wait cond = (*requires SYNCHRONIZED*)
127 Multithreading.sync_wait NONE NONE cond lock;
129 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
130 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
132 fun signal cond = (*requires SYNCHRONIZED*)
133 ConditionVar.signal cond;
135 fun broadcast cond = (*requires SYNCHRONIZED*)
136 ConditionVar.broadcast cond;
138 fun broadcast_work () = (*requires SYNCHRONIZED*)
139 (ConditionVar.broadcast work_available;
140 ConditionVar.broadcast work_finished);
145 (* execute future jobs *)
147 fun future_job group (e: unit -> 'a) =
149 val result = Synchronized.var "future" (NONE: 'a Exn.result option);
154 Exn.capture (fn () =>
155 Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
156 else Exn.Exn Exn.Interrupt;
157 val _ = Synchronized.assign result (K (SOME res));
160 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
161 | Exn.Result _ => true)
163 in (result, job) end;
165 fun do_cancel group = (*requires SYNCHRONIZED*)
166 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
167 broadcast scheduler_event);
169 fun execute (task, group, jobs) =
171 val valid = not (Task_Queue.is_canceled group);
172 val ok = setmp_thread_data (task, group) (fn () =>
173 fold (fn job => fn ok => job valid andalso ok) jobs true) ();
174 val _ = SYNCHRONIZED "finish" (fn () =>
176 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
179 else if Task_Queue.cancel (! queue) group then ()
180 else do_cancel group;
181 val _ = broadcast work_finished;
182 val _ = if maximal then () else broadcast work_available;
187 (* worker activity *)
189 fun count_active () = (*requires SYNCHRONIZED*)
190 fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0;
192 fun find_active () = (*requires SYNCHRONIZED*)
193 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
194 SOME active => active
195 | NONE => raise Fail "Unregistered worker thread");
200 fun worker_wait cond = (*requires SYNCHRONIZED*)
202 val active = find_active ();
203 val _ = active := false;
205 val _ = active := true;
208 fun worker_next have_work = (*requires SYNCHRONIZED*)
209 if length (! workers) > ! max_workers then
210 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
211 if have_work then signal work_available else ();
212 broadcast scheduler_event;
214 else if count_active () > ! max_active then
215 (if have_work then signal work_available else ();
216 worker_wait scheduler_event;
219 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
220 NONE => (worker_wait work_available; worker_next true)
223 fun worker_loop name =
224 (case SYNCHRONIZED name (fn () => worker_next false) of
226 | SOME work => (execute work; worker_loop name));
228 fun worker_start name = (*requires SYNCHRONIZED*)
229 Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
230 Unsynchronized.ref true));
235 val status_ticks = Unsynchronized.ref 0;
237 val last_round = Unsynchronized.ref Time.zeroTime;
238 val next_round = Time.fromMilliseconds 50;
240 fun scheduler_next () = (*requires SYNCHRONIZED*)
242 val now = Time.now ();
243 val tick = Time.<= (Time.+ (! last_round, next_round), now);
244 val _ = if tick then last_round := now else ();
246 (*queue and worker status*)
248 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
250 if tick andalso ! status_ticks = 0 then
251 Multithreading.tracing 1 (fn () =>
253 val {ready, pending, running} = Task_Queue.status (! queue);
254 val total = length (! workers);
255 val active = count_active ();
257 "SCHEDULE " ^ Time.toString now ^ ": " ^
258 string_of_int ready ^ " ready, " ^
259 string_of_int pending ^ " pending, " ^
260 string_of_int running ^ " running; " ^
261 string_of_int total ^ " workers, " ^
262 string_of_int active ^ " active "
268 if forall (Thread.isActive o #1) (! workers) then ()
271 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
272 val _ = workers := alive;
274 Multithreading.tracing 0 (fn () =>
275 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
278 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
279 val _ = max_active := m;
281 val mm = if m = 9999 then 1 else m * 2;
282 val _ = max_workers := mm;
284 val missing = ! max_workers - length (! workers);
287 (funpow missing (fn () =>
288 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ();
289 broadcast scheduler_event)
294 if null (! canceled) then ()
296 (Multithreading.tracing 1 (fn () =>
297 string_of_int (length (! canceled)) ^ " canceled groups");
298 Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
302 val _ = Exn.release (wait_timeout next_round scheduler_event);
305 val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
306 val continue = not (! do_shutdown andalso null (! workers));
307 val _ = if continue then () else scheduler := NONE;
308 val _ = broadcast scheduler_event;
310 handle Exn.Interrupt =>
311 (Multithreading.tracing 1 (fn () => "Interrupt");
312 uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
315 fun scheduler_loop () =
316 Multithreading.with_attributes
317 (Multithreading.sync_interrupts Multithreading.public_interrupts)
318 (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
320 fun scheduler_active () = (*requires SYNCHRONIZED*)
321 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
323 fun scheduler_check () = (*requires SYNCHRONIZED*)
324 (do_shutdown := false;
325 if scheduler_active () then ()
326 else scheduler := SOME (SimpleThread.fork false scheduler_loop));
334 fun fork_future opt_group deps pri e =
339 | NONE => Task_Queue.new_group (worker_group ()));
340 val (result, job) = future_job group e;
341 val task = SYNCHRONIZED "enqueue" (fn () =>
343 val (task, minimal) =
344 Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
345 val _ = if minimal then signal work_available else ();
346 val _ = scheduler_check ();
348 in Future {task = task, group = group, result = result} end;
350 fun fork_group group e = fork_future (SOME group) [] 0 e;
351 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
352 fun fork_deps deps e = fork_deps_pri deps 0 e;
353 fun fork_pri pri e = fork_deps_pri [] pri e;
354 fun fork e = fork_deps [] e;
363 NONE => Exn.Exn (SYS_ERROR "unfinished future")
364 | SOME (Exn.Exn Exn.Interrupt) =>
365 Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
369 Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ());
371 fun join_next deps = (*requires SYNCHRONIZED*)
372 if null deps then NONE
374 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
376 | (NONE, deps') => (worker_wait work_finished; join_next deps')
377 | (SOME work, deps') => SOME (work, deps'));
379 fun execute_work NONE = ()
380 | execute_work (SOME (work, deps')) = (execute work; join_work deps')
382 execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
384 fun join_depend task deps =
385 execute_work (SYNCHRONIZED "join" (fn () =>
386 (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
390 fun join_results xs =
391 if forall is_finished xs then map get_result xs
392 else if Multithreading.self_critical () then
393 error "Cannot join future values within critical section"
395 (case worker_task () of
396 SOME task => join_depend task (map task_of xs)
397 | NONE => List.app passive_wait xs;
402 fun join_result x = singleton join_results x;
403 fun join x = Exn.release (join_result x);
410 val task = task_of x;
411 val group = Task_Queue.new_group (SOME (group_of x));
412 val (result, job) = future_job group (fn () => f (join x));
414 val extended = SYNCHRONIZED "extend" (fn () =>
415 (case Task_Queue.extend task job (! queue) of
416 SOME queue' => (queue := queue'; true)
419 if extended then Future {task = task, group = group, result = result}
420 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
426 fun interruptible_task f x =
427 if Multithreading.available then
428 Multithreading.with_attributes
430 then Multithreading.private_interrupts
431 else Multithreading.public_interrupts)
433 else interruptible f x;
435 (*cancel: present and future group members will be interrupted eventually*)
436 fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
437 fun cancel x = cancel_group (group_of x);
443 if Multithreading.available then
444 SYNCHRONIZED "shutdown" (fn () =>
445 while scheduler_active () do
446 (wait scheduler_event; broadcast_work ()))
450 (*final declarations of this structure!*)
451 val map = map_future;
455 type 'a future = 'a Future.future;