src/Pure/Concurrent/future.ML
author wenzelm
Mon, 27 Jul 2009 12:11:18 +0200
changeset 32219 9a2566d1fdbd
parent 32186 8026b73cd357
child 32220 01ff6781dd18
permissions -rw-r--r--
more specific conditions: scheduler_event, work_available, work_finished -- considereably reduces overhead with many threads;
more specific signal vs. broadcast;
execute/finish: more careful notification based on minimal/maximal status;
tuned shutdown;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values.
     5 
     6 Notes:
     7 
     8   * Futures are similar to delayed evaluation, i.e. delay/force is
     9     generalized to fork/join (and variants).  The idea is to model
    10     parallel value-oriented computations, but *not* communicating
    11     processes.
    12 
    13   * Futures are grouped; failure of one group member causes the whole
    14     group to be interrupted eventually.
    15 
    16   * Forked futures are evaluated spontaneously by a farm of worker
    17     threads in the background; join resynchronizes the computation and
    18     delivers results (values or exceptions).
    19 
    20   * The pool of worker threads is limited, usually in correlation with
    21     the number of physical cores on the machine.  Note that allocation
    22     of runtime resources is distorted either if workers yield CPU time
    23     (e.g. via system sleep or wait operations), or if non-worker
    24     threads contend for significant runtime resources independently.
    25 *)
    26 
    27 signature FUTURE =
    28 sig
    29   val enabled: unit -> bool
    30   type task = Task_Queue.task
    31   type group = Task_Queue.group
    32   val is_worker: unit -> bool
    33   val worker_group: unit -> Task_Queue.group option
    34   type 'a future
    35   val task_of: 'a future -> task
    36   val group_of: 'a future -> group
    37   val peek: 'a future -> 'a Exn.result option
    38   val is_finished: 'a future -> bool
    39   val value: 'a -> 'a future
    40   val fork: (unit -> 'a) -> 'a future
    41   val fork_group: group -> (unit -> 'a) -> 'a future
    42   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    43   val fork_pri: int -> (unit -> 'a) -> 'a future
    44   val join_results: 'a future list -> 'a Exn.result list
    45   val join_result: 'a future -> 'a Exn.result
    46   val join: 'a future -> 'a
    47   val map: ('a -> 'b) -> 'a future -> 'b future
    48   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    49   val interrupt_task: string -> unit
    50   val cancel_group: group -> unit
    51   val cancel: 'a future -> unit
    52   val shutdown: unit -> unit
    53 end;
    54 
    55 structure Future: FUTURE =
    56 struct
    57 
    58 (** future values **)
    59 
    60 fun enabled () =
    61   Multithreading.enabled () andalso
    62     not (Multithreading.self_critical ());
    63 
    64 
    65 (* identifiers *)
    66 
    67 type task = Task_Queue.task;
    68 type group = Task_Queue.group;
    69 
    70 local
    71   val tag = Universal.tag () : (string * task * group) option Universal.tag;
    72 in
    73   fun thread_data () = the_default NONE (Thread.getLocal tag);
    74   fun setmp_thread_data data f x =
    75     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    76 end;
    77 
    78 val is_worker = is_some o thread_data;
    79 val worker_group = Option.map #3 o thread_data;
    80 
    81 
    82 (* datatype future *)
    83 
    84 datatype 'a future = Future of
    85  {task: task,
    86   group: group,
    87   result: 'a Exn.result option ref};
    88 
    89 fun task_of (Future {task, ...}) = task;
    90 fun group_of (Future {group, ...}) = group;
    91 
    92 fun peek (Future {result, ...}) = ! result;
    93 fun is_finished x = is_some (peek x);
    94 
    95 fun value x = Future
    96  {task = Task_Queue.new_task 0,
    97   group = Task_Queue.new_group NONE,
    98   result = ref (SOME (Exn.Result x))};
    99 
   100 
   101 
   102 (** scheduling **)
   103 
   104 (* global state *)
   105 
   106 val queue = ref Task_Queue.empty;
   107 val next = ref 0;
   108 val workers = ref ([]: (Thread.thread * bool) list);
   109 val scheduler = ref (NONE: Thread.thread option);
   110 val excessive = ref 0;
   111 val canceled = ref ([]: Task_Queue.group list);
   112 val do_shutdown = ref false;
   113 
   114 
   115 (* synchronization *)
   116 
   117 val scheduler_event = ConditionVar.conditionVar ();
   118 val work_available = ConditionVar.conditionVar ();
   119 val work_finished = ConditionVar.conditionVar ();
   120 
   121 local
   122   val lock = Mutex.mutex ();
   123 in
   124 
   125 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
   126 
   127 fun wait cond = (*requires SYNCHRONIZED*)
   128   ConditionVar.wait (cond, lock);
   129 
   130 fun wait_timeout cond timeout = (*requires SYNCHRONIZED*)
   131   ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));
   132 
   133 fun signal cond = (*requires SYNCHRONIZED*)
   134   ConditionVar.signal cond;
   135 
   136 fun broadcast cond = (*requires SYNCHRONIZED*)
   137   ConditionVar.broadcast cond;
   138 
   139 end;
   140 
   141 
   142 (* worker activity *)
   143 
   144 fun count_active ws =
   145   fold (fn (_, active) => fn i => if active then i + 1 else i) ws 0;
   146 
   147 fun trace_active () = Multithreading.tracing 6 (fn () =>
   148   let
   149     val ws = ! workers;
   150     val m = string_of_int (length ws);
   151     val n = string_of_int (count_active ws);
   152   in "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active" end);
   153 
   154 fun change_active active = (*requires SYNCHRONIZED*)
   155   change workers (AList.update Thread.equal (Thread.self (), active));
   156 
   157 fun overloaded () =
   158   count_active (! workers) > Multithreading.max_threads_value ();
   159 
   160 
   161 (* execute future jobs *)
   162 
   163 fun future_job group (e: unit -> 'a) =
   164   let
   165     val result = ref (NONE: 'a Exn.result option);
   166     fun job ok =
   167       let
   168         val res =
   169           if ok then
   170             Exn.capture
   171               (Multithreading.with_attributes Multithreading.restricted_interrupts
   172                 (fn _ => fn () => e ())) ()
   173           else Exn.Exn Exn.Interrupt;
   174         val _ = result := SOME res;
   175       in
   176         (case res of
   177           Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   178         | Exn.Result _ => true)
   179       end;
   180   in (result, job) end;
   181 
   182 fun do_cancel group = (*requires SYNCHRONIZED*)
   183   change canceled (insert Task_Queue.eq_group group);
   184 
   185 fun execute name (task, group, jobs) =
   186   let
   187     val _ = trace_active ();
   188     val valid = not (Task_Queue.is_canceled group);
   189     val ok = setmp_thread_data (name, task, group) (fn () =>
   190       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   191     val _ = SYNCHRONIZED "execute" (fn () =>
   192       let
   193         val maximal = change_result queue (Task_Queue.finish task);
   194         val _ =
   195           if ok then ()
   196           else if Task_Queue.cancel (! queue) group then ()
   197           else do_cancel group;
   198         val _ = broadcast work_finished;
   199         val _ = if maximal then () else broadcast work_available;
   200       in () end);
   201   in () end;
   202 
   203 
   204 (* worker threads *)
   205 
   206 fun worker_wait cond = (*requires SYNCHRONIZED*)
   207  (change_active false; broadcast scheduler_event;
   208   wait cond;
   209   change_active true; broadcast scheduler_event);
   210 
   211 fun worker_next () = (*requires SYNCHRONIZED*)
   212   if ! excessive > 0 then
   213     (dec excessive;
   214      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   215      broadcast scheduler_event;
   216      NONE)
   217   else if overloaded () then (worker_wait scheduler_event; worker_next ())
   218   else
   219     (case change_result queue Task_Queue.dequeue of
   220       NONE => (worker_wait work_available; worker_next ())
   221     | some => some);
   222 
   223 fun worker_loop name =
   224   (case SYNCHRONIZED name (fn () => worker_next ()) of
   225     NONE => ()
   226   | SOME work => (execute name work; worker_loop name));
   227 
   228 fun worker_start name = (*requires SYNCHRONIZED*)
   229   change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true));
   230 
   231 
   232 (* scheduler *)
   233 
   234 fun scheduler_next () = (*requires SYNCHRONIZED*)
   235   let
   236     (*queue status*)
   237     val _ = Multithreading.tracing 6 (fn () =>
   238       let val {ready, pending, running} = Task_Queue.status (! queue) in
   239         "SCHEDULE: " ^
   240           string_of_int ready ^ " ready, " ^
   241           string_of_int pending ^ " pending, " ^
   242           string_of_int running ^ " running"
   243       end);
   244 
   245     (*worker threads*)
   246     val _ =
   247       if forall (Thread.isActive o #1) (! workers) then ()
   248       else
   249         (case List.partition (Thread.isActive o #1) (! workers) of
   250           (_, []) => ()
   251         | (active, inactive) =>
   252             (workers := active; Multithreading.tracing 0 (fn () =>
   253               "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
   254     val _ = trace_active ();
   255 
   256     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   257     val mm = (m * 3) div 2;
   258     val l = length (! workers);
   259     val _ = excessive := l - mm;
   260     val _ =
   261       if mm > l then
   262        (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
   263         broadcast scheduler_event)
   264       else ();
   265 
   266     (*canceled groups*)
   267     val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
   268 
   269     val timeout =
   270       Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
   271     val _ = interruptible (fn () => wait_timeout scheduler_event timeout) ()
   272       handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
   273 
   274     (*shutdown*)
   275     val continue = not (! do_shutdown andalso null (! workers));
   276     val _ = if continue then () else scheduler := NONE;
   277     val _ = broadcast scheduler_event;
   278   in continue end;
   279 
   280 fun scheduler_loop () =
   281   while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ();
   282 
   283 fun scheduler_active () = (*requires SYNCHRONIZED*)
   284   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   285 
   286 fun scheduler_check name = SYNCHRONIZED name (fn () =>
   287   if not (scheduler_active ()) then
   288    (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop);
   289     broadcast scheduler_event)
   290   else if ! do_shutdown then error "Scheduler shutdown in progress"
   291   else ());
   292 
   293 
   294 
   295 (** futures **)
   296 
   297 (* fork *)
   298 
   299 fun fork_future opt_group deps pri e =
   300   let
   301     val _ = scheduler_check "future check";
   302 
   303     val group =
   304       (case opt_group of
   305         SOME group => group
   306       | NONE => Task_Queue.new_group (worker_group ()));
   307     val (result, job) = future_job group e;
   308     val task = SYNCHRONIZED "future" (fn () =>
   309       let
   310         val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
   311         val _ = if minimal then signal work_available else ();
   312       in task end);
   313   in Future {task = task, group = group, result = result} end;
   314 
   315 fun fork e = fork_future NONE [] 0 e;
   316 fun fork_group group e = fork_future (SOME group) [] 0 e;
   317 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
   318 fun fork_pri pri e = fork_future NONE [] pri e;
   319 
   320 
   321 (* join *)
   322 
   323 local
   324 
   325 fun get_result x =
   326   (case peek x of
   327     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   328   | SOME (Exn.Exn Exn.Interrupt) =>
   329       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   330   | SOME res => res);
   331 
   332 fun join_next deps = (*requires SYNCHRONIZED*)
   333   if overloaded () then (worker_wait scheduler_event; join_next deps)
   334   else change_result queue (Task_Queue.dequeue_towards deps);
   335 
   336 fun join_deps deps =
   337   (case SYNCHRONIZED "join" (fn () => join_next deps) of
   338     NONE => ()
   339   | SOME (work, deps') => (execute "join" work; join_deps deps'));
   340 
   341 in
   342 
   343 fun join_results xs =
   344   if forall is_finished xs then map get_result xs
   345   else uninterruptible (fn _ => fn () =>
   346     let
   347       val _ = scheduler_check "join check";
   348       val _ = Multithreading.self_critical () andalso
   349         error "Cannot join future values within critical section";
   350 
   351       val worker = is_worker ();
   352       val _ = if worker then join_deps (map task_of xs) else ();
   353 
   354       fun join_wait x =
   355         if SYNCHRONIZED "join_wait" (fn () =>
   356           is_finished x orelse ((if worker then worker_wait else wait) work_finished; false))
   357         then () else join_wait x;
   358 
   359       val _ = xs |> List.app (fn x =>
   360         let val time = Multithreading.real_time join_wait x in
   361           Multithreading.tracing_time true time
   362             (fn () => "joined after " ^ Time.toString time)
   363         end);
   364     in map get_result xs end) ();
   365 
   366 end;
   367 
   368 fun join_result x = singleton join_results x;
   369 fun join x = Exn.release (join_result x);
   370 
   371 
   372 (* map *)
   373 
   374 fun map_future f x =
   375   let
   376     val _ = scheduler_check "map_future check";
   377 
   378     val task = task_of x;
   379     val group = Task_Queue.new_group (SOME (group_of x));
   380     val (result, job) = future_job group (fn () => f (join x));
   381 
   382     val extended = SYNCHRONIZED "map_future" (fn () =>
   383       (case Task_Queue.extend task job (! queue) of
   384         SOME queue' => (queue := queue'; true)
   385       | NONE => false));
   386   in
   387     if extended then Future {task = task, group = group, result = result}
   388     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   389   end;
   390 
   391 
   392 (* cancellation *)
   393 
   394 fun interruptible_task f x =
   395   if Multithreading.available then
   396     Multithreading.with_attributes
   397       (if is_worker ()
   398        then Multithreading.restricted_interrupts
   399        else Multithreading.regular_interrupts)
   400       (fn _ => f) x
   401   else interruptible f x;
   402 
   403 (*interrupt: permissive signal, may get ignored*)
   404 fun interrupt_task id = SYNCHRONIZED "interrupt"
   405   (fn () => Task_Queue.interrupt_external (! queue) id);
   406 
   407 (*cancel: present and future group members will be interrupted eventually*)
   408 fun cancel_group group =
   409  (scheduler_check "cancel check";
   410   SYNCHRONIZED "cancel" (fn () => (do_cancel group; broadcast scheduler_event)));
   411 
   412 fun cancel x = cancel_group (group_of x);
   413 
   414 
   415 (** global join and shutdown **)
   416 
   417 fun shutdown () =
   418   if Multithreading.available then
   419    (scheduler_check "shutdown check";
   420     SYNCHRONIZED "shutdown" (fn () =>
   421      (while not (scheduler_active ()) do wait scheduler_event;
   422       while not (Task_Queue.is_empty (! queue)) do wait scheduler_event;
   423       do_shutdown := true;
   424       while scheduler_active () do
   425        (broadcast work_available;
   426         broadcast scheduler_event;
   427         wait scheduler_event))))
   428   else ();
   429 
   430 
   431 (*final declarations of this structure!*)
   432 val map = map_future;
   433 
   434 end;
   435 
   436 type 'a future = 'a Future.future;
   437