refined Future.cancel: explicit future allows to join actual cancellation;
authorwenzelm
Fri, 19 Aug 2011 15:56:26 +0200
changeset 45174061599cb6eb0
parent 45173 b8f8488704e2
child 45175 349cc426d929
refined Future.cancel: explicit future allows to join actual cancellation;
Document.cancel_execution: join nested future groups as well;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/par_list.ML
src/Pure/Concurrent/task_queue.ML
src/Pure/PIDE/document.ML
src/Pure/PIDE/isar_document.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Fri Aug 19 14:01:20 2011 +0200
     1.2 +++ b/src/Pure/Concurrent/future.ML	Fri Aug 19 15:56:26 2011 +0200
     1.3 @@ -31,6 +31,9 @@
     1.4      that lack regular result information, will pick up parallel
     1.5      exceptions from the cumulative group context (as Par_Exn).
     1.6  
     1.7 +  * Future task groups may be canceled: present and future group
     1.8 +    members will be interrupted eventually.
     1.9 +
    1.10    * Promised "passive" futures are fulfilled by external means.  There
    1.11      is no associated evaluation task, but other futures can depend on
    1.12      them via regular join operations.
    1.13 @@ -46,9 +49,6 @@
    1.14    val peek: 'a future -> 'a Exn.result option
    1.15    val is_finished: 'a future -> bool
    1.16    val get_finished: 'a future -> 'a
    1.17 -  val interruptible_task: ('a -> 'b) -> 'a -> 'b
    1.18 -  val cancel_group: Task_Queue.group -> unit
    1.19 -  val cancel: 'a future -> unit
    1.20    type fork_params =
    1.21     {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
    1.22      pri: int, interrupts: bool}
    1.23 @@ -61,6 +61,9 @@
    1.24    val value_result: 'a Exn.result -> 'a future
    1.25    val value: 'a -> 'a future
    1.26    val map: ('a -> 'b) -> 'a future -> 'b future
    1.27 +  val cancel_group: Task_Queue.group -> unit future
    1.28 +  val cancel: 'a future -> unit future
    1.29 +  val interruptible_task: ('a -> 'b) -> 'a -> 'b
    1.30    val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
    1.31    val promise_group: Task_Queue.group -> (unit -> unit) -> 'a future
    1.32    val promise: (unit -> unit) -> 'a future
    1.33 @@ -173,16 +176,6 @@
    1.34  
    1.35  (* cancellation primitives *)
    1.36  
    1.37 -fun interruptible_task f x =
    1.38 -  (if Multithreading.available then
    1.39 -    Multithreading.with_attributes
    1.40 -      (if is_some (worker_task ())
    1.41 -       then Multithreading.private_interrupts
    1.42 -       else Multithreading.public_interrupts)
    1.43 -      (fn _ => f x)
    1.44 -   else interruptible f x)
    1.45 -  before Multithreading.interrupted ();
    1.46 -
    1.47  fun cancel_now group = (*requires SYNCHRONIZED*)
    1.48    Task_Queue.cancel (! queue) group;
    1.49  
    1.50 @@ -213,7 +206,7 @@
    1.51          val test = Exn.capture Multithreading.interrupted ();
    1.52          val _ =
    1.53            if ok andalso not (Exn.is_interrupt_exn test) then ()
    1.54 -          else if cancel_now group then ()
    1.55 +          else if null (cancel_now group) then ()
    1.56            else cancel_later group;
    1.57          val _ = broadcast work_finished;
    1.58          val _ = if maximal then () else signal work_available;
    1.59 @@ -347,7 +340,7 @@
    1.60        else
    1.61         (Multithreading.tracing 1 (fn () =>
    1.62            string_of_int (length (! canceled)) ^ " canceled groups");
    1.63 -        Unsynchronized.change canceled (filter_out cancel_now);
    1.64 +        Unsynchronized.change canceled (filter_out (null o cancel_now));
    1.65          broadcast_work ());
    1.66  
    1.67  
    1.68 @@ -386,20 +379,18 @@
    1.69    if scheduler_active () then ()
    1.70    else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
    1.71  
    1.72 +fun scheduler_cancel group = SYNCHRONIZED "scheduler_cancel" (fn () =>
    1.73 +  let
    1.74 +    val running = cancel_now group;
    1.75 +    val _ =
    1.76 +      if null running then ()
    1.77 +      else (cancel_later group; signal work_available; scheduler_check ());
    1.78 +  in running end);
    1.79 +
    1.80  
    1.81  
    1.82  (** futures **)
    1.83  
    1.84 -(* cancellation *)
    1.85 -
    1.86 -(*cancel: present and future group members will be interrupted eventually*)
    1.87 -fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
    1.88 - (if cancel_now group then () else cancel_later group;
    1.89 -  signal work_available; scheduler_check ()));
    1.90 -
    1.91 -fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
    1.92 -
    1.93 -
    1.94  (* future jobs *)
    1.95  
    1.96  fun assign_result group result raw_res =
    1.97 @@ -559,6 +550,29 @@
    1.98    else map (fn e => value_result (Exn.interruptible_capture e ())) es;
    1.99  
   1.100  
   1.101 +(* cancel *)
   1.102 +
   1.103 +fun cancel_group group =
   1.104 +  (case scheduler_cancel group of
   1.105 +    [] => value ()
   1.106 +  | running =>
   1.107 +      singleton
   1.108 +        (forks {name = "cancel_group", group = SOME (Task_Queue.new_group NONE),
   1.109 +          deps = running, pri = 0, interrupts = false}) I);
   1.110 +
   1.111 +fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
   1.112 +
   1.113 +fun interruptible_task f x =
   1.114 +  (if Multithreading.available then
   1.115 +    Multithreading.with_attributes
   1.116 +      (if is_some (worker_task ())
   1.117 +       then Multithreading.private_interrupts
   1.118 +       else Multithreading.public_interrupts)
   1.119 +      (fn _ => f x)
   1.120 +   else interruptible f x)
   1.121 +  before Multithreading.interrupted ();
   1.122 +
   1.123 +
   1.124  (* promised futures -- fulfilled by external means *)
   1.125  
   1.126  fun promise_group group abort : 'a future =
     2.1 --- a/src/Pure/Concurrent/par_list.ML	Fri Aug 19 14:01:20 2011 +0200
     2.2 +++ b/src/Pure/Concurrent/par_list.ML	Fri Aug 19 15:56:26 2011 +0200
     2.3 @@ -39,7 +39,8 @@
     2.4          Future.forks {name = name, group = SOME group, deps = [], pri = 0, interrupts = true}
     2.5            (map (fn x => fn () => f x) xs);
     2.6        val results = Future.join_results futures
     2.7 -        handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn);
     2.8 +        handle exn =>
     2.9 +          (if Exn.is_interrupt exn then ignore (Future.cancel_group group) else (); reraise exn);
    2.10      in results end;
    2.11  
    2.12  fun map_name name f xs = Par_Exn.release_first (managed_results name f xs);
     3.1 --- a/src/Pure/Concurrent/task_queue.ML	Fri Aug 19 14:01:20 2011 +0200
     3.2 +++ b/src/Pure/Concurrent/task_queue.ML	Fri Aug 19 15:56:26 2011 +0200
     3.3 @@ -31,7 +31,7 @@
     3.4    val known_task: queue -> task -> bool
     3.5    val all_passive: queue -> bool
     3.6    val status: queue -> {ready: int, pending: int, running: int, passive: int}
     3.7 -  val cancel: queue -> group -> bool
     3.8 +  val cancel: queue -> group -> task list
     3.9    val cancel_all: queue -> group list
    3.10    val finish: task -> queue -> bool * queue
    3.11    val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
    3.12 @@ -248,10 +248,12 @@
    3.13    let
    3.14      val _ = cancel_group group Exn.Interrupt;
    3.15      val running =
    3.16 -      Tasks.fold (#1 #> get_job jobs #> (fn Running t => insert Thread.equal t | _ => I))
    3.17 +      Tasks.fold (fn (task, _) =>
    3.18 +          (case get_job jobs task of Running thread => cons (task, thread) | _ => I))
    3.19          (get_tasks groups (group_id group)) [];
    3.20 -    val _ = List.app Simple_Thread.interrupt_unsynchronized running;
    3.21 -  in null running end;
    3.22 +    val threads = fold (insert Thread.equal o #2) running [];
    3.23 +    val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
    3.24 +  in map #1 running end;
    3.25  
    3.26  fun cancel_all (Queue {jobs, ...}) =
    3.27    let
     4.1 --- a/src/Pure/PIDE/document.ML	Fri Aug 19 14:01:20 2011 +0200
     4.2 +++ b/src/Pure/PIDE/document.ML	Fri Aug 19 15:56:26 2011 +0200
     4.3 @@ -24,7 +24,7 @@
     4.4    type state
     4.5    val init_state: state
     4.6    val join_commands: state -> unit
     4.7 -  val cancel_execution: state -> unit -> unit
     4.8 +  val cancel_execution: state -> unit future
     4.9    val define_command: command_id -> string -> state -> state
    4.10    val edit: version_id -> version_id -> edit list -> state -> (command_id * exec_id) list * state
    4.11    val execute: version_id -> state -> state
    4.12 @@ -164,7 +164,7 @@
    4.13   {versions: version Inttab.table,  (*version_id -> document content*)
    4.14    commands: Toplevel.transition future Inttab.table,  (*command_id -> transition (future parsing)*)
    4.15    execs: Toplevel.state lazy Inttab.table,  (*exec_id -> execution process*)
    4.16 -  execution: unit future list}  (*global execution process*)
    4.17 +  execution: Task_Queue.group}  (*global execution process*)
    4.18  with
    4.19  
    4.20  fun make_state (versions, commands, execs, execution) =
    4.21 @@ -177,7 +177,7 @@
    4.22    make_state (Inttab.make [(no_id, empty_version)],
    4.23      Inttab.make [(no_id, Future.value Toplevel.empty)],
    4.24      Inttab.make [(no_id, empty_exec)],
    4.25 -    []);
    4.26 +    Task_Queue.new_group NONE);
    4.27  
    4.28  
    4.29  (* document versions *)
    4.30 @@ -233,9 +233,7 @@
    4.31  
    4.32  (* document execution *)
    4.33  
    4.34 -fun cancel_execution (State {execution, ...}) =
    4.35 -  (List.app Future.cancel execution;
    4.36 -    fn () => ignore (Future.join_results execution));
    4.37 +fun cancel_execution (State {execution, ...}) = Future.cancel_group execution;
    4.38  
    4.39  end;
    4.40  
    4.41 @@ -393,17 +391,18 @@
    4.42        fun force_exec NONE = ()
    4.43          | force_exec (SOME exec_id) = ignore (Lazy.force (the_exec state exec_id));
    4.44  
    4.45 -      val execution' =
    4.46 +      val execution = Task_Queue.new_group NONE;
    4.47 +      val _ =
    4.48          nodes_of version |> Graph.schedule
    4.49            (fn deps => fn (name, node) =>
    4.50              singleton
    4.51                (Future.forks
    4.52 -                {name = "theory:" ^ name, group = NONE,
    4.53 +                {name = "theory:" ^ name, group = SOME (Task_Queue.new_group (SOME execution)),
    4.54                    deps = map (Future.task_of o #2) deps,
    4.55                    pri = 1, interrupts = true})
    4.56                (fold_entries NONE (fn (_, exec) => fn () => force_exec exec) node));
    4.57  
    4.58 -    in (versions, commands, execs, execution') end);
    4.59 +    in (versions, commands, execs, execution) end);
    4.60  
    4.61  
    4.62  
     5.1 --- a/src/Pure/PIDE/isar_document.ML	Fri Aug 19 14:01:20 2011 +0200
     5.2 +++ b/src/Pure/PIDE/isar_document.ML	Fri Aug 19 15:56:26 2011 +0200
     5.3 @@ -30,9 +30,9 @@
     5.4                    fn ([a], []) => Document.Header (Exn.Exn (ERROR a))]))
     5.5              end;
     5.6  
     5.7 -      val await_cancellation = Document.cancel_execution state;
     5.8 +      val cancellation = Document.cancel_execution state;
     5.9        val (updates, state') = Document.edit old_id new_id edits state;
    5.10 -      val _ = await_cancellation ();
    5.11 +      val _ = Future.join cancellation;
    5.12        val _ = Document.join_commands state';
    5.13        val _ =
    5.14          Output.status (Markup.markup (Markup.assign new_id)