added Future.promise/fulfill -- promised futures that are fulfilled by external means;
Future.value: official result assignment -- produces immutable ref;
Future.shutdown: raw_wait keeps raw task attributes, e.g. asynchronous interrupts of toplevel;
Task_Queue: passive tasks track dependencies, but lack any evaluation process;
tuned;
1.1 --- a/src/Pure/Concurrent/future.ML Tue Jan 05 18:20:18 2010 +0100
1.2 +++ b/src/Pure/Concurrent/future.ML Tue Jan 05 23:38:10 2010 +0100
1.3 @@ -23,6 +23,10 @@
1.4 of runtime resources is distorted either if workers yield CPU time
1.5 (e.g. via system sleep or wait operations), or if non-worker
1.6 threads contend for significant runtime resources independently.
1.7 +
1.8 + * Promised futures are fulfilled by external means. There is no
1.9 + associated evaluation task, but other futures can depend on them
1.10 + as usual.
1.11 *)
1.12
1.13 signature FUTURE =
1.14 @@ -37,7 +41,6 @@
1.15 val group_of: 'a future -> group
1.16 val peek: 'a future -> 'a Exn.result option
1.17 val is_finished: 'a future -> bool
1.18 - val value: 'a -> 'a future
1.19 val fork_group: group -> (unit -> 'a) -> 'a future
1.20 val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
1.21 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
1.22 @@ -46,7 +49,12 @@
1.23 val join_results: 'a future list -> 'a Exn.result list
1.24 val join_result: 'a future -> 'a Exn.result
1.25 val join: 'a future -> 'a
1.26 + val value: 'a -> 'a future
1.27 val map: ('a -> 'b) -> 'a future -> 'b future
1.28 + val promise_group: group -> 'a future
1.29 + val promise: unit -> 'a future
1.30 + val fulfill_result: 'a future -> 'a Exn.result -> unit
1.31 + val fulfill: 'a future -> 'a -> unit
1.32 val interruptible_task: ('a -> 'b) -> 'a -> 'b
1.33 val cancel_group: group -> unit
1.34 val cancel: 'a future -> unit
1.35 @@ -75,11 +83,14 @@
1.36 val worker_task = Option.map #1 o thread_data;
1.37 val worker_group = Option.map #2 o thread_data;
1.38
1.39 +fun new_group () = Task_Queue.new_group (worker_group ());
1.40 +
1.41
1.42 (* datatype future *)
1.43
1.44 datatype 'a future = Future of
1.45 - {task: task,
1.46 + {promised: bool,
1.47 + task: task,
1.48 group: group,
1.49 result: 'a Exn.result option Synchronized.var};
1.50
1.51 @@ -90,10 +101,14 @@
1.52 fun peek x = Synchronized.value (result_of x);
1.53 fun is_finished x = is_some (peek x);
1.54
1.55 -fun value x = Future
1.56 - {task = Task_Queue.new_task 0,
1.57 - group = Task_Queue.new_group NONE,
1.58 - result = Synchronized.var "future" (SOME (Exn.Result x))};
1.59 +fun assign_result group result res =
1.60 + let
1.61 + val _ = Synchronized.assign result (K (SOME res));
1.62 + val ok =
1.63 + (case res of
1.64 + Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
1.65 + | Exn.Result _ => true);
1.66 + in ok end;
1.67
1.68
1.69
1.70 @@ -111,6 +126,9 @@
1.71
1.72 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
1.73
1.74 +fun raw_wait cond = (*requires SYNCHRONIZED*)
1.75 + ConditionVar.wait (cond, lock);
1.76 +
1.77 fun wait cond = (*requires SYNCHRONIZED*)
1.78 Multithreading.sync_wait NONE NONE cond lock;
1.79
1.80 @@ -160,12 +178,7 @@
1.81 Exn.capture (fn () =>
1.82 Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
1.83 else Exn.Exn Exn.Interrupt;
1.84 - val _ = Synchronized.assign result (K (SOME res));
1.85 - in
1.86 - (case res of
1.87 - Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
1.88 - | Exn.Result _ => true)
1.89 - end;
1.90 + in assign_result group result res end;
1.91 in (result, job) end;
1.92
1.93 fun do_cancel group = (*requires SYNCHRONIZED*)
1.94 @@ -247,7 +260,7 @@
1.95 if tick andalso ! status_ticks = 0 then
1.96 Multithreading.tracing 1 (fn () =>
1.97 let
1.98 - val {ready, pending, running} = Task_Queue.status (! queue);
1.99 + val {ready, pending, running, passive} = Task_Queue.status (! queue);
1.100 val total = length (! workers);
1.101 val active = count_workers Working;
1.102 val waiting = count_workers Waiting;
1.103 @@ -255,7 +268,8 @@
1.104 "SCHEDULE " ^ Time.toString now ^ ": " ^
1.105 string_of_int ready ^ " ready, " ^
1.106 string_of_int pending ^ " pending, " ^
1.107 - string_of_int running ^ " running; " ^
1.108 + string_of_int running ^ " running, " ^
1.109 + string_of_int passive ^ " passive; " ^
1.110 string_of_int total ^ " workers, " ^
1.111 string_of_int active ^ " active, " ^
1.112 string_of_int waiting ^ " waiting "
1.113 @@ -329,7 +343,7 @@
1.114
1.115 (* shutdown *)
1.116
1.117 - val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
1.118 + val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
1.119 val continue = not (! do_shutdown andalso null (! workers));
1.120 val _ = if continue then () else scheduler := NONE;
1.121
1.122 @@ -364,8 +378,8 @@
1.123 let
1.124 val group =
1.125 (case opt_group of
1.126 - SOME group => group
1.127 - | NONE => Task_Queue.new_group (worker_group ()));
1.128 + NONE => new_group ()
1.129 + | SOME group => group);
1.130 val (result, job) = future_job group e;
1.131 val task = SYNCHRONIZED "enqueue" (fn () =>
1.132 let
1.133 @@ -374,7 +388,7 @@
1.134 val _ = if minimal then signal work_available else ();
1.135 val _ = scheduler_check ();
1.136 in task end);
1.137 - in Future {task = task, group = group, result = result} end;
1.138 + in Future {promised = false, task = task, group = group, result = result} end;
1.139
1.140 fun fork_group group e = fork_future (SOME group) [] 0 e;
1.141 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
1.142 @@ -432,7 +446,14 @@
1.143 fun join x = Exn.release (join_result x);
1.144
1.145
1.146 -(* map *)
1.147 +(* fast-path versions -- bypassing full task management *)
1.148 +
1.149 +fun value (x: 'a) =
1.150 + let
1.151 + val group = Task_Queue.new_group NONE;
1.152 + val result = Synchronized.var "value" NONE : 'a Exn.result option Synchronized.var;
1.153 + val _ = assign_result group result (Exn.Result x);
1.154 + in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
1.155
1.156 fun map_future f x =
1.157 let
1.158 @@ -445,11 +466,32 @@
1.159 SOME queue' => (queue := queue'; true)
1.160 | NONE => false));
1.161 in
1.162 - if extended then Future {task = task, group = group, result = result}
1.163 + if extended then Future {promised = false, task = task, group = group, result = result}
1.164 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
1.165 end;
1.166
1.167
1.168 +(* promised futures -- fulfilled by external means *)
1.169 +
1.170 +fun promise_group group : 'a future =
1.171 + let
1.172 + val result = Synchronized.var "promise" (NONE: 'a Exn.result option);
1.173 + val task = SYNCHRONIZED "enqueue" (fn () =>
1.174 + Unsynchronized.change_result queue (Task_Queue.enqueue_passive group));
1.175 + in Future {promised = true, task = task, group = group, result = result} end;
1.176 +
1.177 +fun promise () = promise_group (new_group ());
1.178 +
1.179 +fun fulfill_result (Future {promised, task, group, result}) res =
1.180 + let
1.181 + val _ = promised orelse raise Fail "Not a promised future";
1.182 + fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);
1.183 + val _ = execute (task, group, [job]);
1.184 + in () end;
1.185 +
1.186 +fun fulfill x res = fulfill_result x (Exn.Result res);
1.187 +
1.188 +
1.189 (* cancellation *)
1.190
1.191 fun interruptible_task f x =
1.192 @@ -472,7 +514,7 @@
1.193 if Multithreading.available then
1.194 SYNCHRONIZED "shutdown" (fn () =>
1.195 while scheduler_active () do
1.196 - (wait scheduler_event; broadcast_work ()))
1.197 + (raw_wait scheduler_event; broadcast_work ()))
1.198 else ();
1.199
1.200
2.1 --- a/src/Pure/Concurrent/task_queue.ML Tue Jan 05 18:20:18 2010 +0100
2.2 +++ b/src/Pure/Concurrent/task_queue.ML Tue Jan 05 23:38:10 2010 +0100
2.3 @@ -7,7 +7,7 @@
2.4 signature TASK_QUEUE =
2.5 sig
2.6 type task
2.7 - val new_task: int -> task
2.8 + val dummy_task: task
2.9 val pri_of_task: task -> int
2.10 val str_of_task: task -> string
2.11 type group
2.12 @@ -20,10 +20,11 @@
2.13 val str_of_group: group -> string
2.14 type queue
2.15 val empty: queue
2.16 - val is_empty: queue -> bool
2.17 - val status: queue -> {ready: int, pending: int, running: int}
2.18 + val all_passive: queue -> bool
2.19 + val status: queue -> {ready: int, pending: int, running: int, passive: int}
2.20 val cancel: queue -> group -> bool
2.21 val cancel_all: queue -> group list
2.22 + val enqueue_passive: group -> queue -> task * queue
2.23 val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
2.24 val extend: task -> (bool -> bool) -> queue -> queue option
2.25 val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
2.26 @@ -38,13 +39,14 @@
2.27
2.28 (* tasks *)
2.29
2.30 -datatype task = Task of int * serial;
2.31 +datatype task = Task of int option * serial;
2.32 +val dummy_task = Task (NONE, ~1);
2.33 fun new_task pri = Task (pri, serial ());
2.34
2.35 -fun pri_of_task (Task (pri, _)) = pri;
2.36 +fun pri_of_task (Task (pri, _)) = the_default 0 pri;
2.37 fun str_of_task (Task (_, i)) = string_of_int i;
2.38
2.39 -fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
2.40 +fun task_ord (Task t1, Task t2) = prod_ord (rev_order o option_ord int_ord) int_ord (t1, t2);
2.41 structure Task_Graph = Graph(type key = task val ord = task_ord);
2.42
2.43
2.44 @@ -91,7 +93,8 @@
2.45
2.46 datatype job =
2.47 Job of (bool -> bool) list |
2.48 - Running of Thread.thread;
2.49 + Running of Thread.thread |
2.50 + Passive;
2.51
2.52 type jobs = (group * job) Task_Graph.T;
2.53
2.54 @@ -123,20 +126,24 @@
2.55 fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
2.56
2.57 val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
2.58 -fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
2.59 +
2.60 +fun all_passive (Queue {jobs, ...}) =
2.61 + Task_Graph.get_first NONE
2.62 + ((fn Job _ => SOME () | Running _ => SOME () | Passive => NONE) o #2 o #1 o #2) jobs |> is_none;
2.63
2.64
2.65 (* queue status *)
2.66
2.67 fun status (Queue {jobs, ...}) =
2.68 let
2.69 - val (x, y, z) =
2.70 - Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z) =>
2.71 + val (x, y, z, w) =
2.72 + Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
2.73 (case job of
2.74 - Job _ => if null deps then (x + 1, y, z) else (x, y + 1, z)
2.75 - | Running _ => (x, y, z + 1)))
2.76 - jobs (0, 0, 0);
2.77 - in {ready = x, pending = y, running = z} end;
2.78 + Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
2.79 + | Running _ => (x, y, z + 1, w)
2.80 + | Passive => (x, y, z, w + 1)))
2.81 + jobs (0, 0, 0, 0);
2.82 + in {ready = x, pending = y, running = z, passive = w} end;
2.83
2.84
2.85 (* cancel -- peers and sub-groups *)
2.86 @@ -154,18 +161,27 @@
2.87 let
2.88 fun cancel_job (group, job) (groups, running) =
2.89 (cancel_group group Exn.Interrupt;
2.90 - (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
2.91 + (case job of
2.92 + Running t => (insert eq_group group groups, insert Thread.equal t running)
2.93 | _ => (groups, running)));
2.94 - val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
2.95 + val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
2.96 val _ = List.app SimpleThread.interrupt running;
2.97 - in groups end;
2.98 + in running_groups end;
2.99
2.100
2.101 (* enqueue *)
2.102
2.103 +fun enqueue_passive group (Queue {groups, jobs, cache}) =
2.104 + let
2.105 + val task = new_task NONE;
2.106 + val groups' = groups
2.107 + |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
2.108 + val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive));
2.109 + in (task, make_queue groups' jobs' cache) end;
2.110 +
2.111 fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
2.112 let
2.113 - val task = new_task pri;
2.114 + val task = new_task (SOME pri);
2.115 val groups' = groups
2.116 |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
2.117 val jobs' = jobs