1.1 --- a/src/Pure/Concurrent/task_queue.ML Tue Jul 21 20:24:02 2009 +0200
1.2 +++ b/src/Pure/Concurrent/task_queue.ML Tue Jul 21 20:37:31 2009 +0200
1.3 @@ -13,8 +13,8 @@
1.4 type group
1.5 val group_id: group -> int
1.6 val eq_group: group * group -> bool
1.7 - val new_group: unit -> group
1.8 - val group_exns: group -> exn list
1.9 + val new_group: group option -> group
1.10 + val group_status: group -> exn list
1.11 val str_of_group: group -> string
1.12 type queue
1.13 val empty: queue
1.14 @@ -27,6 +27,7 @@
1.15 (((task * group * (bool -> bool) list) * task list) option * queue)
1.16 val interrupt: queue -> task -> unit
1.17 val interrupt_external: queue -> string -> unit
1.18 + val is_canceled: group -> bool
1.19 val cancel_group: group -> exn -> unit
1.20 val cancel: queue -> group -> bool
1.21 val cancel_all: queue -> group list
1.22 @@ -48,19 +49,37 @@
1.23 structure Task_Graph = Graph(type key = task val ord = task_ord);
1.24
1.25
1.26 -(* groups *)
1.27 +(* nested groups *)
1.28
1.29 -datatype group = Group of serial * exn list ref;
1.30 +datatype group = Group of
1.31 + {parent: group option,
1.32 + id: serial,
1.33 + status: exn list ref};
1.34
1.35 -fun group_id (Group (gid, _)) = gid;
1.36 -fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2;
1.37 +fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
1.38
1.39 -fun new_group () = Group (serial (), ref []);
1.40 +fun new_group parent = make_group (parent, serial (), ref []);
1.41
1.42 -fun group_exns (Group (_, ref exns)) = exns;
1.43 +fun group_id (Group {id, ...}) = id;
1.44 +fun eq_group (group1, group2) = group_id group1 = group_id group2;
1.45
1.46 -fun str_of_group (Group (i, ref exns)) =
1.47 - if null exns then string_of_int i else enclose "(" ")" (string_of_int i);
1.48 +fun group_ancestry (Group {parent, id, ...}) =
1.49 + id :: (case parent of NONE => [] | SOME group => group_ancestry group);
1.50 +
1.51 +
1.52 +fun cancel_group (Group {status, ...}) exn = CRITICAL (fn () =>
1.53 + (case exn of
1.54 + Exn.Interrupt => if null (! status) then status := [exn] else ()
1.55 + | _ => change status (cons exn)));
1.56 +
1.57 +fun group_status (Group {parent, status, ...}) = (*non-critical*)
1.58 + ! status @ (case parent of NONE => [] | SOME group => group_status group);
1.59 +
1.60 +fun is_canceled (Group {parent, status, ...}) = (*non-critical*)
1.61 + not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group);
1.62 +
1.63 +fun str_of_group group =
1.64 + (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
1.65
1.66
1.67 (* jobs *)
1.68 @@ -94,7 +113,7 @@
1.69 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
1.70
1.71
1.72 -(* status *)
1.73 +(* queue status *)
1.74
1.75 fun status (Queue {jobs, ...}) =
1.76 let
1.77 @@ -107,14 +126,38 @@
1.78 in {ready = x, pending = y, running = z} end;
1.79
1.80
1.81 +(* cancel -- peers and sub-groups *)
1.82 +
1.83 +fun cancel (Queue {groups, jobs, ...}) group =
1.84 + let
1.85 + val _ = cancel_group group Exn.Interrupt;
1.86 + val tasks = Inttab.lookup_list groups (group_id group);
1.87 + val running =
1.88 + fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
1.89 + val _ = List.app SimpleThread.interrupt running;
1.90 + in null running end;
1.91 +
1.92 +fun cancel_all (Queue {jobs, ...}) =
1.93 + let
1.94 + fun cancel_job (group, job) (groups, running) =
1.95 + (cancel_group group Exn.Interrupt;
1.96 + (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
1.97 + | _ => (groups, running)));
1.98 + val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
1.99 + val _ = List.app SimpleThread.interrupt running;
1.100 + in groups end;
1.101 +
1.102 +
1.103 (* enqueue *)
1.104
1.105 -fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, cache}) =
1.106 +fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
1.107 let
1.108 val task = new_task pri;
1.109 - val groups' = Inttab.cons_list (gid, task) groups;
1.110 + val groups' = groups
1.111 + |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
1.112 val jobs' = jobs
1.113 - |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
1.114 + |> Task_Graph.new_node (task, (group, Job [job]))
1.115 + |> fold (add_job task) deps;
1.116 val cache' =
1.117 (case cache of
1.118 Result last =>
1.119 @@ -158,7 +201,8 @@
1.120 fun ready task =
1.121 (case Task_Graph.get_node jobs task of
1.122 (group, Job list) =>
1.123 - if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
1.124 + if null (Task_Graph.imm_preds jobs task)
1.125 + then SOME (task, group, rev list)
1.126 else NONE
1.127 | _ => NONE);
1.128
1.129 @@ -181,7 +225,9 @@
1.130 (* sporadic interrupts *)
1.131
1.132 fun interrupt (Queue {jobs, ...}) task =
1.133 - (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ());
1.134 + (case try (get_job jobs) task of
1.135 + SOME (Running thread) => SimpleThread.interrupt thread
1.136 + | _ => ());
1.137
1.138 fun interrupt_external (queue as Queue {jobs, ...}) str =
1.139 (case Int.fromString str of
1.140 @@ -194,33 +240,11 @@
1.141
1.142 (* termination *)
1.143
1.144 -fun cancel_group (Group (_, r)) exn = CRITICAL (fn () =>
1.145 - (case exn of
1.146 - Exn.Interrupt => if null (! r) then r := [exn] else ()
1.147 - | _ => change r (cons exn)));
1.148 -
1.149 -fun cancel (Queue {groups, jobs, ...}) (group as Group (gid, _)) =
1.150 - let
1.151 - val _ = cancel_group group Exn.Interrupt;
1.152 - val tasks = Inttab.lookup_list groups gid;
1.153 - val running = fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
1.154 - val _ = List.app SimpleThread.interrupt running;
1.155 - in null running end;
1.156 -
1.157 -fun cancel_all (Queue {jobs, ...}) =
1.158 - let
1.159 - fun cancel_job (group, job) (groups, running) =
1.160 - (cancel_group group Exn.Interrupt;
1.161 - (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
1.162 - | _ => (groups, running)));
1.163 - val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
1.164 - val _ = List.app SimpleThread.interrupt running;
1.165 - in groups end;
1.166 -
1.167 fun finish task (Queue {groups, jobs, cache}) =
1.168 let
1.169 - val Group (gid, _) = get_group jobs task;
1.170 - val groups' = Inttab.remove_list (op =) (gid, task) groups;
1.171 + val group = get_group jobs task;
1.172 + val groups' = groups
1.173 + |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
1.174 val jobs' = Task_Graph.del_node task jobs;
1.175 val cache' =
1.176 if null (Task_Graph.imm_succs jobs task) then cache