more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
authorwenzelm
Thu, 28 Nov 2013 12:54:39 +0100
changeset 5529899b9249b3e05
parent 55297 f38b113697a2
child 55299 d206c93c0267
more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
tuned signature;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
src/Pure/PIDE/command.ML
src/Pure/System/isabelle_process.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Mon Nov 25 21:36:10 2013 +0100
     1.2 +++ b/src/Pure/Concurrent/future.ML	Thu Nov 28 12:54:39 2013 +0100
     1.3 @@ -48,7 +48,6 @@
     1.4    val worker_group: unit -> group option
     1.5    val the_worker_group: unit -> group
     1.6    val worker_subgroup: unit -> group
     1.7 -  val worker_context: string -> group -> ('a -> 'b) -> 'a -> 'b
     1.8    type 'a future
     1.9    val task_of: 'a future -> task
    1.10    val peek: 'a future -> 'a Exn.result option
    1.11 @@ -68,6 +67,7 @@
    1.12    val joins: 'a future list -> 'a list
    1.13    val join: 'a future -> 'a
    1.14    val join_tasks: task list -> unit
    1.15 +  val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
    1.16    val value_result: 'a Exn.result -> 'a future
    1.17    val value: 'a -> 'a future
    1.18    val cond_forks: params -> (unit -> 'a) list -> 'a future list
    1.19 @@ -109,9 +109,6 @@
    1.20  
    1.21  fun worker_subgroup () = new_group (worker_group ());
    1.22  
    1.23 -fun worker_context name group f x =
    1.24 -  setmp_worker_task (Task_Queue.new_task group name NONE) f x;
    1.25 -
    1.26  fun worker_joining e =
    1.27    (case worker_task () of
    1.28      NONE => e ()
    1.29 @@ -471,7 +468,7 @@
    1.30  
    1.31  (* future jobs *)
    1.32  
    1.33 -fun future_job group interrupts (e: unit -> 'a) =
    1.34 +fun future_job group atts (e: unit -> 'a) =
    1.35    let
    1.36      val result = Single_Assignment.var "future" : 'a result;
    1.37      val pos = Position.thread_data ();
    1.38 @@ -480,10 +477,7 @@
    1.39          val res =
    1.40            if ok then
    1.41              Exn.capture (fn () =>
    1.42 -              Multithreading.with_attributes
    1.43 -                (if interrupts
    1.44 -                 then Multithreading.private_interrupts else Multithreading.no_interrupts)
    1.45 -                (fn _ => Position.setmp_thread_data pos e ())) ()
    1.46 +              Multithreading.with_attributes atts (fn _ => Position.setmp_thread_data pos e ())) ()
    1.47            else Exn.interrupt_exn;
    1.48        in assign_result group result (identify_result pos res) end;
    1.49    in (result, job) end;
    1.50 @@ -504,7 +498,11 @@
    1.51          | SOME grp => grp);
    1.52        fun enqueue e queue =
    1.53          let
    1.54 -          val (result, job) = future_job grp interrupts e;
    1.55 +          val atts =
    1.56 +            if interrupts
    1.57 +            then Multithreading.private_interrupts
    1.58 +            else Multithreading.no_interrupts;
    1.59 +          val (result, job) = future_job grp atts e;
    1.60            val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
    1.61            val future = Future {promised = false, task = task, result = result};
    1.62          in (future, queue') end;
    1.63 @@ -580,6 +578,23 @@
    1.64      |> join;
    1.65  
    1.66  
    1.67 +(* task context for running thread *)
    1.68 +
    1.69 +fun task_context name group f x =
    1.70 +  Multithreading.with_attributes Multithreading.no_interrupts (fn orig_atts =>
    1.71 +    let
    1.72 +      val (result, job) = future_job group orig_atts (fn () => f x);
    1.73 +      val task =
    1.74 +        SYNCHRONIZED "enroll" (fn () =>
    1.75 +          Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group));
    1.76 +      val _ = worker_exec (task, [job]);
    1.77 +    in
    1.78 +      (case Single_Assignment.peek result of
    1.79 +        NONE => raise Fail "Missing task context result"
    1.80 +      | SOME res => Exn.release res)
    1.81 +    end);
    1.82 +
    1.83 +
    1.84  (* fast-path operations -- bypass task queue if possible *)
    1.85  
    1.86  fun value_result (res: 'a Exn.result) =
    1.87 @@ -602,7 +617,8 @@
    1.88      let
    1.89        val task = task_of x;
    1.90        val group = Task_Queue.group_of_task task;
    1.91 -      val (result, job) = future_job group true (fn () => f (join x));
    1.92 +      val (result, job) =
    1.93 +        future_job group Multithreading.private_interrupts (fn () => f (join x));
    1.94  
    1.95        val extended = SYNCHRONIZED "extend" (fn () =>
    1.96          (case Task_Queue.extend task job (! queue) of
     2.1 --- a/src/Pure/Concurrent/task_queue.ML	Mon Nov 25 21:36:10 2013 +0100
     2.2 +++ b/src/Pure/Concurrent/task_queue.ML	Thu Nov 28 12:54:39 2013 +0100
     2.3 @@ -17,7 +17,6 @@
     2.4    val str_of_groups: group -> string
     2.5    type task
     2.6    val dummy_task: task
     2.7 -  val new_task: group -> string -> int option -> task
     2.8    val group_of_task: task -> group
     2.9    val name_of_task: task -> string
    2.10    val pri_of_task: task -> int
    2.11 @@ -36,6 +35,7 @@
    2.12    val cancel: queue -> group -> Thread.thread list
    2.13    val cancel_all: queue -> group list * Thread.thread list
    2.14    val finish: task -> queue -> bool * queue
    2.15 +  val enroll: Thread.thread -> string -> group -> queue -> task * queue
    2.16    val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
    2.17    val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue
    2.18    val extend: task -> (bool -> bool) -> queue -> queue option
    2.19 @@ -295,6 +295,16 @@
    2.20    in (maximal, make_queue groups' jobs') end;
    2.21  
    2.22  
    2.23 +(* enroll *)
    2.24 +
    2.25 +fun enroll thread name group (Queue {groups, jobs}) =
    2.26 +  let
    2.27 +    val task = new_task group name NONE;
    2.28 +    val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups;
    2.29 +    val jobs' = jobs |> Task_Graph.new_node (task, Running thread);
    2.30 +  in (task, make_queue groups' jobs') end;
    2.31 +
    2.32 +
    2.33  (* enqueue *)
    2.34  
    2.35  fun enqueue_passive group abort (Queue {groups, jobs}) =
     3.1 --- a/src/Pure/PIDE/command.ML	Mon Nov 25 21:36:10 2013 +0100
     3.2 +++ b/src/Pure/PIDE/command.ML	Thu Nov 28 12:54:39 2013 +0100
     3.3 @@ -63,7 +63,7 @@
     3.4                    val res =
     3.5                      (body
     3.6                        |> restore_attributes
     3.7 -                      |> Future.worker_context "Command.memo_exec" group
     3.8 +                      |> Future.task_context "Command.memo_exec" group
     3.9                        |> Exn.interruptible_capture) ();
    3.10                  in SOME ((), Result res) end
    3.11                else SOME ((), expr)
     4.1 --- a/src/Pure/System/isabelle_process.ML	Mon Nov 25 21:36:10 2013 +0100
     4.2 +++ b/src/Pure/System/isabelle_process.ML	Thu Nov 28 12:54:39 2013 +0100
     4.3 @@ -159,8 +159,8 @@
     4.4      NONE => raise Runtime.TERMINATE
     4.5    | SOME line => map (read_chunk channel) (space_explode "," line));
     4.6  
     4.7 -fun worker_context e =
     4.8 -  Future.worker_context "Isabelle_Process.loop" (Future.new_group NONE) e ();
     4.9 +fun task_context e =
    4.10 +  Future.task_context "Isabelle_Process.loop" (Future.new_group NONE) e ();
    4.11  
    4.12  in
    4.13  
    4.14 @@ -168,7 +168,7 @@
    4.15    let val continue =
    4.16      (case read_command channel of
    4.17        [] => (Output.error_msg "Isabelle process: no input"; true)
    4.18 -    | name :: args => (worker_context (fn () => run_command name args); true))
    4.19 +    | name :: args => (task_context (fn () => run_command name args); true))
    4.20      handle Runtime.TERMINATE => false
    4.21        | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
    4.22    in