1.1 --- a/src/Pure/Concurrent/future.ML Fri Aug 19 14:01:20 2011 +0200
1.2 +++ b/src/Pure/Concurrent/future.ML Fri Aug 19 15:56:26 2011 +0200
1.3 @@ -31,6 +31,9 @@
1.4 that lack regular result information, will pick up parallel
1.5 exceptions from the cumulative group context (as Par_Exn).
1.6
1.7 + * Future task groups may be canceled: present and future group
1.8 + members will be interrupted eventually.
1.9 +
1.10 * Promised "passive" futures are fulfilled by external means. There
1.11 is no associated evaluation task, but other futures can depend on
1.12 them via regular join operations.
1.13 @@ -46,9 +49,6 @@
1.14 val peek: 'a future -> 'a Exn.result option
1.15 val is_finished: 'a future -> bool
1.16 val get_finished: 'a future -> 'a
1.17 - val interruptible_task: ('a -> 'b) -> 'a -> 'b
1.18 - val cancel_group: Task_Queue.group -> unit
1.19 - val cancel: 'a future -> unit
1.20 type fork_params =
1.21 {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
1.22 pri: int, interrupts: bool}
1.23 @@ -61,6 +61,9 @@
1.24 val value_result: 'a Exn.result -> 'a future
1.25 val value: 'a -> 'a future
1.26 val map: ('a -> 'b) -> 'a future -> 'b future
1.27 + val cancel_group: Task_Queue.group -> unit future
1.28 + val cancel: 'a future -> unit future
1.29 + val interruptible_task: ('a -> 'b) -> 'a -> 'b
1.30 val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
1.31 val promise_group: Task_Queue.group -> (unit -> unit) -> 'a future
1.32 val promise: (unit -> unit) -> 'a future
1.33 @@ -173,16 +176,6 @@
1.34
1.35 (* cancellation primitives *)
1.36
1.37 -fun interruptible_task f x =
1.38 - (if Multithreading.available then
1.39 - Multithreading.with_attributes
1.40 - (if is_some (worker_task ())
1.41 - then Multithreading.private_interrupts
1.42 - else Multithreading.public_interrupts)
1.43 - (fn _ => f x)
1.44 - else interruptible f x)
1.45 - before Multithreading.interrupted ();
1.46 -
1.47 fun cancel_now group = (*requires SYNCHRONIZED*)
1.48 Task_Queue.cancel (! queue) group;
1.49
1.50 @@ -213,7 +206,7 @@
1.51 val test = Exn.capture Multithreading.interrupted ();
1.52 val _ =
1.53 if ok andalso not (Exn.is_interrupt_exn test) then ()
1.54 - else if cancel_now group then ()
1.55 + else if null (cancel_now group) then ()
1.56 else cancel_later group;
1.57 val _ = broadcast work_finished;
1.58 val _ = if maximal then () else signal work_available;
1.59 @@ -347,7 +340,7 @@
1.60 else
1.61 (Multithreading.tracing 1 (fn () =>
1.62 string_of_int (length (! canceled)) ^ " canceled groups");
1.63 - Unsynchronized.change canceled (filter_out cancel_now);
1.64 + Unsynchronized.change canceled (filter_out (null o cancel_now));
1.65 broadcast_work ());
1.66
1.67
1.68 @@ -386,20 +379,18 @@
1.69 if scheduler_active () then ()
1.70 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
1.71
1.72 +fun scheduler_cancel group = SYNCHRONIZED "scheduler_cancel" (fn () =>
1.73 + let
1.74 + val running = cancel_now group;
1.75 + val _ =
1.76 + if null running then ()
1.77 + else (cancel_later group; signal work_available; scheduler_check ());
1.78 + in running end);
1.79 +
1.80
1.81
1.82 (** futures **)
1.83
1.84 -(* cancellation *)
1.85 -
1.86 -(*cancel: present and future group members will be interrupted eventually*)
1.87 -fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
1.88 - (if cancel_now group then () else cancel_later group;
1.89 - signal work_available; scheduler_check ()));
1.90 -
1.91 -fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
1.92 -
1.93 -
1.94 (* future jobs *)
1.95
1.96 fun assign_result group result raw_res =
1.97 @@ -559,6 +550,29 @@
1.98 else map (fn e => value_result (Exn.interruptible_capture e ())) es;
1.99
1.100
1.101 +(* cancel *)
1.102 +
1.103 +fun cancel_group group =
1.104 + (case scheduler_cancel group of
1.105 + [] => value ()
1.106 + | running =>
1.107 + singleton
1.108 + (forks {name = "cancel_group", group = SOME (Task_Queue.new_group NONE),
1.109 + deps = running, pri = 0, interrupts = false}) I);
1.110 +
1.111 +fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
1.112 +
1.113 +fun interruptible_task f x =
1.114 + (if Multithreading.available then
1.115 + Multithreading.with_attributes
1.116 + (if is_some (worker_task ())
1.117 + then Multithreading.private_interrupts
1.118 + else Multithreading.public_interrupts)
1.119 + (fn _ => f x)
1.120 + else interruptible f x)
1.121 + before Multithreading.interrupted ();
1.122 +
1.123 +
1.124 (* promised futures -- fulfilled by external means *)
1.125
1.126 fun promise_group group abort : 'a future =
2.1 --- a/src/Pure/Concurrent/par_list.ML Fri Aug 19 14:01:20 2011 +0200
2.2 +++ b/src/Pure/Concurrent/par_list.ML Fri Aug 19 15:56:26 2011 +0200
2.3 @@ -39,7 +39,8 @@
2.4 Future.forks {name = name, group = SOME group, deps = [], pri = 0, interrupts = true}
2.5 (map (fn x => fn () => f x) xs);
2.6 val results = Future.join_results futures
2.7 - handle exn => (if Exn.is_interrupt exn then Future.cancel_group group else (); reraise exn);
2.8 + handle exn =>
2.9 + (if Exn.is_interrupt exn then ignore (Future.cancel_group group) else (); reraise exn);
2.10 in results end;
2.11
2.12 fun map_name name f xs = Par_Exn.release_first (managed_results name f xs);
3.1 --- a/src/Pure/Concurrent/task_queue.ML Fri Aug 19 14:01:20 2011 +0200
3.2 +++ b/src/Pure/Concurrent/task_queue.ML Fri Aug 19 15:56:26 2011 +0200
3.3 @@ -31,7 +31,7 @@
3.4 val known_task: queue -> task -> bool
3.5 val all_passive: queue -> bool
3.6 val status: queue -> {ready: int, pending: int, running: int, passive: int}
3.7 - val cancel: queue -> group -> bool
3.8 + val cancel: queue -> group -> task list
3.9 val cancel_all: queue -> group list
3.10 val finish: task -> queue -> bool * queue
3.11 val enqueue_passive: group -> (unit -> bool) -> queue -> task * queue
3.12 @@ -248,10 +248,12 @@
3.13 let
3.14 val _ = cancel_group group Exn.Interrupt;
3.15 val running =
3.16 - Tasks.fold (#1 #> get_job jobs #> (fn Running t => insert Thread.equal t | _ => I))
3.17 + Tasks.fold (fn (task, _) =>
3.18 + (case get_job jobs task of Running thread => cons (task, thread) | _ => I))
3.19 (get_tasks groups (group_id group)) [];
3.20 - val _ = List.app Simple_Thread.interrupt_unsynchronized running;
3.21 - in null running end;
3.22 + val threads = fold (insert Thread.equal o #2) running [];
3.23 + val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
3.24 + in map #1 running end;
3.25
3.26 fun cancel_all (Queue {jobs, ...}) =
3.27 let
4.1 --- a/src/Pure/PIDE/document.ML Fri Aug 19 14:01:20 2011 +0200
4.2 +++ b/src/Pure/PIDE/document.ML Fri Aug 19 15:56:26 2011 +0200
4.3 @@ -24,7 +24,7 @@
4.4 type state
4.5 val init_state: state
4.6 val join_commands: state -> unit
4.7 - val cancel_execution: state -> unit -> unit
4.8 + val cancel_execution: state -> unit future
4.9 val define_command: command_id -> string -> state -> state
4.10 val edit: version_id -> version_id -> edit list -> state -> (command_id * exec_id) list * state
4.11 val execute: version_id -> state -> state
4.12 @@ -164,7 +164,7 @@
4.13 {versions: version Inttab.table, (*version_id -> document content*)
4.14 commands: Toplevel.transition future Inttab.table, (*command_id -> transition (future parsing)*)
4.15 execs: Toplevel.state lazy Inttab.table, (*exec_id -> execution process*)
4.16 - execution: unit future list} (*global execution process*)
4.17 + execution: Task_Queue.group} (*global execution process*)
4.18 with
4.19
4.20 fun make_state (versions, commands, execs, execution) =
4.21 @@ -177,7 +177,7 @@
4.22 make_state (Inttab.make [(no_id, empty_version)],
4.23 Inttab.make [(no_id, Future.value Toplevel.empty)],
4.24 Inttab.make [(no_id, empty_exec)],
4.25 - []);
4.26 + Task_Queue.new_group NONE);
4.27
4.28
4.29 (* document versions *)
4.30 @@ -233,9 +233,7 @@
4.31
4.32 (* document execution *)
4.33
4.34 -fun cancel_execution (State {execution, ...}) =
4.35 - (List.app Future.cancel execution;
4.36 - fn () => ignore (Future.join_results execution));
4.37 +fun cancel_execution (State {execution, ...}) = Future.cancel_group execution;
4.38
4.39 end;
4.40
4.41 @@ -393,17 +391,18 @@
4.42 fun force_exec NONE = ()
4.43 | force_exec (SOME exec_id) = ignore (Lazy.force (the_exec state exec_id));
4.44
4.45 - val execution' =
4.46 + val execution = Task_Queue.new_group NONE;
4.47 + val _ =
4.48 nodes_of version |> Graph.schedule
4.49 (fn deps => fn (name, node) =>
4.50 singleton
4.51 (Future.forks
4.52 - {name = "theory:" ^ name, group = NONE,
4.53 + {name = "theory:" ^ name, group = SOME (Task_Queue.new_group (SOME execution)),
4.54 deps = map (Future.task_of o #2) deps,
4.55 pri = 1, interrupts = true})
4.56 (fold_entries NONE (fn (_, exec) => fn () => force_exec exec) node));
4.57
4.58 - in (versions, commands, execs, execution') end);
4.59 + in (versions, commands, execs, execution) end);
4.60
4.61
4.62
5.1 --- a/src/Pure/PIDE/isar_document.ML Fri Aug 19 14:01:20 2011 +0200
5.2 +++ b/src/Pure/PIDE/isar_document.ML Fri Aug 19 15:56:26 2011 +0200
5.3 @@ -30,9 +30,9 @@
5.4 fn ([a], []) => Document.Header (Exn.Exn (ERROR a))]))
5.5 end;
5.6
5.7 - val await_cancellation = Document.cancel_execution state;
5.8 + val cancellation = Document.cancel_execution state;
5.9 val (updates, state') = Document.edit old_id new_id edits state;
5.10 - val _ = await_cancellation ();
5.11 + val _ = Future.join cancellation;
5.12 val _ = Document.join_commands state';
5.13 val _ =
5.14 Output.status (Markup.markup (Markup.assign new_id)