more official task context via Task_Queue.enroll, which is required to participate in group cancellation (e.g. to terminate command exec);
tuned signature;
1.1 --- a/src/Pure/Concurrent/future.ML Mon Nov 25 21:36:10 2013 +0100
1.2 +++ b/src/Pure/Concurrent/future.ML Thu Nov 28 12:54:39 2013 +0100
1.3 @@ -48,7 +48,6 @@
1.4 val worker_group: unit -> group option
1.5 val the_worker_group: unit -> group
1.6 val worker_subgroup: unit -> group
1.7 - val worker_context: string -> group -> ('a -> 'b) -> 'a -> 'b
1.8 type 'a future
1.9 val task_of: 'a future -> task
1.10 val peek: 'a future -> 'a Exn.result option
1.11 @@ -68,6 +67,7 @@
1.12 val joins: 'a future list -> 'a list
1.13 val join: 'a future -> 'a
1.14 val join_tasks: task list -> unit
1.15 + val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
1.16 val value_result: 'a Exn.result -> 'a future
1.17 val value: 'a -> 'a future
1.18 val cond_forks: params -> (unit -> 'a) list -> 'a future list
1.19 @@ -109,9 +109,6 @@
1.20
1.21 fun worker_subgroup () = new_group (worker_group ());
1.22
1.23 -fun worker_context name group f x =
1.24 - setmp_worker_task (Task_Queue.new_task group name NONE) f x;
1.25 -
1.26 fun worker_joining e =
1.27 (case worker_task () of
1.28 NONE => e ()
1.29 @@ -471,7 +468,7 @@
1.30
1.31 (* future jobs *)
1.32
1.33 -fun future_job group interrupts (e: unit -> 'a) =
1.34 +fun future_job group atts (e: unit -> 'a) =
1.35 let
1.36 val result = Single_Assignment.var "future" : 'a result;
1.37 val pos = Position.thread_data ();
1.38 @@ -480,10 +477,7 @@
1.39 val res =
1.40 if ok then
1.41 Exn.capture (fn () =>
1.42 - Multithreading.with_attributes
1.43 - (if interrupts
1.44 - then Multithreading.private_interrupts else Multithreading.no_interrupts)
1.45 - (fn _ => Position.setmp_thread_data pos e ())) ()
1.46 + Multithreading.with_attributes atts (fn _ => Position.setmp_thread_data pos e ())) ()
1.47 else Exn.interrupt_exn;
1.48 in assign_result group result (identify_result pos res) end;
1.49 in (result, job) end;
1.50 @@ -504,7 +498,11 @@
1.51 | SOME grp => grp);
1.52 fun enqueue e queue =
1.53 let
1.54 - val (result, job) = future_job grp interrupts e;
1.55 + val atts =
1.56 + if interrupts
1.57 + then Multithreading.private_interrupts
1.58 + else Multithreading.no_interrupts;
1.59 + val (result, job) = future_job grp atts e;
1.60 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
1.61 val future = Future {promised = false, task = task, result = result};
1.62 in (future, queue') end;
1.63 @@ -580,6 +578,23 @@
1.64 |> join;
1.65
1.66
1.67 +(* task context for running thread *)
1.68 +
1.69 +fun task_context name group f x =
1.70 + Multithreading.with_attributes Multithreading.no_interrupts (fn orig_atts =>
1.71 + let
1.72 + val (result, job) = future_job group orig_atts (fn () => f x);
1.73 + val task =
1.74 + SYNCHRONIZED "enroll" (fn () =>
1.75 + Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group));
1.76 + val _ = worker_exec (task, [job]);
1.77 + in
1.78 + (case Single_Assignment.peek result of
1.79 + NONE => raise Fail "Missing task context result"
1.80 + | SOME res => Exn.release res)
1.81 + end);
1.82 +
1.83 +
1.84 (* fast-path operations -- bypass task queue if possible *)
1.85
1.86 fun value_result (res: 'a Exn.result) =
1.87 @@ -602,7 +617,8 @@
1.88 let
1.89 val task = task_of x;
1.90 val group = Task_Queue.group_of_task task;
1.91 - val (result, job) = future_job group true (fn () => f (join x));
1.92 + val (result, job) =
1.93 + future_job group Multithreading.private_interrupts (fn () => f (join x));
1.94
1.95 val extended = SYNCHRONIZED "extend" (fn () =>
1.96 (case Task_Queue.extend task job (! queue) of
2.1 --- a/src/Pure/Concurrent/task_queue.ML Mon Nov 25 21:36:10 2013 +0100
2.2 +++ b/src/Pure/Concurrent/task_queue.ML Thu Nov 28 12:54:39 2013 +0100
2.3 @@ -17,7 +17,6 @@
2.4 val str_of_groups: group -> string
2.5 type task
2.6 val dummy_task: task
2.7 - val new_task: group -> string -> int option -> task
2.8 val group_of_task: task -> group
2.9 val name_of_task: task -> string
2.10 val pri_of_task: task -> int
2.11 @@ -36,6 +35,7 @@
2.12 val cancel: queue -> group -> Thread.thread list
2.13 val cancel_all: queue -> group list * Thread.thread list
2.14 val finish: task -> queue -> bool * queue
2.15 + val enroll: Thread.thread -> string -> group -> queue -> task * queue
2.16 val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
2.17 val enqueue: string -> group -> task list -> int -> (bool -> bool) -> queue -> task * queue
2.18 val extend: task -> (bool -> bool) -> queue -> queue option
2.19 @@ -295,6 +295,16 @@
2.20 in (maximal, make_queue groups' jobs') end;
2.21
2.22
2.23 +(* enroll *)
2.24 +
2.25 +fun enroll thread name group (Queue {groups, jobs}) =
2.26 + let
2.27 + val task = new_task group name NONE;
2.28 + val groups' = fold_groups (fn g => add_task (group_id g, task)) group groups;
2.29 + val jobs' = jobs |> Task_Graph.new_node (task, Running thread);
2.30 + in (task, make_queue groups' jobs') end;
2.31 +
2.32 +
2.33 (* enqueue *)
2.34
2.35 fun enqueue_passive group abort (Queue {groups, jobs}) =
3.1 --- a/src/Pure/PIDE/command.ML Mon Nov 25 21:36:10 2013 +0100
3.2 +++ b/src/Pure/PIDE/command.ML Thu Nov 28 12:54:39 2013 +0100
3.3 @@ -63,7 +63,7 @@
3.4 val res =
3.5 (body
3.6 |> restore_attributes
3.7 - |> Future.worker_context "Command.memo_exec" group
3.8 + |> Future.task_context "Command.memo_exec" group
3.9 |> Exn.interruptible_capture) ();
3.10 in SOME ((), Result res) end
3.11 else SOME ((), expr)
4.1 --- a/src/Pure/System/isabelle_process.ML Mon Nov 25 21:36:10 2013 +0100
4.2 +++ b/src/Pure/System/isabelle_process.ML Thu Nov 28 12:54:39 2013 +0100
4.3 @@ -159,8 +159,8 @@
4.4 NONE => raise Runtime.TERMINATE
4.5 | SOME line => map (read_chunk channel) (space_explode "," line));
4.6
4.7 -fun worker_context e =
4.8 - Future.worker_context "Isabelle_Process.loop" (Future.new_group NONE) e ();
4.9 +fun task_context e =
4.10 + Future.task_context "Isabelle_Process.loop" (Future.new_group NONE) e ();
4.11
4.12 in
4.13
4.14 @@ -168,7 +168,7 @@
4.15 let val continue =
4.16 (case read_command channel of
4.17 [] => (Output.error_msg "Isabelle process: no input"; true)
4.18 - | name :: args => (worker_context (fn () => run_command name args); true))
4.19 + | name :: args => (task_context (fn () => run_command name args); true))
4.20 handle Runtime.TERMINATE => false
4.21 | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
4.22 in