1.1 --- a/src/Pure/Concurrent/future.ML Wed Feb 02 13:44:40 2011 +0100
1.2 +++ b/src/Pure/Concurrent/future.ML Wed Feb 02 15:04:09 2011 +0100
1.3 @@ -32,19 +32,16 @@
1.4
1.5 signature FUTURE =
1.6 sig
1.7 - type task = Task_Queue.task
1.8 - type group = Task_Queue.group
1.9 - val is_worker: unit -> bool
1.10 val worker_task: unit -> Task_Queue.task option
1.11 val worker_group: unit -> Task_Queue.group option
1.12 val worker_subgroup: unit -> Task_Queue.group
1.13 type 'a future
1.14 - val task_of: 'a future -> task
1.15 - val group_of: 'a future -> group
1.16 + val task_of: 'a future -> Task_Queue.task
1.17 val peek: 'a future -> 'a Exn.result option
1.18 val is_finished: 'a future -> bool
1.19 - val forks: {name: string, group: group option, deps: task list, pri: int} ->
1.20 - (unit -> 'a) list -> 'a future list
1.21 + val forks:
1.22 + {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
1.23 + (unit -> 'a) list -> 'a future list
1.24 val fork_pri: int -> (unit -> 'a) -> 'a future
1.25 val fork: (unit -> 'a) -> 'a future
1.26 val join_results: 'a future list -> 'a Exn.result list
1.27 @@ -52,12 +49,12 @@
1.28 val join: 'a future -> 'a
1.29 val value: 'a -> 'a future
1.30 val map: ('a -> 'b) -> 'a future -> 'b future
1.31 - val promise_group: group -> 'a future
1.32 + val promise_group: Task_Queue.group -> 'a future
1.33 val promise: unit -> 'a future
1.34 val fulfill_result: 'a future -> 'a Exn.result -> unit
1.35 val fulfill: 'a future -> 'a -> unit
1.36 val interruptible_task: ('a -> 'b) -> 'a -> 'b
1.37 - val cancel_group: group -> unit
1.38 + val cancel_group: Task_Queue.group -> unit
1.39 val cancel: 'a future -> unit
1.40 val shutdown: unit -> unit
1.41 val status: (unit -> 'a) -> 'a
1.42 @@ -70,20 +67,15 @@
1.43
1.44 (* identifiers *)
1.45
1.46 -type task = Task_Queue.task;
1.47 -type group = Task_Queue.group;
1.48 -
1.49 local
1.50 - val tag = Universal.tag () : (task * group) option Universal.tag;
1.51 + val tag = Universal.tag () : Task_Queue.task option Universal.tag;
1.52 in
1.53 - fun thread_data () = the_default NONE (Thread.getLocal tag);
1.54 - fun setmp_thread_data data f x =
1.55 - Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
1.56 + fun worker_task () = the_default NONE (Thread.getLocal tag);
1.57 + fun setmp_worker_task data f x =
1.58 + Library.setmp_thread_data tag (worker_task ()) (SOME data) f x;
1.59 end;
1.60
1.61 -val is_worker = is_some o thread_data;
1.62 -val worker_task = Option.map #1 o thread_data;
1.63 -val worker_group = Option.map #2 o thread_data;
1.64 +val worker_group = Option.map Task_Queue.group_of_task o worker_task;
1.65 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
1.66
1.67 fun worker_joining e =
1.68 @@ -103,12 +95,10 @@
1.69
1.70 datatype 'a future = Future of
1.71 {promised: bool,
1.72 - task: task,
1.73 - group: group,
1.74 + task: Task_Queue.task,
1.75 result: 'a result};
1.76
1.77 fun task_of (Future {task, ...}) = task;
1.78 -fun group_of (Future {group, ...}) = group;
1.79 fun result_of (Future {result, ...}) = result;
1.80
1.81 fun peek x = Single_Assignment.peek (result_of x);
1.82 @@ -204,12 +194,13 @@
1.83 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
1.84 broadcast scheduler_event);
1.85
1.86 -fun execute (task, group, jobs) =
1.87 +fun execute (task, jobs) =
1.88 let
1.89 + val group = Task_Queue.group_of_task task;
1.90 val valid = not (Task_Queue.is_canceled group);
1.91 val ok =
1.92 Task_Queue.running task (fn () =>
1.93 - setmp_thread_data (task, group) (fn () =>
1.94 + setmp_worker_task task (fn () =>
1.95 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
1.96 val _ = Multithreading.tracing 1 (fn () =>
1.97 let
1.98 @@ -416,7 +407,7 @@
1.99 let
1.100 val (result, job) = future_job grp e;
1.101 val ((task, minimal'), queue') = Task_Queue.enqueue name grp deps pri job queue;
1.102 - val future = Future {promised = false, task = task, group = grp, result = result};
1.103 + val future = Future {promised = false, task = task, result = result};
1.104 in (future, (minimal orelse minimal', queue')) end;
1.105 in
1.106 SYNCHRONIZED "enqueue" (fn () =>
1.107 @@ -443,7 +434,7 @@
1.108 NONE => Exn.Exn (Fail "Unfinished future")
1.109 | SOME res =>
1.110 if Exn.is_interrupt_exn res then
1.111 - (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
1.112 + (case Exn.flatten_list (Task_Queue.group_status (Task_Queue.group_of_task (task_of x))) of
1.113 [] => res
1.114 | exns => Exn.Exn (Exn.EXCEPTIONS exns))
1.115 else res);
1.116 @@ -470,7 +461,7 @@
1.117 if forall is_finished xs then ()
1.118 else if Multithreading.self_critical () then
1.119 error "Cannot join future values within critical section"
1.120 - else if is_some (thread_data ()) then
1.121 + else if is_some (worker_task ()) then
1.122 join_work (Task_Queue.init_deps (map task_of xs))
1.123 else List.app (ignore o Single_Assignment.await o result_of) xs;
1.124 in map get_result xs end;
1.125 @@ -485,15 +476,16 @@
1.126
1.127 fun value (x: 'a) =
1.128 let
1.129 - val group = Task_Queue.new_group NONE;
1.130 + val task = Task_Queue.dummy_task ();
1.131 + val group = Task_Queue.group_of_task task;
1.132 val result = Single_Assignment.var "value" : 'a result;
1.133 val _ = assign_result group result (Exn.Result x);
1.134 - in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
1.135 + in Future {promised = false, task = task, result = result} end;
1.136
1.137 fun map_future f x =
1.138 let
1.139 val task = task_of x;
1.140 - val group = Task_Queue.new_group (SOME (group_of x));
1.141 + val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
1.142 val (result, job) = future_job group (fn () => f (join x));
1.143
1.144 val extended = SYNCHRONIZED "extend" (fn () =>
1.145 @@ -501,7 +493,7 @@
1.146 SOME queue' => (queue := queue'; true)
1.147 | NONE => false));
1.148 in
1.149 - if extended then Future {promised = false, task = task, group = group, result = result}
1.150 + if extended then Future {promised = false, task = task, result = result}
1.151 else
1.152 singleton
1.153 (forks {name = "Future.map", group = SOME group,
1.154 @@ -522,14 +514,15 @@
1.155 else reraise exn;
1.156 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
1.157 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
1.158 - in Future {promised = true, task = task, group = group, result = result} end;
1.159 + in Future {promised = true, task = task, result = result} end;
1.160
1.161 fun promise () = promise_group (worker_subgroup ());
1.162
1.163 -fun fulfill_result (Future {promised, task, group, result}) res =
1.164 +fun fulfill_result (Future {promised, task, result}) res =
1.165 if not promised then raise Fail "Not a promised future"
1.166 else
1.167 let
1.168 + val group = Task_Queue.group_of_task task;
1.169 fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
1.170 val _ =
1.171 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
1.172 @@ -538,7 +531,7 @@
1.173 SYNCHRONIZED "fulfill_result" (fn () =>
1.174 Unsynchronized.change_result queue
1.175 (Task_Queue.dequeue_passive (Thread.self ()) task));
1.176 - in if still_passive then execute (task, group, [job]) else () end);
1.177 + in if still_passive then execute (task, [job]) else () end);
1.178 val _ =
1.179 worker_waiting (Task_Queue.init_deps [task])
1.180 (fn () => Single_Assignment.await result);
1.181 @@ -552,7 +545,7 @@
1.182 fun interruptible_task f x =
1.183 if Multithreading.available then
1.184 Multithreading.with_attributes
1.185 - (if is_worker ()
1.186 + (if is_some (worker_task ())
1.187 then Multithreading.private_interrupts
1.188 else Multithreading.public_interrupts)
1.189 (fn _ => f x)
1.190 @@ -563,7 +556,7 @@
1.191 (if cancel_now group then () else cancel_later group;
1.192 signal work_available; scheduler_check ()));
1.193
1.194 -fun cancel x = cancel_group (group_of x);
1.195 +fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
1.196
1.197
1.198 (* shutdown *)
2.1 --- a/src/Pure/Concurrent/task_queue.ML Wed Feb 02 13:44:40 2011 +0100
2.2 +++ b/src/Pure/Concurrent/task_queue.ML Wed Feb 02 15:04:09 2011 +0100
2.3 @@ -6,12 +6,6 @@
2.4
2.5 signature TASK_QUEUE =
2.6 sig
2.7 - type task
2.8 - val dummy_task: task
2.9 - val name_of_task: task -> string
2.10 - val pri_of_task: task -> int
2.11 - val str_of_task: task -> string
2.12 - val timing_of_task: task -> Time.time * Time.time * string list
2.13 type group
2.14 val new_group: group option -> group
2.15 val group_id: group -> int
2.16 @@ -20,6 +14,13 @@
2.17 val is_canceled: group -> bool
2.18 val group_status: group -> exn list
2.19 val str_of_group: group -> string
2.20 + type task
2.21 + val dummy_task: unit -> task
2.22 + val group_of_task: task -> group
2.23 + val name_of_task: task -> string
2.24 + val pri_of_task: task -> int
2.25 + val str_of_task: task -> string
2.26 + val timing_of_task: task -> Time.time * Time.time * string list
2.27 type queue
2.28 val empty: queue
2.29 val all_passive: queue -> bool
2.30 @@ -32,12 +33,12 @@
2.31 queue -> (task * bool) * queue
2.32 val extend: task -> (bool -> bool) -> queue -> queue option
2.33 val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
2.34 - val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
2.35 + val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue
2.36 type deps
2.37 val init_deps: task list -> deps
2.38 val finished_deps: deps -> bool
2.39 val dequeue_deps: Thread.thread -> deps -> queue ->
2.40 - (((task * group * (bool -> bool) list) option * deps) * queue)
2.41 + (((task * (bool -> bool) list) option * deps) * queue)
2.42 val running: task -> (unit -> 'a) -> 'a
2.43 val joining: task -> (unit -> 'a) -> 'a
2.44 val waiting: task -> deps -> (unit -> 'a) -> 'a
2.45 @@ -49,7 +50,48 @@
2.46 val new_id = Synchronized.counter ();
2.47
2.48
2.49 -(** grouped tasks **)
2.50 +(** nested groups of tasks **)
2.51 +
2.52 +(* groups *)
2.53 +
2.54 +abstype group = Group of
2.55 + {parent: group option,
2.56 + id: int,
2.57 + status: exn list Synchronized.var}
2.58 +with
2.59 +
2.60 +fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
2.61 +
2.62 +fun new_group parent = make_group (parent, new_id (), Synchronized.var "group" []);
2.63 +
2.64 +fun group_id (Group {id, ...}) = id;
2.65 +fun eq_group (group1, group2) = group_id group1 = group_id group2;
2.66 +
2.67 +fun group_ancestry (Group {parent, id, ...}) =
2.68 + id :: (case parent of NONE => [] | SOME group => group_ancestry group);
2.69 +
2.70 +
2.71 +(* group status *)
2.72 +
2.73 +fun cancel_group (Group {status, ...}) exn =
2.74 + Synchronized.change status
2.75 + (fn exns =>
2.76 + if not (Exn.is_interrupt exn) orelse null exns then exn :: exns
2.77 + else exns);
2.78 +
2.79 +fun is_canceled (Group {parent, status, ...}) =
2.80 + not (null (Synchronized.value status)) orelse
2.81 + (case parent of NONE => false | SOME group => is_canceled group);
2.82 +
2.83 +fun group_status (Group {parent, status, ...}) =
2.84 + Synchronized.value status @
2.85 + (case parent of NONE => [] | SOME group => group_status group);
2.86 +
2.87 +fun str_of_group group =
2.88 + (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
2.89 +
2.90 +end;
2.91 +
2.92
2.93 (* tasks *)
2.94
2.95 @@ -59,15 +101,20 @@
2.96 Synchronized.var "timing" ((Time.zeroTime, Time.zeroTime, []): timing);
2.97
2.98 abstype task = Task of
2.99 - {name: string,
2.100 + {group: group,
2.101 + name: string,
2.102 id: int,
2.103 pri: int option,
2.104 timing: timing Synchronized.var}
2.105 with
2.106
2.107 -val dummy_task = Task {name = "", id = ~1, pri = NONE, timing = new_timing ()};
2.108 -fun new_task name pri = Task {name = name, id = new_id (), pri = pri, timing = new_timing ()};
2.109 +fun dummy_task () =
2.110 + Task {group = new_group NONE, name = "", id = 0, pri = NONE, timing = new_timing ()};
2.111
2.112 +fun new_task group name pri =
2.113 + Task {group = group, name = name, id = new_id (), pri = pri, timing = new_timing ()};
2.114 +
2.115 +fun group_of_task (Task {group, ...}) = group;
2.116 fun name_of_task (Task {name, ...}) = name;
2.117 fun pri_of_task (Task {pri, ...}) = the_default 0 pri;
2.118 fun str_of_task (Task {name, id, ...}) =
2.119 @@ -93,46 +140,6 @@
2.120 structure Task_Graph = Graph(type key = task val ord = task_ord);
2.121
2.122
2.123 -(* nested groups *)
2.124 -
2.125 -abstype group = Group of
2.126 - {parent: group option,
2.127 - id: int,
2.128 - status: exn list Synchronized.var}
2.129 -with
2.130 -
2.131 -fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
2.132 -
2.133 -fun new_group parent = make_group (parent, new_id (), Synchronized.var "group" []);
2.134 -
2.135 -fun group_id (Group {id, ...}) = id;
2.136 -fun eq_group (group1, group2) = group_id group1 = group_id group2;
2.137 -
2.138 -fun group_ancestry (Group {parent, id, ...}) =
2.139 - id :: (case parent of NONE => [] | SOME group => group_ancestry group);
2.140 -
2.141 -
2.142 -(* group status *)
2.143 -
2.144 -fun cancel_group (Group {status, ...}) exn =
2.145 - Synchronized.change status
2.146 - (fn exns =>
2.147 - if not (Exn.is_interrupt exn) orelse null exns then exn :: exns
2.148 - else exns);
2.149 -
2.150 -fun is_canceled (Group {parent, status, ...}) =
2.151 - not (null (Synchronized.value status)) orelse
2.152 - (case parent of NONE => false | SOME group => is_canceled group);
2.153 -
2.154 -fun group_status (Group {parent, status, ...}) =
2.155 - Synchronized.value status @
2.156 - (case parent of NONE => [] | SOME group => group_status group);
2.157 -
2.158 -fun str_of_group group =
2.159 - (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
2.160 -
2.161 -end;
2.162 -
2.163
2.164 (** queue of jobs and groups **)
2.165
2.166 @@ -143,11 +150,10 @@
2.167 Running of Thread.thread |
2.168 Passive of unit -> bool;
2.169
2.170 -type jobs = (group * job) Task_Graph.T;
2.171 +type jobs = job Task_Graph.T;
2.172
2.173 -fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
2.174 -fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
2.175 -fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;
2.176 +fun get_job (jobs: jobs) task = Task_Graph.get_node jobs task;
2.177 +fun set_job task job (jobs: jobs) = Task_Graph.map_node task (K job) jobs;
2.178
2.179 fun add_job task dep (jobs: jobs) =
2.180 Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
2.181 @@ -169,18 +175,18 @@
2.182
2.183 (* job status *)
2.184
2.185 -fun ready_job task ((group, Job list), ([], _)) = SOME (task, group, rev list)
2.186 - | ready_job task ((group, Passive abort), ([], _)) =
2.187 - if is_canceled group then SOME (task, group, [fn _ => abort ()])
2.188 +fun ready_job task (Job list, ([], _)) = SOME (task, rev list)
2.189 + | ready_job task (Passive abort, ([], _)) =
2.190 + if is_canceled (group_of_task task) then SOME (task, [fn _ => abort ()])
2.191 else NONE
2.192 | ready_job _ _ = NONE;
2.193
2.194 -fun active_job (_, Job _) = SOME ()
2.195 - | active_job (_, Running _) = SOME ()
2.196 - | active_job (group, Passive _) = if is_canceled group then SOME () else NONE;
2.197 +fun active_job (_, (Job _, _)) = SOME ()
2.198 + | active_job (_, (Running _, _)) = SOME ()
2.199 + | active_job (task, (Passive _, _)) =
2.200 + if is_canceled (group_of_task task) then SOME () else NONE;
2.201
2.202 -fun all_passive (Queue {jobs, ...}) =
2.203 - is_none (Task_Graph.get_first (active_job o #1 o #2) jobs);
2.204 +fun all_passive (Queue {jobs, ...}) = is_none (Task_Graph.get_first active_job jobs);
2.205
2.206
2.207 (* queue status *)
2.208 @@ -188,7 +194,7 @@
2.209 fun status (Queue {jobs, ...}) =
2.210 let
2.211 val (x, y, z, w) =
2.212 - Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
2.213 + Task_Graph.fold (fn (_, (job, (deps, _))) => fn (x, y, z, w) =>
2.214 (case job of
2.215 Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
2.216 | Running _ => (x, y, z + 1, w)
2.217 @@ -213,12 +219,16 @@
2.218
2.219 fun cancel_all (Queue {jobs, ...}) =
2.220 let
2.221 - fun cancel_job (group, job) (groups, running) =
2.222 - (cancel_group group Exn.Interrupt;
2.223 + fun cancel_job (task, (job, _)) (groups, running) =
2.224 + let
2.225 + val group = group_of_task task;
2.226 + val _ = cancel_group group Exn.Interrupt;
2.227 + in
2.228 (case job of
2.229 Running t => (insert eq_group group groups, insert Thread.equal t running)
2.230 - | _ => (groups, running)));
2.231 - val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
2.232 + | _ => (groups, running))
2.233 + end;
2.234 + val (running_groups, running) = Task_Graph.fold cancel_job jobs ([], []);
2.235 val _ = List.app Simple_Thread.interrupt running;
2.236 in running_groups end;
2.237
2.238 @@ -227,7 +237,7 @@
2.239
2.240 fun finish task (Queue {groups, jobs}) =
2.241 let
2.242 - val group = get_group jobs task;
2.243 + val group = group_of_task task;
2.244 val groups' = groups
2.245 |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group);
2.246 val jobs' = Task_Graph.del_node task jobs;
2.247 @@ -239,19 +249,19 @@
2.248
2.249 fun enqueue_passive group abort (Queue {groups, jobs}) =
2.250 let
2.251 - val task = new_task "passive" NONE;
2.252 + val task = new_task group "passive" NONE;
2.253 val groups' = groups
2.254 |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
2.255 - val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive abort));
2.256 + val jobs' = jobs |> Task_Graph.new_node (task, Passive abort);
2.257 in (task, make_queue groups' jobs') end;
2.258
2.259 fun enqueue name group deps pri job (Queue {groups, jobs}) =
2.260 let
2.261 - val task = new_task name (SOME pri);
2.262 + val task = new_task group name (SOME pri);
2.263 val groups' = groups
2.264 |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
2.265 val jobs' = jobs
2.266 - |> Task_Graph.new_node (task, (group, Job [job]))
2.267 + |> Task_Graph.new_node (task, Job [job])
2.268 |> fold (add_job task) deps
2.269 |> fold (fold (add_job task) o get_deps jobs) deps;
2.270 val minimal = null (get_deps jobs' task);
2.271 @@ -274,7 +284,7 @@
2.272
2.273 fun dequeue thread (queue as Queue {groups, jobs}) =
2.274 (case Task_Graph.get_first (uncurry ready_job) jobs of
2.275 - SOME (result as (task, _, _)) =>
2.276 + SOME (result as (task, _)) =>
2.277 let val jobs' = set_job task (Running thread) jobs
2.278 in (SOME result, make_queue groups jobs') end
2.279 | NONE => (NONE, queue));
2.280 @@ -294,7 +304,7 @@
2.281 let
2.282 fun ready task = ready_job task (Task_Graph.get_entry jobs task);
2.283 val tasks = filter (can (Task_Graph.get_node jobs)) deps;
2.284 - fun result (res as (task, _, _)) =
2.285 + fun result (res as (task, _)) =
2.286 let val jobs' = set_job task (Running thread) jobs
2.287 in ((SOME res, Deps tasks), make_queue groups jobs') end;
2.288 in
3.1 --- a/src/Pure/goal.ML Wed Feb 02 13:44:40 2011 +0100
3.2 +++ b/src/Pure/goal.ML Wed Feb 02 15:04:09 2011 +0100
3.3 @@ -117,7 +117,7 @@
3.4 val parallel_proofs = Unsynchronized.ref 1;
3.5
3.6 fun future_enabled () =
3.7 - Multithreading.enabled () andalso Future.is_worker () andalso ! parallel_proofs >= 1;
3.8 + Multithreading.enabled () andalso is_some (Future.worker_task ()) andalso ! parallel_proofs >= 1;
3.9
3.10 fun local_future_enabled () = future_enabled () andalso ! parallel_proofs >= 2;
3.11