1.1 --- a/src/Pure/Concurrent/future.ML Fri Aug 19 16:13:26 2011 +0200
1.2 +++ b/src/Pure/Concurrent/future.ML Fri Aug 19 17:39:37 2011 +0200
1.3 @@ -52,6 +52,9 @@
1.4 val peek: 'a future -> 'a Exn.result option
1.5 val is_finished: 'a future -> bool
1.6 val get_finished: 'a future -> 'a
1.7 + val interruptible_task: ('a -> 'b) -> 'a -> 'b
1.8 + val cancel_group: group -> task list
1.9 + val cancel: 'a future -> task list
1.10 type fork_params =
1.11 {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
1.12 val forks: fork_params -> (unit -> 'a) list -> 'a future list
1.13 @@ -60,12 +63,10 @@
1.14 val join_results: 'a future list -> 'a Exn.result list
1.15 val join_result: 'a future -> 'a Exn.result
1.16 val join: 'a future -> 'a
1.17 + val join_tasks: task list -> unit
1.18 val value_result: 'a Exn.result -> 'a future
1.19 val value: 'a -> 'a future
1.20 val map: ('a -> 'b) -> 'a future -> 'b future
1.21 - val cancel_group: group -> unit future
1.22 - val cancel: 'a future -> unit future
1.23 - val interruptible_task: ('a -> 'b) -> 'a -> 'b
1.24 val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
1.25 val promise_group: group -> (unit -> unit) -> 'a future
1.26 val promise: (unit -> unit) -> 'a future
1.27 @@ -191,6 +192,17 @@
1.28 broadcast scheduler_event);
1.29
1.30
1.31 +fun interruptible_task f x =
1.32 + (if Multithreading.available then
1.33 + Multithreading.with_attributes
1.34 + (if is_some (worker_task ())
1.35 + then Multithreading.private_interrupts
1.36 + else Multithreading.public_interrupts)
1.37 + (fn _ => f x)
1.38 + else interruptible f x)
1.39 + before Multithreading.interrupted ();
1.40 +
1.41 +
1.42 (* worker threads *)
1.43
1.44 fun worker_exec (task, jobs) =
1.45 @@ -386,7 +398,13 @@
1.46 if scheduler_active () then ()
1.47 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
1.48
1.49 -fun scheduler_cancel group = SYNCHRONIZED "scheduler_cancel" (fn () =>
1.50 +
1.51 +
1.52 +(** futures **)
1.53 +
1.54 +(* cancel *)
1.55 +
1.56 +fun cancel_group group = SYNCHRONIZED "cancel_group" (fn () =>
1.57 let
1.58 val running = cancel_now group;
1.59 val _ =
1.60 @@ -394,10 +412,9 @@
1.61 else (cancel_later group; signal work_available; scheduler_check ());
1.62 in running end);
1.63
1.64 +fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
1.65
1.66
1.67 -(** futures **)
1.68 -
1.69 (* future jobs *)
1.70
1.71 fun assign_result group result raw_res =
1.72 @@ -467,7 +484,7 @@
1.73 end;
1.74
1.75 fun fork_pri pri e =
1.76 - singleton (forks {name = "", group = NONE, deps = [], pri = pri, interrupts = true}) e;
1.77 + (singleton o forks) {name = "fork", group = NONE, deps = [], pri = pri, interrupts = true} e;
1.78
1.79 fun fork e = fork_pri 0 e;
1.80
1.81 @@ -519,6 +536,13 @@
1.82 fun join_result x = singleton join_results x;
1.83 fun join x = Exn.release (join_result x);
1.84
1.85 +fun join_tasks [] = ()
1.86 + | join_tasks tasks =
1.87 + (singleton o forks)
1.88 + {name = "join_tasks", group = SOME (new_group NONE),
1.89 + deps = tasks, pri = 0, interrupts = false} I
1.90 + |> join;
1.91 +
1.92
1.93 (* fast-path versions -- bypassing task queue *)
1.94
1.95 @@ -545,9 +569,9 @@
1.96 in
1.97 if extended then Future {promised = false, task = task, result = result}
1.98 else
1.99 - singleton
1.100 - (forks {name = "Future.map", group = SOME group, deps = [task],
1.101 - pri = Task_Queue.pri_of_task task, interrupts = true})
1.102 + (singleton o forks)
1.103 + {name = "map_future", group = SOME group, deps = [task],
1.104 + pri = Task_Queue.pri_of_task task, interrupts = true}
1.105 (fn () => f (join x))
1.106 end;
1.107
1.108 @@ -556,29 +580,6 @@
1.109 else map (fn e => value_result (Exn.interruptible_capture e ())) es;
1.110
1.111
1.112 -(* cancel *)
1.113 -
1.114 -fun cancel_group group =
1.115 - (case scheduler_cancel group of
1.116 - [] => value ()
1.117 - | running =>
1.118 - singleton
1.119 - (forks {name = "cancel_group", group = SOME (new_group NONE),
1.120 - deps = running, pri = 0, interrupts = false}) I);
1.121 -
1.122 -fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
1.123 -
1.124 -fun interruptible_task f x =
1.125 - (if Multithreading.available then
1.126 - Multithreading.with_attributes
1.127 - (if is_some (worker_task ())
1.128 - then Multithreading.private_interrupts
1.129 - else Multithreading.public_interrupts)
1.130 - (fn _ => f x)
1.131 - else interruptible f x)
1.132 - before Multithreading.interrupted ();
1.133 -
1.134 -
1.135 (* promised futures -- fulfilled by external means *)
1.136
1.137 fun promise_group group abort : 'a future =
2.1 --- a/src/Pure/PIDE/document.ML Fri Aug 19 16:13:26 2011 +0200
2.2 +++ b/src/Pure/PIDE/document.ML Fri Aug 19 17:39:37 2011 +0200
2.3 @@ -24,7 +24,7 @@
2.4 type state
2.5 val init_state: state
2.6 val join_commands: state -> unit
2.7 - val cancel_execution: state -> unit future
2.8 + val cancel_execution: state -> Future.task list
2.9 val define_command: command_id -> string -> state -> state
2.10 val edit: version_id -> version_id -> edit list -> state -> (command_id * exec_id) list * state
2.11 val execute: version_id -> state -> state
3.1 --- a/src/Pure/PIDE/isar_document.ML Fri Aug 19 16:13:26 2011 +0200
3.2 +++ b/src/Pure/PIDE/isar_document.ML Fri Aug 19 17:39:37 2011 +0200
3.3 @@ -30,9 +30,9 @@
3.4 fn ([a], []) => Document.Header (Exn.Exn (ERROR a))]))
3.5 end;
3.6
3.7 - val cancellation = Document.cancel_execution state;
3.8 + val running = Document.cancel_execution state;
3.9 val (updates, state') = Document.edit old_id new_id edits state;
3.10 - val _ = Future.join cancellation;
3.11 + val _ = Future.join_tasks running;
3.12 val _ = Document.join_commands state';
3.13 val _ =
3.14 Output.status (Markup.markup (Markup.assign new_id)