src/Pure/Concurrent/future.ML
author wenzelm
Mon, 27 Jul 2009 15:53:43 +0200
changeset 32226 23511e4da055
parent 32225 d5d6f47fb018
child 32227 a7e901209005
permissions -rw-r--r--
tuned tracing;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values.
     5 
     6 Notes:
     7 
     8   * Futures are similar to delayed evaluation, i.e. delay/force is
     9     generalized to fork/join (and variants).  The idea is to model
    10     parallel value-oriented computations, but *not* communicating
    11     processes.
    12 
    13   * Futures are grouped; failure of one group member causes the whole
    14     group to be interrupted eventually.  Groups are block-structured.
    15 
    16   * Forked futures are evaluated spontaneously by a farm of worker
    17     threads in the background; join resynchronizes the computation and
    18     delivers results (values or exceptions).
    19 
    20   * The pool of worker threads is limited, usually in correlation with
    21     the number of physical cores on the machine.  Note that allocation
    22     of runtime resources is distorted either if workers yield CPU time
    23     (e.g. via system sleep or wait operations), or if non-worker
    24     threads contend for significant runtime resources independently.
    25 *)
    26 
    27 signature FUTURE =
    28 sig
    29   val enabled: unit -> bool
    30   type task = Task_Queue.task
    31   type group = Task_Queue.group
    32   val is_worker: unit -> bool
    33   val worker_group: unit -> Task_Queue.group option
    34   type 'a future
    35   val task_of: 'a future -> task
    36   val group_of: 'a future -> group
    37   val peek: 'a future -> 'a Exn.result option
    38   val is_finished: 'a future -> bool
    39   val value: 'a -> 'a future
    40   val fork: (unit -> 'a) -> 'a future
    41   val fork_group: group -> (unit -> 'a) -> 'a future
    42   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    43   val fork_pri: int -> (unit -> 'a) -> 'a future
    44   val join_results: 'a future list -> 'a Exn.result list
    45   val join_result: 'a future -> 'a Exn.result
    46   val join: 'a future -> 'a
    47   val map: ('a -> 'b) -> 'a future -> 'b future
    48   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    49   val cancel_group: group -> unit
    50   val cancel: 'a future -> unit
    51   val shutdown: unit -> unit
    52 end;
    53 
    54 structure Future: FUTURE =
    55 struct
    56 
    57 (** future values **)
    58 
    59 fun enabled () =
    60   Multithreading.enabled () andalso
    61     not (Multithreading.self_critical ());
    62 
    63 
    64 (* identifiers *)
    65 
    66 type task = Task_Queue.task;
    67 type group = Task_Queue.group;
    68 
    69 local
    70   val tag = Universal.tag () : (string * task * group) option Universal.tag;
    71 in
    72   fun thread_data () = the_default NONE (Thread.getLocal tag);
    73   fun setmp_thread_data data f x =
    74     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    75 end;
    76 
    77 val is_worker = is_some o thread_data;
    78 val worker_group = Option.map #3 o thread_data;
    79 
    80 
    81 (* datatype future *)
    82 
    83 datatype 'a future = Future of
    84  {task: task,
    85   group: group,
    86   result: 'a Exn.result option ref};
    87 
    88 fun task_of (Future {task, ...}) = task;
    89 fun group_of (Future {group, ...}) = group;
    90 
    91 fun peek (Future {result, ...}) = ! result;
    92 fun is_finished x = is_some (peek x);
    93 
    94 fun value x = Future
    95  {task = Task_Queue.new_task 0,
    96   group = Task_Queue.new_group NONE,
    97   result = ref (SOME (Exn.Result x))};
    98 
    99 
   100 
   101 (** scheduling **)
   102 
   103 (* global state *)
   104 
   105 val queue = ref Task_Queue.empty;
   106 val next = ref 0;
   107 val workers = ref ([]: (Thread.thread * bool) list);
   108 val scheduler = ref (NONE: Thread.thread option);
   109 val excessive = ref 0;
   110 val canceled = ref ([]: Task_Queue.group list);
   111 val do_shutdown = ref false;
   112 
   113 
   114 (* synchronization *)
   115 
   116 val scheduler_event = ConditionVar.conditionVar ();
   117 val work_available = ConditionVar.conditionVar ();
   118 val work_finished = ConditionVar.conditionVar ();
   119 
   120 local
   121   val lock = Mutex.mutex ();
   122 in
   123 
   124 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
   125 
   126 fun wait cond = (*requires SYNCHRONIZED*)
   127   ConditionVar.wait (cond, lock);
   128 
   129 fun wait_timeout cond timeout = (*requires SYNCHRONIZED*)
   130   ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));
   131 
   132 fun signal cond = (*requires SYNCHRONIZED*)
   133   ConditionVar.signal cond;
   134 
   135 fun broadcast cond = (*requires SYNCHRONIZED*)
   136   ConditionVar.broadcast cond;
   137 
   138 fun broadcast_all () = (*requires SYNCHRONIZED*)
   139  (ConditionVar.broadcast scheduler_event;
   140   ConditionVar.broadcast work_available;
   141   ConditionVar.broadcast work_finished);
   142 
   143 end;
   144 
   145 
   146 (* worker activity *)
   147 
   148 fun count_active ws =
   149   fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0;
   150 
   151 fun change_active active = (*requires SYNCHRONIZED*)
   152   change workers (AList.update Thread.equal (Thread.self (), active));
   153 
   154 fun overloaded () =
   155   count_active (! workers) > Multithreading.max_threads_value ();
   156 
   157 
   158 (* execute future jobs *)
   159 
   160 fun future_job group (e: unit -> 'a) =
   161   let
   162     val result = ref (NONE: 'a Exn.result option);
   163     fun job ok =
   164       let
   165         val res =
   166           if ok then
   167             Exn.capture
   168               (Multithreading.with_attributes Multithreading.restricted_interrupts
   169                 (fn _ => fn () => e ())) ()
   170           else Exn.Exn Exn.Interrupt;
   171         val _ = result := SOME res;
   172       in
   173         (case res of
   174           Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   175         | Exn.Result _ => true)
   176       end;
   177   in (result, job) end;
   178 
   179 fun do_cancel group = (*requires SYNCHRONIZED*)
   180  (change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event);
   181 
   182 fun execute name (task, group, jobs) =
   183   let
   184     val valid = not (Task_Queue.is_canceled group);
   185     val ok = setmp_thread_data (name, task, group) (fn () =>
   186       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   187     val _ = SYNCHRONIZED "execute" (fn () =>
   188       let
   189         val maximal = change_result queue (Task_Queue.finish task);
   190         val _ =
   191           if ok then ()
   192           else if Task_Queue.cancel (! queue) group then ()
   193           else do_cancel group;
   194         val _ = broadcast work_finished;
   195         val _ = if maximal then () else broadcast work_available;
   196       in () end);
   197   in () end;
   198 
   199 
   200 (* worker threads *)
   201 
   202 fun worker_wait cond = (*requires SYNCHRONIZED*)
   203  (change_active false; broadcast scheduler_event;
   204   wait cond;
   205   change_active true; broadcast scheduler_event);
   206 
   207 fun worker_next () = (*requires SYNCHRONIZED*)
   208   if ! excessive > 0 then
   209     (dec excessive;
   210      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   211      broadcast scheduler_event;
   212      NONE)
   213   else if overloaded () then (worker_wait scheduler_event; worker_next ())
   214   else
   215     (case change_result queue Task_Queue.dequeue of
   216       NONE => (worker_wait work_available; worker_next ())
   217     | some => some);
   218 
   219 fun worker_loop name =
   220   (case SYNCHRONIZED name (fn () => worker_next ()) of
   221     NONE => ()
   222   | SOME work => (execute name work; worker_loop name));
   223 
   224 fun worker_start name = (*requires SYNCHRONIZED*)
   225   change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true));
   226 
   227 
   228 (* scheduler *)
   229 
(*time of the last status trace, and minimal distance between two traces*)
val last_status = ref Time.zeroTime;
val next_status = Time.fromMilliseconds 450;

(*One round of the scheduler: trace status, dispose dead workers, adjust the
  size of the worker pool, process pending cancellations, then wait for the
  next scheduler event or timeout.  The result is false when the scheduler
  thread should terminate, i.e. after shutdown was requested and no workers
  remain.*)
fun scheduler_next () = (*requires SYNCHRONIZED*)
  let
    (*queue and worker status: traced at most once per next_status interval*)
    val _ =
      let val now = Time.now () in
        if Time.> (Time.+ (! last_status, next_status), now) then ()
        else
         (last_status := now; Multithreading.tracing 1 (fn () =>
            let
              val {ready, pending, running} = Task_Queue.status (! queue);
              val total = length (! workers);
              val active = count_active (! workers);
            in
              "SCHEDULE: " ^
                string_of_int ready ^ " ready, " ^
                string_of_int pending ^ " pending, " ^
                string_of_int running ^ " running; " ^
                string_of_int total ^ " workers, " ^
                string_of_int active ^ " active"
            end))
      end;

    (*worker threads: drop entries whose thread is no longer active;
      the forall test skips the partition when all workers are alive*)
    val _ =
      if forall (Thread.isActive o #1) (! workers) then ()
      else
        (case List.partition (Thread.isActive o #1) (! workers) of
          (_, []) => ()
        | (alive, dead) =>
            (workers := alive; Multithreading.tracing 0 (fn () =>
              "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));

    (*pool size: m is the thread limit (0 during shutdown), mm = 3/2 * m is
      the intended number of workers, l the present number*)
    val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
    val mm = (m * 3) div 2;
    val l = length (! workers);
    (*surplus of workers; may be negative, worker_next only reacts to positive values*)
    val _ = excessive := l - mm;
    (*start the missing workers, each with a freshly numbered name*)
    val _ =
      if mm > l then
       (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
        broadcast scheduler_event)
      else ();

    (*canceled groups: groups for which Task_Queue.cancel returns true are
      removed from the pending list, the others are retried in the next round*)
    val _ =
      if null (! canceled) then ()
      else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_all ());

    (*wait for the next event: short timeout while shutdown or cancellations
      are pending, long timeout otherwise*)
    val timeout =
      Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
    (*an interrupt received during the wait cancels all groups reported by
      Task_Queue.cancel_all*)
    val _ = interruptible (fn () => wait_timeout scheduler_event timeout) ()
      handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));

    (*shutdown: stop once shutdown is requested and all workers are gone;
      the scheduler reference is cleared before the final broadcast*)
    val continue = not (! do_shutdown andalso null (! workers));
    val _ = if continue then () else scheduler := NONE;
    val _ = broadcast scheduler_event;
  in continue end;
   290 
   291 fun scheduler_loop () =
   292   while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ();
   293 
   294 fun scheduler_active () = (*requires SYNCHRONIZED*)
   295   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   296 
   297 fun scheduler_check name = SYNCHRONIZED name (fn () =>
   298   if not (scheduler_active ()) then
   299    (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop);
   300     broadcast scheduler_event)
   301   else if ! do_shutdown then error "Scheduler shutdown in progress"
   302   else ());
   303 
   304 
   305 
   306 (** futures **)
   307 
   308 (* fork *)
   309 
   310 fun fork_future opt_group deps pri e =
   311   let
   312     val _ = scheduler_check "future check";
   313 
   314     val group =
   315       (case opt_group of
   316         SOME group => group
   317       | NONE => Task_Queue.new_group (worker_group ()));
   318     val (result, job) = future_job group e;
   319     val task = SYNCHRONIZED "future" (fn () =>
   320       let
   321         val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
   322         val _ = if minimal then signal work_available else ();
   323       in task end);
   324   in Future {task = task, group = group, result = result} end;
   325 
   326 fun fork e = fork_future NONE [] 0 e;
   327 fun fork_group group e = fork_future (SOME group) [] 0 e;
   328 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
   329 fun fork_pri pri e = fork_future NONE [] pri e;
   330 
   331 
   332 (* join *)
   333 
   334 local
   335 
   336 fun get_result x =
   337   (case peek x of
   338     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   339   | SOME (Exn.Exn Exn.Interrupt) =>
   340       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   341   | SOME res => res);
   342 
   343 fun join_wait x =
   344   if SYNCHRONIZED "join_wait" (fn () =>
   345     is_finished x orelse (wait work_finished; false))
   346   then () else join_wait x;
   347 
   348 fun join_next deps = (*requires SYNCHRONIZED*)
   349   if null deps then NONE
   350   else if overloaded () then (worker_wait scheduler_event; join_next deps)
   351   else
   352     (case change_result queue (Task_Queue.dequeue_towards deps) of
   353       (NONE, []) => NONE
   354     | (NONE, deps') => (worker_wait work_finished; join_next deps')
   355     | (SOME work, deps') => SOME (work, deps'));
   356 
   357 fun join_work deps =
   358   (case SYNCHRONIZED "join" (fn () => join_next deps) of
   359     NONE => ()
   360   | SOME (work, deps') => (execute "join" work; join_work deps'));
   361 
   362 in
   363 
   364 fun join_results xs =
   365   if forall is_finished xs then map get_result xs
   366   else uninterruptible (fn _ => fn () =>
   367     let
   368       val _ = scheduler_check "join check";
   369       val _ = Multithreading.self_critical () andalso
   370         error "Cannot join future values within critical section";
   371       val _ =
   372         if is_worker () then join_work (map task_of xs)
   373         else List.app join_wait xs;
   374     in map get_result xs end) ();
   375 
   376 end;
   377 
   378 fun join_result x = singleton join_results x;
   379 fun join x = Exn.release (join_result x);
   380 
   381 
   382 (* map *)
   383 
   384 fun map_future f x =
   385   let
   386     val _ = scheduler_check "map_future check";
   387 
   388     val task = task_of x;
   389     val group = Task_Queue.new_group (SOME (group_of x));
   390     val (result, job) = future_job group (fn () => f (join x));
   391 
   392     val extended = SYNCHRONIZED "map_future" (fn () =>
   393       (case Task_Queue.extend task job (! queue) of
   394         SOME queue' => (queue := queue'; true)
   395       | NONE => false));
   396   in
   397     if extended then Future {task = task, group = group, result = result}
   398     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   399   end;
   400 
   401 
   402 (* cancellation *)
   403 
   404 fun interruptible_task f x =
   405   if Multithreading.available then
   406     Multithreading.with_attributes
   407       (if is_worker ()
   408        then Multithreading.restricted_interrupts
   409        else Multithreading.regular_interrupts)
   410       (fn _ => f) x
   411   else interruptible f x;
   412 
   413 (*cancel_group: present and future group members will be interrupted eventually*)
   414 fun cancel_group group =
   415  (scheduler_check "cancel check";
   416   SYNCHRONIZED "cancel" (fn () => do_cancel group));
   417 
   418 fun cancel x = cancel_group (group_of x);
   419 
   420 
   421 (** global join and shutdown **)
   422 
   423 fun shutdown () =
   424   if Multithreading.available then
   425    (scheduler_check "shutdown check";
   426     SYNCHRONIZED "shutdown" (fn () =>
   427      (while not (scheduler_active ()) do wait scheduler_event;
   428       while not (Task_Queue.is_empty (! queue)) do wait scheduler_event;
   429       do_shutdown := true;
   430       while scheduler_active () do (broadcast_all (); wait scheduler_event))))
   431   else ();
   432 
   433 
(*final declarations of this structure!  Binding map here shadows the list
  operation of the same name, which earlier definitions in this structure
  still rely on (e.g. in fork_deps and join_results).*)
val map = map_future;
   436 
   437 end;
   438 
(*toplevel abbreviation of the future type, usable without structure prefix*)
type 'a future = 'a Future.future;
   440