1.1 --- a/src/Pure/Concurrent/task_queue.ML Wed Feb 02 13:44:40 2011 +0100
1.2 +++ b/src/Pure/Concurrent/task_queue.ML Wed Feb 02 15:04:09 2011 +0100
1.3 @@ -6,12 +6,6 @@
1.4
1.5 signature TASK_QUEUE =
1.6 sig
1.7 - type task
1.8 - val dummy_task: task
1.9 - val name_of_task: task -> string
1.10 - val pri_of_task: task -> int
1.11 - val str_of_task: task -> string
1.12 - val timing_of_task: task -> Time.time * Time.time * string list
1.13 type group
1.14 val new_group: group option -> group
1.15 val group_id: group -> int
1.16 @@ -20,6 +14,13 @@
1.17 val is_canceled: group -> bool
1.18 val group_status: group -> exn list
1.19 val str_of_group: group -> string
1.20 + type task
1.21 + val dummy_task: unit -> task
1.22 + val group_of_task: task -> group
1.23 + val name_of_task: task -> string
1.24 + val pri_of_task: task -> int
1.25 + val str_of_task: task -> string
1.26 + val timing_of_task: task -> Time.time * Time.time * string list
1.27 type queue
1.28 val empty: queue
1.29 val all_passive: queue -> bool
1.30 @@ -32,12 +33,12 @@
1.31 queue -> (task * bool) * queue
1.32 val extend: task -> (bool -> bool) -> queue -> queue option
1.33 val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
1.34 - val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
1.35 + val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue
1.36 type deps
1.37 val init_deps: task list -> deps
1.38 val finished_deps: deps -> bool
1.39 val dequeue_deps: Thread.thread -> deps -> queue ->
1.40 - (((task * group * (bool -> bool) list) option * deps) * queue)
1.41 + (((task * (bool -> bool) list) option * deps) * queue)
1.42 val running: task -> (unit -> 'a) -> 'a
1.43 val joining: task -> (unit -> 'a) -> 'a
1.44 val waiting: task -> deps -> (unit -> 'a) -> 'a
1.45 @@ -49,7 +50,48 @@
1.46 val new_id = Synchronized.counter ();
1.47
1.48
1.49 -(** grouped tasks **)
1.50 +(** nested groups of tasks **)
1.51 +
1.52 +(* groups *)
1.53 +
1.54 +abstype group = Group of
1.55 + {parent: group option,
1.56 + id: int,
1.57 + status: exn list Synchronized.var}
1.58 +with
1.59 +
1.60 +fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
1.61 +
1.62 +fun new_group parent = make_group (parent, new_id (), Synchronized.var "group" []);
1.63 +
1.64 +fun group_id (Group {id, ...}) = id;
1.65 +fun eq_group (group1, group2) = group_id group1 = group_id group2;
1.66 +
1.67 +fun group_ancestry (Group {parent, id, ...}) =
1.68 + id :: (case parent of NONE => [] | SOME group => group_ancestry group);
1.69 +
1.70 +
1.71 +(* group status *)
1.72 +
1.73 +fun cancel_group (Group {status, ...}) exn =
1.74 + Synchronized.change status
1.75 + (fn exns =>
1.76 + if not (Exn.is_interrupt exn) orelse null exns then exn :: exns
1.77 + else exns);
1.78 +
1.79 +fun is_canceled (Group {parent, status, ...}) =
1.80 + not (null (Synchronized.value status)) orelse
1.81 + (case parent of NONE => false | SOME group => is_canceled group);
1.82 +
1.83 +fun group_status (Group {parent, status, ...}) =
1.84 + Synchronized.value status @
1.85 + (case parent of NONE => [] | SOME group => group_status group);
1.86 +
1.87 +fun str_of_group group =
1.88 + (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
1.89 +
1.90 +end;
1.91 +
1.92
1.93 (* tasks *)
1.94
1.95 @@ -59,15 +101,20 @@
1.96 Synchronized.var "timing" ((Time.zeroTime, Time.zeroTime, []): timing);
1.97
1.98 abstype task = Task of
1.99 - {name: string,
1.100 + {group: group,
1.101 + name: string,
1.102 id: int,
1.103 pri: int option,
1.104 timing: timing Synchronized.var}
1.105 with
1.106
1.107 -val dummy_task = Task {name = "", id = ~1, pri = NONE, timing = new_timing ()};
1.108 -fun new_task name pri = Task {name = name, id = new_id (), pri = pri, timing = new_timing ()};
1.109 +fun dummy_task () =
1.110 + Task {group = new_group NONE, name = "", id = 0, pri = NONE, timing = new_timing ()};
1.111
1.112 +fun new_task group name pri =
1.113 + Task {group = group, name = name, id = new_id (), pri = pri, timing = new_timing ()};
1.114 +
1.115 +fun group_of_task (Task {group, ...}) = group;
1.116 fun name_of_task (Task {name, ...}) = name;
1.117 fun pri_of_task (Task {pri, ...}) = the_default 0 pri;
1.118 fun str_of_task (Task {name, id, ...}) =
1.119 @@ -93,46 +140,6 @@
1.120 structure Task_Graph = Graph(type key = task val ord = task_ord);
1.121
1.122
1.123 -(* nested groups *)
1.124 -
1.125 -abstype group = Group of
1.126 - {parent: group option,
1.127 - id: int,
1.128 - status: exn list Synchronized.var}
1.129 -with
1.130 -
1.131 -fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
1.132 -
1.133 -fun new_group parent = make_group (parent, new_id (), Synchronized.var "group" []);
1.134 -
1.135 -fun group_id (Group {id, ...}) = id;
1.136 -fun eq_group (group1, group2) = group_id group1 = group_id group2;
1.137 -
1.138 -fun group_ancestry (Group {parent, id, ...}) =
1.139 - id :: (case parent of NONE => [] | SOME group => group_ancestry group);
1.140 -
1.141 -
1.142 -(* group status *)
1.143 -
1.144 -fun cancel_group (Group {status, ...}) exn =
1.145 - Synchronized.change status
1.146 - (fn exns =>
1.147 - if not (Exn.is_interrupt exn) orelse null exns then exn :: exns
1.148 - else exns);
1.149 -
1.150 -fun is_canceled (Group {parent, status, ...}) =
1.151 - not (null (Synchronized.value status)) orelse
1.152 - (case parent of NONE => false | SOME group => is_canceled group);
1.153 -
1.154 -fun group_status (Group {parent, status, ...}) =
1.155 - Synchronized.value status @
1.156 - (case parent of NONE => [] | SOME group => group_status group);
1.157 -
1.158 -fun str_of_group group =
1.159 - (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
1.160 -
1.161 -end;
1.162 -
1.163
1.164 (** queue of jobs and groups **)
1.165
1.166 @@ -143,11 +150,10 @@
1.167 Running of Thread.thread |
1.168 Passive of unit -> bool;
1.169
1.170 -type jobs = (group * job) Task_Graph.T;
1.171 +type jobs = job Task_Graph.T;
1.172
1.173 -fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
1.174 -fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
1.175 -fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;
1.176 +fun get_job (jobs: jobs) task = Task_Graph.get_node jobs task;
1.177 +fun set_job task job (jobs: jobs) = Task_Graph.map_node task (K job) jobs;
1.178
1.179 fun add_job task dep (jobs: jobs) =
1.180 Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
1.181 @@ -169,18 +175,18 @@
1.182
1.183 (* job status *)
1.184
1.185 -fun ready_job task ((group, Job list), ([], _)) = SOME (task, group, rev list)
1.186 - | ready_job task ((group, Passive abort), ([], _)) =
1.187 - if is_canceled group then SOME (task, group, [fn _ => abort ()])
1.188 +fun ready_job task (Job list, ([], _)) = SOME (task, rev list)
1.189 + | ready_job task (Passive abort, ([], _)) =
1.190 + if is_canceled (group_of_task task) then SOME (task, [fn _ => abort ()])
1.191 else NONE
1.192 | ready_job _ _ = NONE;
1.193
1.194 -fun active_job (_, Job _) = SOME ()
1.195 - | active_job (_, Running _) = SOME ()
1.196 - | active_job (group, Passive _) = if is_canceled group then SOME () else NONE;
1.197 +fun active_job (_, (Job _, _)) = SOME ()
1.198 + | active_job (_, (Running _, _)) = SOME ()
1.199 + | active_job (task, (Passive _, _)) =
1.200 + if is_canceled (group_of_task task) then SOME () else NONE;
1.201
1.202 -fun all_passive (Queue {jobs, ...}) =
1.203 - is_none (Task_Graph.get_first (active_job o #1 o #2) jobs);
1.204 +fun all_passive (Queue {jobs, ...}) = is_none (Task_Graph.get_first active_job jobs);
1.205
1.206
1.207 (* queue status *)
1.208 @@ -188,7 +194,7 @@
1.209 fun status (Queue {jobs, ...}) =
1.210 let
1.211 val (x, y, z, w) =
1.212 - Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
1.213 + Task_Graph.fold (fn (_, (job, (deps, _))) => fn (x, y, z, w) =>
1.214 (case job of
1.215 Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
1.216 | Running _ => (x, y, z + 1, w)
1.217 @@ -213,12 +219,16 @@
1.218
1.219 fun cancel_all (Queue {jobs, ...}) =
1.220 let
1.221 - fun cancel_job (group, job) (groups, running) =
1.222 - (cancel_group group Exn.Interrupt;
1.223 + fun cancel_job (task, (job, _)) (groups, running) =
1.224 + let
1.225 + val group = group_of_task task;
1.226 + val _ = cancel_group group Exn.Interrupt;
1.227 + in
1.228 (case job of
1.229 Running t => (insert eq_group group groups, insert Thread.equal t running)
1.230 - | _ => (groups, running)));
1.231 - val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
1.232 + | _ => (groups, running))
1.233 + end;
1.234 + val (running_groups, running) = Task_Graph.fold cancel_job jobs ([], []);
1.235 val _ = List.app Simple_Thread.interrupt running;
1.236 in running_groups end;
1.237
1.238 @@ -227,7 +237,7 @@
1.239
1.240 fun finish task (Queue {groups, jobs}) =
1.241 let
1.242 - val group = get_group jobs task;
1.243 + val group = group_of_task task;
1.244 val groups' = groups
1.245 |> fold (fn gid => Inttab.remove_list eq_task (gid, task)) (group_ancestry group);
1.246 val jobs' = Task_Graph.del_node task jobs;
1.247 @@ -239,19 +249,19 @@
1.248
1.249 fun enqueue_passive group abort (Queue {groups, jobs}) =
1.250 let
1.251 - val task = new_task "passive" NONE;
1.252 + val task = new_task group "passive" NONE;
1.253 val groups' = groups
1.254 |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
1.255 - val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive abort));
1.256 + val jobs' = jobs |> Task_Graph.new_node (task, Passive abort);
1.257 in (task, make_queue groups' jobs') end;
1.258
1.259 fun enqueue name group deps pri job (Queue {groups, jobs}) =
1.260 let
1.261 - val task = new_task name (SOME pri);
1.262 + val task = new_task group name (SOME pri);
1.263 val groups' = groups
1.264 |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
1.265 val jobs' = jobs
1.266 - |> Task_Graph.new_node (task, (group, Job [job]))
1.267 + |> Task_Graph.new_node (task, Job [job])
1.268 |> fold (add_job task) deps
1.269 |> fold (fold (add_job task) o get_deps jobs) deps;
1.270 val minimal = null (get_deps jobs' task);
1.271 @@ -274,7 +284,7 @@
1.272
1.273 fun dequeue thread (queue as Queue {groups, jobs}) =
1.274 (case Task_Graph.get_first (uncurry ready_job) jobs of
1.275 - SOME (result as (task, _, _)) =>
1.276 + SOME (result as (task, _)) =>
1.277 let val jobs' = set_job task (Running thread) jobs
1.278 in (SOME result, make_queue groups jobs') end
1.279 | NONE => (NONE, queue));
1.280 @@ -294,7 +304,7 @@
1.281 let
1.282 fun ready task = ready_job task (Task_Graph.get_entry jobs task);
1.283 val tasks = filter (can (Task_Graph.get_node jobs)) deps;
1.284 - fun result (res as (task, _, _)) =
1.285 + fun result (res as (task, _)) =
1.286 let val jobs' = set_job task (Running thread) jobs
1.287 in ((SOME res, Deps tasks), make_queue groups jobs') end;
1.288 in