src/Pure/Concurrent/task_queue.ML
changeset 32121 e25107ff4f56
parent 32119 5382c93108db
child 32190 4fc7a882b41e
     1.1 --- a/src/Pure/Concurrent/task_queue.ML	Tue Jul 21 20:24:02 2009 +0200
     1.2 +++ b/src/Pure/Concurrent/task_queue.ML	Tue Jul 21 20:37:31 2009 +0200
     1.3 @@ -13,8 +13,8 @@
     1.4    type group
     1.5    val group_id: group -> int
     1.6    val eq_group: group * group -> bool
     1.7 -  val new_group: unit -> group
     1.8 -  val group_exns: group -> exn list
     1.9 +  val new_group: group option -> group
    1.10 +  val group_status: group -> exn list
    1.11    val str_of_group: group -> string
    1.12    type queue
    1.13    val empty: queue
    1.14 @@ -27,6 +27,7 @@
    1.15      (((task * group * (bool -> bool) list) * task list) option * queue)
    1.16    val interrupt: queue -> task -> unit
    1.17    val interrupt_external: queue -> string -> unit
    1.18 +  val is_canceled: group -> bool
    1.19    val cancel_group: group -> exn -> unit
    1.20    val cancel: queue -> group -> bool
    1.21    val cancel_all: queue -> group list
    1.22 @@ -48,19 +49,37 @@
    1.23  structure Task_Graph = Graph(type key = task val ord = task_ord);
    1.24  
    1.25  
    1.26 -(* groups *)
    1.27 +(* nested groups *)
    1.28  
    1.29 -datatype group = Group of serial * exn list ref;
    1.30 +datatype group = Group of
    1.31 + {parent: group option,
    1.32 +  id: serial,
    1.33 +  status: exn list ref};
    1.34  
    1.35 -fun group_id (Group (gid, _)) = gid;
    1.36 -fun eq_group (Group (gid1, _), Group (gid2, _)) = gid1 = gid2;
    1.37 +fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
    1.38  
    1.39 -fun new_group () = Group (serial (), ref []);
    1.40 +fun new_group parent = make_group (parent, serial (), ref []);
    1.41  
    1.42 -fun group_exns (Group (_, ref exns)) = exns;
    1.43 +fun group_id (Group {id, ...}) = id;
    1.44 +fun eq_group (group1, group2) = group_id group1 = group_id group2;
    1.45  
    1.46 -fun str_of_group (Group (i, ref exns)) =
    1.47 -  if null exns then string_of_int i else enclose "(" ")" (string_of_int i);
    1.48 +fun group_ancestry (Group {parent, id, ...}) =
    1.49 +  id :: (case parent of NONE => [] | SOME group => group_ancestry group);
    1.50 +
    1.51 +
    1.52 +fun cancel_group (Group {status, ...}) exn = CRITICAL (fn () =>
    1.53 +  (case exn of
    1.54 +    Exn.Interrupt => if null (! status) then status := [exn] else ()
    1.55 +  | _ => change status (cons exn)));
    1.56 +
    1.57 +fun group_status (Group {parent, status, ...}) = (*non-critical*)
    1.58 +  ! status @ (case parent of NONE => [] | SOME group => group_status group);
    1.59 +
    1.60 +fun is_canceled (Group {parent, status, ...}) = (*non-critical*)
    1.61 +  not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group);
    1.62 +
    1.63 +fun str_of_group group =
    1.64 +  (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
    1.65  
    1.66  
    1.67  (* jobs *)
    1.68 @@ -94,7 +113,7 @@
    1.69  fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
    1.70  
    1.71  
    1.72 -(* status *)
    1.73 +(* queue status *)
    1.74  
    1.75  fun status (Queue {jobs, ...}) =
    1.76    let
    1.77 @@ -107,14 +126,38 @@
    1.78    in {ready = x, pending = y, running = z} end;
    1.79  
    1.80  
    1.81 +(* cancel -- peers and sub-groups *)
    1.82 +
    1.83 +fun cancel (Queue {groups, jobs, ...}) group =
    1.84 +  let
    1.85 +    val _ = cancel_group group Exn.Interrupt;
    1.86 +    val tasks = Inttab.lookup_list groups (group_id group);
    1.87 +    val running =
    1.88 +      fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
    1.89 +    val _ = List.app SimpleThread.interrupt running;
    1.90 +  in null running end;
    1.91 +
    1.92 +fun cancel_all (Queue {jobs, ...}) =
    1.93 +  let
    1.94 +    fun cancel_job (group, job) (groups, running) =
    1.95 +      (cancel_group group Exn.Interrupt;
    1.96 +        (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
    1.97 +        | _ => (groups, running)));
    1.98 +    val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
    1.99 +    val _ = List.app SimpleThread.interrupt running;
   1.100 +  in groups end;
   1.101 +
   1.102 +
   1.103  (* enqueue *)
   1.104  
   1.105 -fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, cache}) =
   1.106 +fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
   1.107    let
   1.108      val task = new_task pri;
   1.109 -    val groups' = Inttab.cons_list (gid, task) groups;
   1.110 +    val groups' = groups
   1.111 +      |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
   1.112      val jobs' = jobs
   1.113 -      |> Task_Graph.new_node (task, (group, Job [job])) |> fold (add_job task) deps;
   1.114 +      |> Task_Graph.new_node (task, (group, Job [job]))
   1.115 +      |> fold (add_job task) deps;
   1.116      val cache' =
   1.117        (case cache of
   1.118          Result last =>
   1.119 @@ -158,7 +201,8 @@
   1.120      fun ready task =
   1.121        (case Task_Graph.get_node jobs task of
   1.122          (group, Job list) =>
   1.123 -          if null (Task_Graph.imm_preds jobs task) then SOME (task, group, rev list)
   1.124 +          if null (Task_Graph.imm_preds jobs task)
   1.125 +          then SOME (task, group, rev list)
   1.126            else NONE
   1.127        | _ => NONE);
   1.128  
   1.129 @@ -181,7 +225,9 @@
   1.130  (* sporadic interrupts *)
   1.131  
   1.132  fun interrupt (Queue {jobs, ...}) task =
   1.133 -  (case try (get_job jobs) task of SOME (Running thread) => SimpleThread.interrupt thread | _ => ());
   1.134 +  (case try (get_job jobs) task of
   1.135 +    SOME (Running thread) => SimpleThread.interrupt thread
   1.136 +  | _ => ());
   1.137  
   1.138  fun interrupt_external (queue as Queue {jobs, ...}) str =
   1.139    (case Int.fromString str of
   1.140 @@ -194,33 +240,11 @@
   1.141  
   1.142  (* termination *)
   1.143  
   1.144 -fun cancel_group (Group (_, r)) exn = CRITICAL (fn () =>
   1.145 -  (case exn of
   1.146 -    Exn.Interrupt => if null (! r) then r := [exn] else ()
   1.147 -  | _ => change r (cons exn)));
   1.148 -
   1.149 -fun cancel (Queue {groups, jobs, ...}) (group as Group (gid, _)) =
   1.150 -  let
   1.151 -    val _ = cancel_group group Exn.Interrupt;
   1.152 -    val tasks = Inttab.lookup_list groups gid;
   1.153 -    val running = fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
   1.154 -    val _ = List.app SimpleThread.interrupt running;
   1.155 -  in null running end;
   1.156 -
   1.157 -fun cancel_all (Queue {jobs, ...}) =
   1.158 -  let
   1.159 -    fun cancel_job (group, job) (groups, running) =
   1.160 -      (cancel_group group Exn.Interrupt;
   1.161 -        (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
   1.162 -        | _ => (groups, running)));
   1.163 -    val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
   1.164 -    val _ = List.app SimpleThread.interrupt running;
   1.165 -  in groups end;
   1.166 -
   1.167  fun finish task (Queue {groups, jobs, cache}) =
   1.168    let
   1.169 -    val Group (gid, _) = get_group jobs task;
   1.170 -    val groups' = Inttab.remove_list (op =) (gid, task) groups;
   1.171 +    val group = get_group jobs task;
   1.172 +    val groups' = groups
   1.173 +      |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
   1.174      val jobs' = Task_Graph.del_node task jobs;
   1.175      val cache' =
   1.176        if null (Task_Graph.imm_succs jobs task) then cache