src/Pure/Concurrent/future.ML
author wenzelm
Tue, 28 Jul 2009 15:10:15 +0200
changeset 32257 d9def420c84e
parent 32253 3e48bf962e05
child 32259 d302f1c9e356
permissions -rw-r--r--
future result: Synchronized.var;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.  Groups are block-structured.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 *)
    27 
    28 signature FUTURE =
    29 sig
    30   val enabled: unit -> bool
    31   type task = Task_Queue.task
    32   type group = Task_Queue.group
    33   val is_worker: unit -> bool
    34   val worker_group: unit -> Task_Queue.group option
    35   type 'a future
    36   val task_of: 'a future -> task
    37   val group_of: 'a future -> group
    38   val peek: 'a future -> 'a Exn.result option
    39   val is_finished: 'a future -> bool
    40   val value: 'a -> 'a future
    41   val fork: (unit -> 'a) -> 'a future
    42   val fork_group: group -> (unit -> 'a) -> 'a future
    43   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    44   val fork_pri: int -> (unit -> 'a) -> 'a future
    45   val join_results: 'a future list -> 'a Exn.result list
    46   val join_result: 'a future -> 'a Exn.result
    47   val join: 'a future -> 'a
    48   val map: ('a -> 'b) -> 'a future -> 'b future
    49   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    50   val cancel_group: group -> unit
    51   val cancel: 'a future -> unit
    52   val shutdown: unit -> unit
    53 end;
    54 
    55 structure Future: FUTURE =
    56 struct
    57 
    58 (** future values **)
    59 
    60 fun enabled () =
    61   Multithreading.enabled () andalso
    62     not (Multithreading.self_critical ());
    63 
    64 
    65 (* identifiers *)
    66 
    67 type task = Task_Queue.task;
    68 type group = Task_Queue.group;
    69 
    70 local
    71   val tag = Universal.tag () : (string * task * group) option Universal.tag;
    72 in
    73   fun thread_data () = the_default NONE (Thread.getLocal tag);
    74   fun setmp_thread_data data f x =
    75     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    76 end;
    77 
    78 val is_worker = is_some o thread_data;
    79 val worker_group = Option.map #3 o thread_data;
    80 
    81 
    82 (* datatype future *)
    83 
    84 datatype 'a future = Future of
    85  {task: task,
    86   group: group,
    87   result: 'a Exn.result option Synchronized.var};
    88 
    89 fun task_of (Future {task, ...}) = task;
    90 fun group_of (Future {group, ...}) = group;
    91 fun result_of (Future {result, ...}) = result;
    92 
    93 fun peek x = Synchronized.peek (result_of x);
    94 fun is_finished x = is_some (peek x);
    95 
    96 fun value x = Future
    97  {task = Task_Queue.new_task 0,
    98   group = Task_Queue.new_group NONE,
    99   result = Synchronized.var "future" (SOME (Exn.Result x))};
   100 
   101 
   102 
   103 (** scheduling **)
   104 
   105 (* global state *)
   106 
   107 val queue = ref Task_Queue.empty;
   108 val next = ref 0;
   109 val workers = ref ([]: (Thread.thread * bool) list);
   110 val scheduler = ref (NONE: Thread.thread option);
   111 val excessive = ref 0;
   112 val canceled = ref ([]: Task_Queue.group list);
   113 val do_shutdown = ref false;
   114 
   115 
   116 (* synchronization *)
   117 
   118 val scheduler_event = ConditionVar.conditionVar ();
   119 val work_available = ConditionVar.conditionVar ();
   120 val work_finished = ConditionVar.conditionVar ();
   121 
   122 local
   123   val lock = Mutex.mutex ();
   124 in
   125 
   126 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
   127 
   128 fun wait cond = (*requires SYNCHRONIZED*)
   129   ConditionVar.wait (cond, lock) handle Exn.Interrupt => ();
   130 
   131 fun wait_interruptible cond timeout = (*requires SYNCHRONIZED*)
   132   interruptible (fn () =>
   133     ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)))) ();
   134 
   135 fun signal cond = (*requires SYNCHRONIZED*)
   136   ConditionVar.signal cond;
   137 
   138 fun broadcast cond = (*requires SYNCHRONIZED*)
   139   ConditionVar.broadcast cond;
   140 
   141 fun broadcast_work () = (*requires SYNCHRONIZED*)
   142  (ConditionVar.broadcast work_available;
   143   ConditionVar.broadcast work_finished);
   144 
   145 end;
   146 
   147 
   148 (* execute future jobs *)
   149 
   150 fun future_job group (e: unit -> 'a) =
   151   let
   152     val result = Synchronized.var "future" (NONE: 'a Exn.result option);
   153     fun job ok =
   154       let
   155         val res =
   156           if ok then
   157             Exn.capture (fn () =>
   158              (Thread.testInterrupt ();
   159               Multithreading.with_attributes Multithreading.restricted_interrupts
   160                 (fn _ => fn () => e ())) ()) ()
   161           else Exn.Exn Exn.Interrupt;
   162         val _ = Synchronized.change result (K (SOME res));
   163       in
   164         (case res of
   165           Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   166         | Exn.Result _ => true)
   167       end;
   168   in (result, job) end;
   169 
   170 fun do_cancel group = (*requires SYNCHRONIZED*)
   171  (change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event);
   172 
   173 fun execute name (task, group, jobs) =
   174   let
   175     val valid = not (Task_Queue.is_canceled group);
   176     val ok = setmp_thread_data (name, task, group) (fn () =>
   177       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   178     val _ = SYNCHRONIZED "finish" (fn () =>
   179       let
   180         val maximal = change_result queue (Task_Queue.finish task);
   181         val _ =
   182           if ok then ()
   183           else if Task_Queue.cancel (! queue) group then ()
   184           else do_cancel group;
   185         val _ = broadcast work_finished;
   186         val _ = if maximal then () else broadcast work_available;
   187       in () end);
   188   in () end;
   189 
   190 
   191 (* worker activity *)
   192 
   193 fun count_active () = (*requires SYNCHRONIZED*)
   194   fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
   195 
   196 fun change_active active = (*requires SYNCHRONIZED*)
   197   change workers (AList.update Thread.equal (Thread.self (), active));
   198 
   199 
   200 (* worker threads *)
   201 
   202 fun worker_wait cond = (*requires SYNCHRONIZED*)
   203  (change_active false; wait cond; change_active true);
   204 
   205 fun worker_next () = (*requires SYNCHRONIZED*)
   206   if ! excessive > 0 then
   207     (dec excessive;
   208      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   209      broadcast scheduler_event;
   210      NONE)
   211   else if count_active () > Multithreading.max_threads_value () then
   212     (worker_wait scheduler_event; worker_next ())
   213   else
   214     (case change_result queue (Task_Queue.dequeue (Thread.self ())) of
   215       NONE => (worker_wait work_available; worker_next ())
   216     | some => some);
   217 
   218 fun worker_loop name =
   219   (case SYNCHRONIZED name (fn () => worker_next ()) of
   220     NONE => ()
   221   | SOME work => (execute name work; worker_loop name));
   222 
   223 fun worker_start name = (*requires SYNCHRONIZED*)
   224   change workers (cons (SimpleThread.fork false (fn () =>
   225      (broadcast scheduler_event; worker_loop name)), true));
   226 
   227 
   228 (* scheduler *)
   229 
   230 val last_status = ref Time.zeroTime;
   231 val next_status = Time.fromMilliseconds 500;
   232 val next_round = Time.fromMilliseconds 50;
   233 
   234 fun scheduler_next () = (*requires SYNCHRONIZED*)
   235   let
   236     (*queue and worker status*)
   237     val _ =
   238       let val now = Time.now () in
   239         if Time.> (Time.+ (! last_status, next_status), now) then ()
   240         else
   241          (last_status := now; Multithreading.tracing 1 (fn () =>
   242             let
   243               val {ready, pending, running} = Task_Queue.status (! queue);
   244               val total = length (! workers);
   245               val active = count_active ();
   246             in
   247               "SCHEDULE: " ^
   248                 string_of_int ready ^ " ready, " ^
   249                 string_of_int pending ^ " pending, " ^
   250                 string_of_int running ^ " running; " ^
   251                 string_of_int total ^ " workers, " ^
   252                 string_of_int active ^ " active"
   253             end))
   254       end;
   255 
   256     (*worker threads*)
   257     val _ =
   258       if forall (Thread.isActive o #1) (! workers) then ()
   259       else
   260         (case List.partition (Thread.isActive o #1) (! workers) of
   261           (_, []) => ()
   262         | (alive, dead) =>
   263             (workers := alive; Multithreading.tracing 0 (fn () =>
   264               "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
   265 
   266     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   267     val mm = (m * 3) div 2;
   268     val l = length (! workers);
   269     val _ = excessive := l - mm;
   270     val _ =
   271       if mm > l then
   272         funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
   273       else ();
   274 
   275     (*canceled groups*)
   276     val _ =
   277       if null (! canceled) then ()
   278       else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_work ());
   279 
   280     (*delay loop*)
   281     val _ = wait_interruptible scheduler_event next_round
   282       handle Exn.Interrupt =>
   283         (Multithreading.tracing 1 (fn () => "Interrupt");
   284           List.app do_cancel (Task_Queue.cancel_all (! queue)));
   285 
   286     (*shutdown*)
   287     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
   288     val continue = not (! do_shutdown andalso null (! workers));
   289     val _ = if continue then () else scheduler := NONE;
   290     val _ = broadcast scheduler_event;
   291   in continue end;
   292 
   293 fun scheduler_loop () =
   294   while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ();
   295 
   296 fun scheduler_active () = (*requires SYNCHRONIZED*)
   297   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   298 
   299 fun scheduler_check () = (*requires SYNCHRONIZED*)
   300  (do_shutdown := false;
   301   if scheduler_active () then ()
   302   else scheduler := SOME (SimpleThread.fork false scheduler_loop));
   303 
   304 
   305 
   306 (** futures **)
   307 
   308 (* fork *)
   309 
   310 fun fork_future opt_group deps pri e =
   311   let
   312     val group =
   313       (case opt_group of
   314         SOME group => group
   315       | NONE => Task_Queue.new_group (worker_group ()));
   316     val (result, job) = future_job group e;
   317     val task = SYNCHRONIZED "enqueue" (fn () =>
   318       let
   319         val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
   320         val _ = if minimal then signal work_available else ();
   321         val _ = scheduler_check ();
   322       in task end);
   323   in Future {task = task, group = group, result = result} end;
   324 
   325 fun fork e = fork_future NONE [] 0 e;
   326 fun fork_group group e = fork_future (SOME group) [] 0 e;
   327 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
   328 fun fork_pri pri e = fork_future NONE [] pri e;
   329 
   330 
   331 (* join *)
   332 
   333 local
   334 
   335 fun get_result x =
   336   (case peek x of
   337     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   338   | SOME (Exn.Exn Exn.Interrupt) =>
   339       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   340   | SOME res => res);
   341 
   342 fun join_wait x =
   343   Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some));
   344 
   345 fun join_next deps = (*requires SYNCHRONIZED*)
   346   if null deps then NONE
   347   else
   348     (case change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   349       (NONE, []) => NONE
   350     | (NONE, deps') => (worker_wait work_finished; join_next deps')
   351     | (SOME work, deps') => SOME (work, deps'));
   352 
   353 fun join_work deps =
   354   (case SYNCHRONIZED "join" (fn () => join_next deps) of
   355     NONE => ()
   356   | SOME (work, deps') => (execute "join" work; join_work deps'));
   357 
   358 in
   359 
   360 fun join_results xs =
   361   if forall is_finished xs then map get_result xs
   362   else if Multithreading.self_critical () then
   363     error "Cannot join future values within critical section"
   364   else uninterruptible (fn _ => fn () =>
   365      (if is_worker ()
   366       then join_work (map task_of xs)
   367       else List.app join_wait xs;
   368       map get_result xs)) ();
   369 
   370 end;
   371 
   372 fun join_result x = singleton join_results x;
   373 fun join x = Exn.release (join_result x);
   374 
   375 
   376 (* map *)
   377 
   378 fun map_future f x =
   379   let
   380     val task = task_of x;
   381     val group = Task_Queue.new_group (SOME (group_of x));
   382     val (result, job) = future_job group (fn () => f (join x));
   383 
   384     val extended = SYNCHRONIZED "extend" (fn () =>
   385       (case Task_Queue.extend task job (! queue) of
   386         SOME queue' => (queue := queue'; true)
   387       | NONE => false));
   388   in
   389     if extended then Future {task = task, group = group, result = result}
   390     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   391   end;
   392 
   393 
   394 (* cancellation *)
   395 
   396 fun interruptible_task f x =
   397   if Multithreading.available then
   398    (Thread.testInterrupt ();
   399     Multithreading.with_attributes
   400       (if is_worker ()
   401        then Multithreading.restricted_interrupts
   402        else Multithreading.regular_interrupts)
   403       (fn _ => fn x => f x) x)
   404   else interruptible f x;
   405 
   406 (*cancel: present and future group members will be interrupted eventually*)
   407 fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
   408 fun cancel x = cancel_group (group_of x);
   409 
   410 
   411 (* shutdown *)
   412 
   413 fun shutdown () =
   414   if Multithreading.available then
   415     SYNCHRONIZED "shutdown" (fn () =>
   416      while scheduler_active () do
   417       (wait scheduler_event; broadcast_work ()))
   418   else ();
   419 
   420 
   421 (*final declarations of this structure!*)
   422 val map = map_future;
   423 
   424 end;
   425 
   426 type 'a future = 'a Future.future;
   427