added Future.promise/fulfill -- promised futures that are fulfilled by external means;
authorwenzelm
Tue, 05 Jan 2010 23:38:10 +0100
changeset 342777325a5e3587f
parent 34276 12436485c244
child 34278 228f27469139
added Future.promise/fulfill -- promised futures that are fulfilled by external means;
Future.value: official result assignment -- produces immutable ref;
Future.shutdown: raw_wait keeps raw task attributes, e.g. asynchronous interrupts of toplevel;
Task_Queue: passive tasks track dependencies, but lack any evaluation process;
tuned;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Tue Jan 05 18:20:18 2010 +0100
     1.2 +++ b/src/Pure/Concurrent/future.ML	Tue Jan 05 23:38:10 2010 +0100
     1.3 @@ -23,6 +23,10 @@
     1.4      of runtime resources is distorted either if workers yield CPU time
     1.5      (e.g. via system sleep or wait operations), or if non-worker
     1.6      threads contend for significant runtime resources independently.
     1.7 +
     1.8 +  * Promised futures are fulfilled by external means.  There is no
     1.9 +    associated evaluation task, but other futures can depend on them
    1.10 +    as usual.
    1.11  *)
    1.12  
    1.13  signature FUTURE =
    1.14 @@ -37,7 +41,6 @@
    1.15    val group_of: 'a future -> group
    1.16    val peek: 'a future -> 'a Exn.result option
    1.17    val is_finished: 'a future -> bool
    1.18 -  val value: 'a -> 'a future
    1.19    val fork_group: group -> (unit -> 'a) -> 'a future
    1.20    val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
    1.21    val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    1.22 @@ -46,7 +49,12 @@
    1.23    val join_results: 'a future list -> 'a Exn.result list
    1.24    val join_result: 'a future -> 'a Exn.result
    1.25    val join: 'a future -> 'a
    1.26 +  val value: 'a -> 'a future
    1.27    val map: ('a -> 'b) -> 'a future -> 'b future
    1.28 +  val promise_group: group -> 'a future
    1.29 +  val promise: unit -> 'a future
    1.30 +  val fulfill_result: 'a future -> 'a Exn.result -> unit
    1.31 +  val fulfill: 'a future -> 'a -> unit
    1.32    val interruptible_task: ('a -> 'b) -> 'a -> 'b
    1.33    val cancel_group: group -> unit
    1.34    val cancel: 'a future -> unit
    1.35 @@ -75,11 +83,14 @@
    1.36  val worker_task = Option.map #1 o thread_data;
    1.37  val worker_group = Option.map #2 o thread_data;
    1.38  
    1.39 +fun new_group () = Task_Queue.new_group (worker_group ());
    1.40 +
    1.41  
    1.42  (* datatype future *)
    1.43  
    1.44  datatype 'a future = Future of
    1.45 - {task: task,
    1.46 + {promised: bool,
    1.47 +  task: task,
    1.48    group: group,
    1.49    result: 'a Exn.result option Synchronized.var};
    1.50  
    1.51 @@ -90,10 +101,14 @@
    1.52  fun peek x = Synchronized.value (result_of x);
    1.53  fun is_finished x = is_some (peek x);
    1.54  
    1.55 -fun value x = Future
    1.56 - {task = Task_Queue.new_task 0,
    1.57 -  group = Task_Queue.new_group NONE,
    1.58 -  result = Synchronized.var "future" (SOME (Exn.Result x))};
    1.59 +fun assign_result group result res =
    1.60 +  let
    1.61 +    val _ = Synchronized.assign result (K (SOME res));
    1.62 +    val ok =
    1.63 +      (case res of
    1.64 +        Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
    1.65 +      | Exn.Result _ => true);
    1.66 +  in ok end;
    1.67  
    1.68  
    1.69  
    1.70 @@ -111,6 +126,9 @@
    1.71  
    1.72  fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
    1.73  
    1.74 +fun raw_wait cond = (*requires SYNCHRONIZED*)
    1.75 +  ConditionVar.wait (cond, lock);
    1.76 +
    1.77  fun wait cond = (*requires SYNCHRONIZED*)
    1.78    Multithreading.sync_wait NONE NONE cond lock;
    1.79  
    1.80 @@ -160,12 +178,7 @@
    1.81              Exn.capture (fn () =>
    1.82                Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
    1.83            else Exn.Exn Exn.Interrupt;
    1.84 -        val _ = Synchronized.assign result (K (SOME res));
    1.85 -      in
    1.86 -        (case res of
    1.87 -          Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
    1.88 -        | Exn.Result _ => true)
    1.89 -      end;
    1.90 +      in assign_result group result res end;
    1.91    in (result, job) end;
    1.92  
    1.93  fun do_cancel group = (*requires SYNCHRONIZED*)
    1.94 @@ -247,7 +260,7 @@
    1.95        if tick andalso ! status_ticks = 0 then
    1.96          Multithreading.tracing 1 (fn () =>
    1.97            let
    1.98 -            val {ready, pending, running} = Task_Queue.status (! queue);
    1.99 +            val {ready, pending, running, passive} = Task_Queue.status (! queue);
   1.100              val total = length (! workers);
   1.101              val active = count_workers Working;
   1.102              val waiting = count_workers Waiting;
   1.103 @@ -255,7 +268,8 @@
   1.104              "SCHEDULE " ^ Time.toString now ^ ": " ^
   1.105                string_of_int ready ^ " ready, " ^
   1.106                string_of_int pending ^ " pending, " ^
   1.107 -              string_of_int running ^ " running; " ^
   1.108 +              string_of_int running ^ " running, " ^
   1.109 +              string_of_int passive ^ " passive; " ^
   1.110                string_of_int total ^ " workers, " ^
   1.111                string_of_int active ^ " active, " ^
   1.112                string_of_int waiting ^ " waiting "
   1.113 @@ -329,7 +343,7 @@
   1.114  
   1.115      (* shutdown *)
   1.116  
   1.117 -    val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
   1.118 +    val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   1.119      val continue = not (! do_shutdown andalso null (! workers));
   1.120      val _ = if continue then () else scheduler := NONE;
   1.121  
   1.122 @@ -364,8 +378,8 @@
   1.123    let
   1.124      val group =
   1.125        (case opt_group of
   1.126 -        SOME group => group
   1.127 -      | NONE => Task_Queue.new_group (worker_group ()));
   1.128 +        NONE => new_group ()
   1.129 +      | SOME group => group);
   1.130      val (result, job) = future_job group e;
   1.131      val task = SYNCHRONIZED "enqueue" (fn () =>
   1.132        let
   1.133 @@ -374,7 +388,7 @@
   1.134          val _ = if minimal then signal work_available else ();
   1.135          val _ = scheduler_check ();
   1.136        in task end);
   1.137 -  in Future {task = task, group = group, result = result} end;
   1.138 +  in Future {promised = false, task = task, group = group, result = result} end;
   1.139  
   1.140  fun fork_group group e = fork_future (SOME group) [] 0 e;
   1.141  fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
   1.142 @@ -432,7 +446,14 @@
   1.143  fun join x = Exn.release (join_result x);
   1.144  
   1.145  
   1.146 -(* map *)
   1.147 +(* fast-path versions -- bypassing full task management *)
   1.148 +
   1.149 +fun value (x: 'a) =
   1.150 +  let
   1.151 +    val group = Task_Queue.new_group NONE;
   1.152 +    val result = Synchronized.var "value" NONE : 'a Exn.result option Synchronized.var;
   1.153 +    val _ = assign_result group result (Exn.Result x);
   1.154 +  in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
   1.155  
   1.156  fun map_future f x =
   1.157    let
   1.158 @@ -445,11 +466,32 @@
   1.159          SOME queue' => (queue := queue'; true)
   1.160        | NONE => false));
   1.161    in
   1.162 -    if extended then Future {task = task, group = group, result = result}
   1.163 +    if extended then Future {promised = false, task = task, group = group, result = result}
   1.164      else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   1.165    end;
   1.166  
   1.167  
   1.168 +(* promised futures -- fulfilled by external means *)
   1.169 +
   1.170 +fun promise_group group : 'a future =
   1.171 +  let
   1.172 +    val result = Synchronized.var "promise" (NONE: 'a Exn.result option);
   1.173 +    val task = SYNCHRONIZED "enqueue" (fn () =>
   1.174 +      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group));
   1.175 +  in Future {promised = true, task = task, group = group, result = result} end;
   1.176 +
   1.177 +fun promise () = promise_group (new_group ());
   1.178 +
   1.179 +fun fulfill_result (Future {promised, task, group, result}) res =
   1.180 +  let
   1.181 +    val _ = promised orelse raise Fail "Not a promised future";
   1.182 +    fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);
   1.183 +    val _ = execute (task, group, [job]);
   1.184 +  in () end;
   1.185 +
   1.186 +fun fulfill x res = fulfill_result x (Exn.Result res);
   1.187 +
   1.188 +
   1.189  (* cancellation *)
   1.190  
   1.191  fun interruptible_task f x =
   1.192 @@ -472,7 +514,7 @@
   1.193    if Multithreading.available then
   1.194      SYNCHRONIZED "shutdown" (fn () =>
   1.195       while scheduler_active () do
   1.196 -      (wait scheduler_event; broadcast_work ()))
   1.197 +      (raw_wait scheduler_event; broadcast_work ()))
   1.198    else ();
   1.199  
   1.200  
     2.1 --- a/src/Pure/Concurrent/task_queue.ML	Tue Jan 05 18:20:18 2010 +0100
     2.2 +++ b/src/Pure/Concurrent/task_queue.ML	Tue Jan 05 23:38:10 2010 +0100
     2.3 @@ -7,7 +7,7 @@
     2.4  signature TASK_QUEUE =
     2.5  sig
     2.6    type task
     2.7 -  val new_task: int -> task
     2.8 +  val dummy_task: task
     2.9    val pri_of_task: task -> int
    2.10    val str_of_task: task -> string
    2.11    type group
    2.12 @@ -20,10 +20,11 @@
    2.13    val str_of_group: group -> string
    2.14    type queue
    2.15    val empty: queue
    2.16 -  val is_empty: queue -> bool
    2.17 -  val status: queue -> {ready: int, pending: int, running: int}
    2.18 +  val all_passive: queue -> bool
    2.19 +  val status: queue -> {ready: int, pending: int, running: int, passive: int}
    2.20    val cancel: queue -> group -> bool
    2.21    val cancel_all: queue -> group list
    2.22 +  val enqueue_passive: group -> queue -> task * queue
    2.23    val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
    2.24    val extend: task -> (bool -> bool) -> queue -> queue option
    2.25    val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
    2.26 @@ -38,13 +39,14 @@
    2.27  
    2.28  (* tasks *)
    2.29  
    2.30 -datatype task = Task of int * serial;
    2.31 +datatype task = Task of int option * serial;
    2.32 +val dummy_task = Task (NONE, ~1);
    2.33  fun new_task pri = Task (pri, serial ());
    2.34  
    2.35 -fun pri_of_task (Task (pri, _)) = pri;
    2.36 +fun pri_of_task (Task (pri, _)) = the_default 0 pri;
    2.37  fun str_of_task (Task (_, i)) = string_of_int i;
    2.38  
    2.39 -fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
    2.40 +fun task_ord (Task t1, Task t2) = prod_ord (rev_order o option_ord int_ord) int_ord (t1, t2);
    2.41  structure Task_Graph = Graph(type key = task val ord = task_ord);
    2.42  
    2.43  
    2.44 @@ -91,7 +93,8 @@
    2.45  
    2.46  datatype job =
    2.47    Job of (bool -> bool) list |
    2.48 -  Running of Thread.thread;
    2.49 +  Running of Thread.thread |
    2.50 +  Passive;
    2.51  
    2.52  type jobs = (group * job) Task_Graph.T;
    2.53  
    2.54 @@ -123,20 +126,24 @@
    2.55  fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
    2.56  
    2.57  val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
    2.58 -fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
    2.59 +
    2.60 +fun all_passive (Queue {jobs, ...}) =
    2.61 +  Task_Graph.get_first NONE
    2.62 +    ((fn Job _ => SOME () | Running _ => SOME () | Passive => NONE) o #2 o #1 o #2) jobs |> is_none;
    2.63  
    2.64  
    2.65  (* queue status *)
    2.66  
    2.67  fun status (Queue {jobs, ...}) =
    2.68    let
    2.69 -    val (x, y, z) =
    2.70 -      Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z) =>
    2.71 +    val (x, y, z, w) =
    2.72 +      Task_Graph.fold (fn (_, ((_, job), (deps, _))) => fn (x, y, z, w) =>
    2.73            (case job of
    2.74 -            Job _ => if null deps then (x + 1, y, z) else (x, y + 1, z)
    2.75 -          | Running _ => (x, y, z + 1)))
    2.76 -        jobs (0, 0, 0);
    2.77 -  in {ready = x, pending = y, running = z} end;
    2.78 +            Job _ => if null deps then (x + 1, y, z, w) else (x, y + 1, z, w)
    2.79 +          | Running _ => (x, y, z + 1, w)
    2.80 +          | Passive => (x, y, z, w + 1)))
    2.81 +        jobs (0, 0, 0, 0);
    2.82 +  in {ready = x, pending = y, running = z, passive = w} end;
    2.83  
    2.84  
    2.85  (* cancel -- peers and sub-groups *)
    2.86 @@ -154,18 +161,27 @@
    2.87    let
    2.88      fun cancel_job (group, job) (groups, running) =
    2.89        (cancel_group group Exn.Interrupt;
    2.90 -        (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
    2.91 +        (case job of
    2.92 +          Running t => (insert eq_group group groups, insert Thread.equal t running)
    2.93          | _ => (groups, running)));
    2.94 -    val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
    2.95 +    val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
    2.96      val _ = List.app SimpleThread.interrupt running;
    2.97 -  in groups end;
    2.98 +  in running_groups end;
    2.99  
   2.100  
   2.101  (* enqueue *)
   2.102  
   2.103 +fun enqueue_passive group (Queue {groups, jobs, cache}) =
   2.104 +  let
   2.105 +    val task = new_task NONE;
   2.106 +    val groups' = groups
   2.107 +      |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
   2.108 +    val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive));
   2.109 +  in (task, make_queue groups' jobs' cache) end;
   2.110 +
   2.111  fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
   2.112    let
   2.113 -    val task = new_task pri;
   2.114 +    val task = new_task (SOME pri);
   2.115      val groups' = groups
   2.116        |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
   2.117      val jobs' = jobs