src/Pure/Concurrent/future.ML
author wenzelm
Sat, 29 May 2010 15:31:15 +0200
changeset 37182 71c8565dae38
parent 37053 78d88b670a53
child 37216 3165bc303f66
permissions -rw-r--r--
future result: retain plain Interrupt for vacuous group exceptions;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.  Groups are block-structured.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 
    27   * Promised futures are fulfilled by external means.  There is no
    28     associated evaluation task, but other futures can depend on them
    29     as usual.
    30 *)
    31 
    (*Public interface of future values: worker identification, inspection,
      fork/join, fast-path constructors, promises, cancellation and shutdown.*)
    signature FUTURE =
    sig
      (*task and group identifiers coincide with those of Task_Queue*)
      type task = Task_Queue.task
      type group = Task_Queue.group

      (*information about the current thread*)
      val is_worker: unit -> bool
      val worker_task: unit -> task option
      val worker_group: unit -> group option

      (*abstract future values and their inspection*)
      type 'a future
      val task_of: 'a future -> task
      val group_of: 'a future -> group
      val peek: 'a future -> 'a Exn.result option
      val is_finished: 'a future -> bool

      (*fork*)
      val fork_group: group -> (unit -> 'a) -> 'a future
      val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
      val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
      val fork_pri: int -> (unit -> 'a) -> 'a future
      val fork: (unit -> 'a) -> 'a future

      (*join*)
      val join_results: 'a future list -> 'a Exn.result list
      val join_result: 'a future -> 'a Exn.result
      val join: 'a future -> 'a

      (*fast-path versions*)
      val value: 'a -> 'a future
      val map: ('a -> 'b) -> 'a future -> 'b future

      (*promised futures*)
      val promise_group: group -> 'a future
      val promise: unit -> 'a future
      val fulfill_result: 'a future -> 'a Exn.result -> unit
      val fulfill: 'a future -> 'a -> unit

      (*cancellation and shutdown*)
      val interruptible_task: ('a -> 'b) -> 'a -> 'b
      val cancel_group: group -> unit
      val cancel: 'a future -> unit
      val shutdown: unit -> unit
    end;
    63 
    64 structure Future: FUTURE =
    65 struct
    66 
    67 (** future values **)
    68 
    69 (* identifiers *)
    70 
    (*task and group identifiers are exactly those of Task_Queue*)
    type task = Task_Queue.task
    and group = Task_Queue.group;
    73 
    (*Thread-local slot holding the (task, group) pair associated with the
      current thread, if any.*)
    local
      val thread_tag: (task * group) option Universal.tag = Universal.tag ();
    in

      (*content of the slot; NONE if nothing has been stored for this thread*)
      fun thread_data () =
        (case Thread.getLocal thread_tag of
          SOME data => data
        | NONE => NONE);

      (*run f x via Library.setmp_thread_data, passing the current slot content
        and SOME data; presumably the old content is restored afterwards --
        TODO confirm against Library*)
      fun setmp_thread_data data f x =
        let val previous = thread_data ()
        in Library.setmp_thread_data thread_tag previous (SOME data) f x end;

    end;
    81 
    (*the current thread counts as worker iff it carries thread data*)
    fun is_worker () =
      (case thread_data () of
        SOME _ => true
      | NONE => false);

    (*task component of the current thread data, if present*)
    fun worker_task () =
      (case thread_data () of
        SOME (task, _) => SOME task
      | NONE => NONE);

    (*group component of the current thread data, if present*)
    fun worker_group () =
      (case thread_data () of
        SOME (_, group) => SOME group
      | NONE => NONE);

    (*fresh group, created relative to the group of the current thread (if any)*)
    fun new_group () =
      let val parent = worker_group ()
      in Task_Queue.new_group parent end;
    87 
    88 
    89 (* datatype future *)
    90 
    (*result slot: single-assignment variable holding a value or an exception*)
    type 'a result = 'a Exn.result Single_Assignment.var;

    (*A future consists of its task and group identifiers, the result slot, and
      a flag telling whether it was created as a promise.*)
    datatype 'a future = Future of
     {promised: bool,
      task: task,
      group: group,
      result: 'a result};

    (*component access*)
    fun task_of (Future args) = #task args;
    fun group_of (Future args) = #group args;
    fun result_of (Future args) = #result args;

    (*non-blocking look at the result slot*)
    fun peek x =
      let val slot = result_of x
      in Single_Assignment.peek slot end;

    (*a future is finished as soon as its result slot has been assigned*)
    fun is_finished x =
      (case peek x of
        SOME _ => true
      | NONE => false);
   105 
   (*Store res in the result slot.  An exceptional result additionally cancels
     the group with that exception.  Returns true iff res is a proper value.*)
   fun assign_result group result res =
    (Single_Assignment.assign result res;
     case res of
       Exn.Result _ => true
     | Exn.Exn exn => (Task_Queue.cancel_group group exn; false));
   114 
   115 
   116 
   117 (** scheduling **)
   118 
   119 (* synchronization *)
   120 
   (*condition variables used by scheduler, workers and joining threads*)
   val scheduler_event = ConditionVar.conditionVar ();
   val work_available = ConditionVar.conditionVar ();
   val work_finished = ConditionVar.conditionVar ();

   (*All primitives below share one private mutex.*)
   local
     val lock = Mutex.mutex ();
   in

   (*evaluate e via SimpleThread.synchronized on the private mutex*)
   fun SYNCHRONIZED name e = SimpleThread.synchronized name lock e;

   (*wait on cond without time limit*)
   fun wait cond = (*requires SYNCHRONIZED*)
     let val no_limit = NONE
     in Multithreading.sync_wait NONE no_limit cond lock end;

   (*wait on cond, with time limit "now + timeout"*)
   fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
     let val limit = Time.+ (Time.now (), timeout)
     in Multithreading.sync_wait NONE (SOME limit) cond lock end;

   (*signal a single condition variable*)
   fun signal cond = (*requires SYNCHRONIZED*)
     ConditionVar.signal cond;

   (*broadcast on a single condition variable*)
   fun broadcast cond = (*requires SYNCHRONIZED*)
     ConditionVar.broadcast cond;

   (*broadcast on both work-related condition variables*)
   fun broadcast_work () = (*requires SYNCHRONIZED*)
    (broadcast work_available;
     broadcast work_finished);

   end;
   148 
   149 
   150 (* global state *)
   151 
   (*the task queue, initially empty*)
   val queue = Unsynchronized.ref Task_Queue.empty;
   (*counter used by the scheduler to number freshly started workers*)
   val next = Unsynchronized.ref 0;
   (*the scheduler thread, if one has been forked*)
   val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   (*groups whose cancellation is left to the scheduler*)
   val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
   (*shutdown request flag, set and reset by the scheduler functions*)
   val do_shutdown = Unsynchronized.ref false;
   (*limits and trend maintained by the scheduler's pool adjustments*)
   val max_workers = Unsynchronized.ref 0;
   val max_active = Unsynchronized.ref 0;
   val worker_trend = Unsynchronized.ref 0;

   (*registered worker threads, each with a mutable state*)
   datatype worker_state = Working | Waiting | Sleeping;
   val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);

   (*number of registered workers that are currently in the given state*)
   fun count_workers state = (*requires SYNCHRONIZED*)
     let fun in_state (_, state_ref) = ! state_ref = state
     in length (List.filter in_state (! workers)) end;
   166 
   167 
   168 (* execute future jobs *)
   169 
   (*Produce a fresh result slot together with the job that fills it.  The job
     receives a flag: if true, e is evaluated (exceptions captured, interrupts
     private, position of the forking thread installed); if false, the result
     is Interrupt.  The job returns the outcome of assign_result.*)
   fun future_job group (e: unit -> 'a) =
     let
       val result: 'a result = Single_Assignment.var "future";
       val pos = Position.thread_data ();
       fun evaluate () =
         Multithreading.with_attributes Multithreading.private_interrupts
           (fn _ => Position.setmp_thread_data pos e ());
       fun job true = assign_result group result (Exn.capture evaluate ())
         | job false = assign_result group result (Exn.Exn Exn.Interrupt);
     in (result, job) end;
   184 
   (*ask Task_Queue to cancel the group wrt. the current queue; the boolean
     result is passed on unchanged*)
   fun cancel_now group = (*requires SYNCHRONIZED*)
     let val current = ! queue
     in Task_Queue.cancel current group end;
   187 
   (*record the group in the list of canceled groups and wake up the scheduler*)
   fun cancel_later group = (*requires SYNCHRONIZED*)
     let
       val _ =
         Unsynchronized.change canceled (fn groups => insert Task_Queue.eq_group group groups);
     in broadcast scheduler_event end;
   191 
   (*Run all jobs of a task with thread data set to (task, group), then finish
     the task in the queue.  Every job is run, even after an earlier one has
     failed; if any failed, the group is canceled now or handed to the
     scheduler for later cancellation.*)
   fun execute (task, group, jobs) =
     let
       val valid = not (Task_Queue.is_canceled group);
       (*job is applied before the accumulated flag is consulted*)
       fun run_jobs () = fold (fn job => fn ok => job valid andalso ok) jobs true;
       val ok = setmp_thread_data (task, group) run_jobs ();
       fun finish () =
         let
           val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
         in
           if ok orelse cancel_now group then () else cancel_later group;
           broadcast work_finished;
           if maximal then () else signal work_available
         end;
     in SYNCHRONIZED "finish" finish end;
   208 
   209 
   210 (* worker threads *)
   211 
   (*Let the current worker wait on cond: its state is Waiting (active) or
     Sleeping (not active) during the wait, and Working again afterwards.
     Fails for threads that are not registered as workers.*)
   fun worker_wait active cond = (*requires SYNCHRONIZED*)
     let
       val self = Thread.self ();
       val state =
         (case AList.lookup Thread.equal (! workers) self of
           NONE => raise Fail "Unregistered worker thread"
         | SOME state_ref => state_ref);
     in
       state := (if active then Waiting else Sleeping);
       ignore (wait cond);
       state := Working
     end;
   222 
   (*Next piece of work for the current worker.  NONE means that the worker has
     been removed from the pool (too many workers).  While too many workers
     are active, or the queue has nothing to offer, the worker sleeps on
     work_available and tries again.*)
   fun worker_next () = (*requires SYNCHRONIZED*)
     let
       fun retire () =
        (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
         signal work_available;
         NONE);
       fun sleep_and_retry () = (worker_wait false work_available; worker_next ());
     in
       if length (! workers) > ! max_workers then retire ()
       else if count_workers Working > ! max_active then sleep_and_retry ()
       else
         (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
           NONE => sleep_and_retry ()
         | work => (signal work_available; work))
     end;
   234 
   (*main loop of a worker: fetch work under the lock, execute it outside the
     lock, stop as soon as worker_next yields NONE*)
   fun worker_loop name =
     let val next_work = SYNCHRONIZED name worker_next in
       (case next_work of
         SOME work => (execute work; worker_loop name)
       | NONE => ())
     end;
   239 
   (*fork a thread running worker_loop and register it with initial state Working*)
   fun worker_start name = (*requires SYNCHRONIZED*)
     let
       val thread = SimpleThread.fork false (fn () => worker_loop name);
       val state = Unsynchronized.ref Working;
     in Unsynchronized.change workers (cons (thread, state)) end;
   243 
   244 
   245 (* scheduler *)
   246 
   (*tick counter (modulo 10) that limits status output to every 10th tick*)
   val status_ticks = Unsynchronized.ref 0;

   (*start of the last tick, and the interval between ticks / wait timeout*)
   val last_round = Unsynchronized.ref Time.zeroTime;
   val next_round = Time.fromMilliseconds 50;

   (*One round of the scheduler: status output, disposal of dead workers,
     adjustment of the worker pool, cancellation of recorded groups, timed
     wait on scheduler_event, shutdown check.  Returns false iff the scheduler
     is to terminate.  An Interrupt during the round cancels all groups of the
     queue (cancel_later each) and lets the scheduler continue.*)
   fun scheduler_next () = (*requires SYNCHRONIZED*)
     let
       val now = Time.now ();
       val due = Time.+ (! last_round, next_round);
       val tick = Time.<= (due, now);
       val _ = if tick then last_round := now else ();


       (* queue and worker status *)

       fun status_message () =
         let
           val {ready, pending, running, passive} = Task_Queue.status (! queue);
           val total = length (! workers);
           val active = count_workers Working;
           val waiting = count_workers Waiting;
         in
           String.concat
            ["SCHEDULE ", Time.toString now, ": ",
             string_of_int ready, " ready, ",
             string_of_int pending, " pending, ",
             string_of_int running, " running, ",
             string_of_int passive, " passive; ",
             string_of_int total, " workers, ",
             string_of_int active, " active, ",
             string_of_int waiting, " waiting "]
         end;

       val _ =
         if tick then
          (Unsynchronized.change status_ticks (fn i => (i + 1) mod 10);
           if ! status_ticks = 0 then Multithreading.tracing 1 status_message else ())
         else ();

       (*drop workers whose thread is no longer active*)
       fun is_alive (thread, _) = Thread.isActive thread;
       val _ =
         if forall is_alive (! workers) then ()
         else
           let
             val (alive, dead) = List.partition is_alive (! workers);
           in
             workers := alive;
             Multithreading.tracing 0 (fn () =>
               "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
           end;


       (* worker pool adjustments *)

       val old_max_active = ! max_active;
       val old_max_workers = ! max_workers;

       val shutting_down = ! do_shutdown;
       val m = if shutting_down then 0 else Multithreading.max_threads_value ();
       val _ = max_active := m;

       (*desired pool size: between m and 4 * m, depending on working and waiting workers*)
       val mm =
         if shutting_down then 0
         else if m = 9999 then 1
         else
           let val demand = count_workers Working + 2 * count_workers Waiting
           in Int.min (Int.max (demand, m), 4 * m) end;

       (*the trend counts consecutive ticks asking for more (positive) or
         fewer (negative) workers; a change of direction restarts at 0*)
       val _ =
         if not tick then ()
         else
           (case Int.compare (mm, ! max_workers) of
             GREATER => Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
           | LESS => Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
           | EQUAL => ());

       val trend = ! worker_trend;
       val _ =
         if mm = 0 orelse trend > 50 orelse trend < ~50 then
           max_workers := mm
         else if trend > 5 andalso ! max_workers < 2 * m then
           max_workers := Int.min (mm, 2 * m)
         else ();

       (*start as many workers as are missing wrt. max_workers*)
       val missing = ! max_workers - length (! workers);
       fun start_worker () =
         ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)));
       val _ = if missing > 0 then funpow missing start_worker () else ();

       val limits_unchanged =
         ! max_active = old_max_active andalso ! max_workers = old_max_workers;
       val _ = if limits_unchanged then () else signal work_available;


       (* canceled groups *)

       val _ =
         (case ! canceled of
           [] => ()
         | _ =>
            (Multithreading.tracing 1 (fn () =>
               string_of_int (length (! canceled)) ^ " canceled groups");
             Unsynchronized.change canceled (filter_out cancel_now);
             broadcast_work ()));


       (* delay loop *)

       val _ = Exn.release (wait_timeout next_round scheduler_event);


       (* shutdown *)

       val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
       val finished = ! do_shutdown andalso null (! workers);
       val _ = if finished then scheduler := NONE else ();

       val _ = broadcast scheduler_event;
     in not finished end
     handle Exn.Interrupt =>
      (Multithreading.tracing 1 (fn () => "Interrupt");
       List.app cancel_later (Task_Queue.cancel_all (! queue));
       broadcast_work (); true);
   360 
   (*repeat synchronized scheduler rounds, each with sync_interrupts applied to
     public_interrupts, until scheduler_next returns false*)
   fun scheduler_loop () =
     let
       fun round () =
         Multithreading.with_attributes
           (Multithreading.sync_interrupts Multithreading.public_interrupts)
           (fn _ => SYNCHRONIZED "scheduler" scheduler_next);
       fun loop () = if round () then loop () else ();
     in loop () end;
   367 
   (*true iff a scheduler thread is recorded and still active*)
   fun scheduler_active () = (*requires SYNCHRONIZED*)
     (case ! scheduler of
       SOME thread => Thread.isActive thread
     | NONE => false);
   370 
   (*reset the shutdown flag and fork a scheduler thread unless one is active*)
   fun scheduler_check () = (*requires SYNCHRONIZED*)
     let val _ = do_shutdown := false in
       if scheduler_active () then ()
       else scheduler := SOME (SimpleThread.fork false scheduler_loop)
     end;
   375 
   376 
   377 
   378 (** futures **)
   379 
   380 (* fork *)
   381 
   (*Fork evaluation of e as a new task with the given dependencies and
     priority.  Without explicit group a fresh one is created.  Enqueueing
     also ensures that a scheduler is running.*)
   fun fork_future opt_group deps pri e =
     let
       val group =
         (case opt_group of
           SOME group => group
         | NONE => new_group ());
       val (result, job) = future_job group e;
       fun enqueue () =
         let
           val (task, minimal) =
             Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
         in
           if minimal then signal work_available else ();
           scheduler_check ();
           task
         end;
       val task = SYNCHRONIZED "enqueue" enqueue;
     in Future {promised = false, task = task, group = group, result = result} end;
   397 
   (*fork within the given group: no dependencies, priority 0*)
   fun fork_group group e = fork_future (SOME group) [] 0 e;

   (*fork in a fresh group, depending on the tasks of the given futures*)
   fun fork_deps_pri deps pri e =
     let val tasks = map task_of deps
     in fork_future NONE tasks pri e end;

   (*variants with priority 0 and/or without dependencies*)
   fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
   fun fork_pri pri e = fork_future NONE [] pri e;
   fun fork e = fork_future NONE [] 0 e;
   403 
   404 
   405 (* join *)
   406 
   407 local
   408 
   (*Result of a future without waiting.  An unfinished future yields
     SYS_ERROR.  A plain Interrupt is replaced by the collected exceptions of
     the group status, unless there are none.*)
   fun get_result x =
     let
       fun group_failure interrupt =
         let val exns = Exn.flatten_list (Task_Queue.group_status (group_of x)) in
           if null exns then interrupt
           else Exn.Exn (Exn.EXCEPTIONS exns)
         end;
     in
       (case peek x of
         SOME (interrupt as Exn.Exn Exn.Interrupt) => group_failure interrupt
       | SOME res => res
       | NONE => Exn.Exn (SYS_ERROR "unfinished future"))
     end;
   417 
   (*Dequeue work towards the given dependencies.  Yields NONE when no
     dependencies remain; waits on work_finished (as active worker) while
     dependencies remain but no work is available.*)
   fun join_next [] = NONE (*requires SYNCHRONIZED*)
     | join_next deps =
         let
           val (work, deps') =
             Unsynchronized.change_result queue
               (Task_Queue.dequeue_towards (Thread.self ()) deps);
         in
           (case work of
             SOME w => SOME (w, deps')
           | NONE =>
               if null deps' then NONE
               else (worker_wait true work_finished; join_next deps'))
         end;
   425 
   (*alternate between dequeuing under the lock (join_work) and executing
     outside the lock (execute_work), until join_next yields NONE*)
   fun join_work deps =
     execute_work (SYNCHRONIZED "join" (fn () => join_next deps))
   and execute_work pending =
     (case pending of
       SOME (work, deps') => (execute work; join_work deps')
     | NONE => ());
   430 
   (*register the dependencies of task in the queue, then work towards them*)
   fun join_depend task deps =
     let
       fun register_and_dequeue () =
         let val _ = Unsynchronized.change queue (Task_Queue.depend task deps)
         in join_next deps end;
     in execute_work (SYNCHRONIZED "join" register_and_dequeue) end;
   434 
   435 in
   436 
   (*Results of all given futures.  Finished futures are answered directly.
     Otherwise a worker thread works towards the required tasks, while any
     other thread just awaits the result slots.  Joining within a critical
     section is an error.*)
   fun join_results xs =
     let
       fun results () = map get_result xs;
       fun complete () =
         (case worker_task () of
           NONE => List.app (fn x => ignore (Single_Assignment.await (result_of x))) xs
         | SOME task => join_depend task (map task_of xs));
     in
       if forall is_finished xs then results ()
       else if Multithreading.self_critical () then
         error "Cannot join future values within critical section"
       else (complete (); results ())
     end;
   446 
   447 end;
   448 
   (*join a single future, as result or as plain value (exceptions are released)*)
   fun join_result x = x |> singleton join_results;
   fun join x = x |> join_result |> Exn.release;
   451 
   452 
   453 (* fast-path versions -- bypassing full task management *)
   454 
   (*finished future for a given value: fresh group without parent, dummy
     task, result assigned immediately*)
   fun value (x: 'a) =
     let
       val group = Task_Queue.new_group NONE;
       val result: 'a result = Single_Assignment.var "value";
     in
       ignore (assign_result group result (Exn.Result x));
       Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result}
     end;
   461 
   (*Apply f to the result of future x.  First try to extend the task of x by
     an additional job; if Task_Queue.extend refuses, fork a separate future
     that depends on that task and inherits its priority.  The new group is a
     subgroup of the group of x.*)
   fun map_future f x =
     let
       fun body () = f (join x);
       val task = task_of x;
       val group = Task_Queue.new_group (SOME (group_of x));
       val (result, job) = future_job group body;

       fun try_extend () =
         (case Task_Queue.extend task job (! queue) of
           NONE => false
         | SOME queue' => (queue := queue'; true));
       val extended = SYNCHRONIZED "extend" try_extend;
     in
       if not extended then
         fork_future (SOME group) [task] (Task_Queue.pri_of_task task) body
       else Future {promised = false, task = task, group = group, result = result}
     end;
   476 
   477 
   478 (* promised futures -- fulfilled by external means *)
   479 
   (*promised future within the given group: a passive task is enqueued, the
     result slot stays unassigned*)
   fun promise_group group : 'a future =
     let
       val result: 'a result = Single_Assignment.var "promise";
       fun enqueue () =
         Unsynchronized.change_result queue (Task_Queue.enqueue_passive group);
       val task = SYNCHRONIZED "enqueue" enqueue;
     in Future {promised = true, task = task, group = group, result = result} end;

   (*promised future within a fresh group*)
   fun promise () =
     let val group = new_group ()
     in promise_group group end;
   488 
   (*Fulfill a promised future with the given result, by executing a
     corresponding job on its task; the job assigns Interrupt instead when
     invoked with false.  Fails for futures that are not promises.*)
   fun fulfill_result (Future {promised, task, group, result}) res =
     if not promised then raise Fail "Not a promised future"
     else
       let
         fun job true = assign_result group result res
           | job false = assign_result group result (Exn.Exn Exn.Interrupt);
       in execute (task, group, [job]) end;

   (*fulfill a promised future with a plain value*)
   fun fulfill x res =
     let val wrapped = Exn.Result res
     in fulfill_result x wrapped end;
   497 
   498 
   499 (* cancellation *)
   500 
   (*evaluate f x with interrupts enabled: private interrupts for workers,
     public interrupts for other threads; plain interruptible when
     multithreading is unavailable*)
   fun interruptible_task f x =
     if not Multithreading.available then interruptible f x
     else
       let
         val atts =
           if is_worker () then Multithreading.private_interrupts
           else Multithreading.public_interrupts;
       in Multithreading.with_attributes atts (fn _ => f x) end;
   509 
   (*cancel: present and future group members will be interrupted eventually*)
   fun cancel_group group =
     let
       (*immediate cancellation, otherwise leave it to the scheduler*)
       fun attempt () = if cancel_now group then () else cancel_later group;
     in SYNCHRONIZED "cancel" attempt end;

   (*cancel the group of a future*)
   fun cancel x =
     let val group = group_of x
     in cancel_group group end;
   514 
   515 
   516 (* shutdown *)
   517 
   (*block until the scheduler thread is no longer active; after each wake-up
     on scheduler_event both work conditions are broadcast.  No-op when
     multithreading is unavailable.*)
   fun shutdown () =
     let
       fun await_scheduler () =
         if scheduler_active () then
          (ignore (wait scheduler_event);
           broadcast_work ();
           await_scheduler ())
         else ();
     in
       if Multithreading.available then SYNCHRONIZED "shutdown" await_scheduler
       else ()
     end;
   524 
   525 
   (*final declarations of this structure!*)
   (*map_future is exported as "map" only here, so that all code above still
     refers to the map operation on lists*)
   fun map f x = map_future f x;
   528 
   529 end;
   530 
   531 type 'a future = 'a Future.future;  (*toplevel abbreviation: 'a future can be written without the Future. qualifier*)
   532