src/Pure/Concurrent/future.ML
author wenzelm
Wed, 02 Feb 2011 13:38:09 +0100
changeset 42561 b5d7b15166bf
parent 42560 a4c822915eaa
child 42563 73dde8006820
permissions -rw-r--r--
Future.join_results: discontinued post-hoc recording of dynamic dependencies;
abstract Task_Queue.deps;
tuned signature;
tuned;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join (and variants).  The idea is to model
    12     parallel value-oriented computations, but *not* communicating
    13     processes.
    14 
    15   * Futures are grouped; failure of one group member causes the whole
    16     group to be interrupted eventually.  Groups are block-structured.
    17 
    18   * Forked futures are evaluated spontaneously by a farm of worker
    19     threads in the background; join resynchronizes the computation and
    20     delivers results (values or exceptions).
    21 
    22   * The pool of worker threads is limited, usually in correlation with
    23     the number of physical cores on the machine.  Note that allocation
    24     of runtime resources is distorted either if workers yield CPU time
    25     (e.g. via system sleep or wait operations), or if non-worker
    26     threads contend for significant runtime resources independently.
    27 
    28   * Promised futures are fulfilled by external means.  There is no
    29     associated evaluation task, but other futures can depend on them
    30     as usual.
    31 *)
    32 
    33 signature FUTURE =
    34 sig
       (*Worker context of the current thread.  The abbreviations task and
         group are used consistently below instead of Task_Queue.task and
         Task_Queue.group; the types are identical.*)
    35   type task = Task_Queue.task
    36   type group = Task_Queue.group
    37   val is_worker: unit -> bool
    38   val worker_task: unit -> task option
    39   val worker_group: unit -> group option
    40   val worker_subgroup: unit -> group
       (*Future values: identity, status, forking and joining.*)
    41   type 'a future
    42   val task_of: 'a future -> task
    43   val group_of: 'a future -> group
    44   val peek: 'a future -> 'a Exn.result option
    45   val is_finished: 'a future -> bool
    46   val forks: {name: string, group: group option, deps: task list, pri: int} ->
    47     (unit -> 'a) list -> 'a future list
    48   val fork_pri: int -> (unit -> 'a) -> 'a future
    49   val fork: (unit -> 'a) -> 'a future
    50   val join_results: 'a future list -> 'a Exn.result list
    51   val join_result: 'a future -> 'a Exn.result
    52   val join: 'a future -> 'a
       (*Fast-path futures, and promised futures fulfilled by external means.*)
    53   val value: 'a -> 'a future
    54   val map: ('a -> 'b) -> 'a future -> 'b future
    55   val promise_group: group -> 'a future
    56   val promise: unit -> 'a future
    57   val fulfill_result: 'a future -> 'a Exn.result -> unit
    58   val fulfill: 'a future -> 'a -> unit
       (*Interrupts, cancellation, shutdown and status markup.*)
    59   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    60   val cancel_group: group -> unit
    61   val cancel: 'a future -> unit
    62   val shutdown: unit -> unit
    63   val status: (unit -> 'a) -> 'a
    64 end;
    65 
    66 structure Future: FUTURE =
    67 struct
       (*Review notes: the mutable state of this structure lives in the
         Unsynchronized.ref cells and condition variables defined below.  It is
         read and written only inside SYNCHRONIZED, which holds one
         module-wide mutex.  Functions tagged requires SYNCHRONIZED assume that
         the caller already holds this mutex.*)
    68 
    69 (** future values **)
    70 
    71 (* identifiers *)
    72 
    73 type task = Task_Queue.task;
    74 type group = Task_Queue.group;
    75 
       (*Thread-local worker context: SOME (task, group) while the current
         thread runs the jobs of a task inside execute below, NONE if it was
         never set for this thread.*)
    76 local
    77   val tag = Universal.tag () : (task * group) option Universal.tag;
    78 in
    79   fun thread_data () = the_default NONE (Thread.getLocal tag);
       (*Run f x with the context set to SOME data; the previous value
         (thread_data ()) is handed to Library.setmp_thread_data, presumably
         to be restored afterwards.*)
    80   fun setmp_thread_data data f x =
    81     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    82 end;
    83 
       (*Accessors for the worker context of the current thread.*)
    84 val is_worker = is_some o thread_data;
    85 val worker_task = Option.map #1 o thread_data;
    86 val worker_group = Option.map #2 o thread_data;
       (*New Task_Queue group derived from the current worker group, if any
         (presumably nested under it), else from NONE.*)
    87 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
    88 
       (*Run e; inside a worker task, wrap it with Task_Queue.joining.*)
    89 fun worker_joining e =
    90   (case worker_task () of
    91     NONE => e ()
    92   | SOME task => Task_Queue.joining task e);
    93 
       (*Run e; inside a worker task, wrap it with Task_Queue.waiting for deps.*)
    94 fun worker_waiting deps e =
    95   (case worker_task () of
    96     NONE => e ()
    97   | SOME task => Task_Queue.waiting task deps e);
    98 
    99 
   100 (* datatype future *)
   101 
       (*Result cell of a future: a single-assignment variable that eventually
         holds either a value or an exception.*)
   102 type 'a result = 'a Exn.result Single_Assignment.var;
   103 
       (*promised: fulfilled via fulfill_result rather than by an evaluation
         job; task/group: its queue entry and group; result: its result cell.*)
   104 datatype 'a future = Future of
   105  {promised: bool,
   106   task: task,
   107   group: group,
   108   result: 'a result};
   109 
   110 fun task_of (Future {task, ...}) = task;
   111 fun group_of (Future {group, ...}) = group;
   112 fun result_of (Future {result, ...}) = result;
   113 
       (*Non-blocking inspection of the result cell.*)
   114 fun peek x = Single_Assignment.peek (result_of x);
   115 fun is_finished x = is_some (peek x);
   116 
       (*Store res into the result cell.  Returns true iff the stored outcome
         is a regular value; an exception outcome is reported to the group via
         Task_Queue.cancel_group and yields false.  If the assignment raises
         Fail (presumably: cell already assigned), an interrupt already stored
         in the cell is re-raised in preference to that Fail.*)
   117 fun assign_result group result res =
   118   let
   119     val _ = Single_Assignment.assign result res
   120       handle exn as Fail _ =>
   121         (case Single_Assignment.peek result of
   122           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   123         | _ => reraise exn);
   124     val ok =
   125       (case the (Single_Assignment.peek result) of
   126         Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   127       | Exn.Result _ => true);
   128   in ok end;
   129 
   130 
   131 
   132 (** scheduling **)
   133 
   134 (* synchronization *)
   135 
       (*scheduler_event: waited on by the scheduler round and by shutdown.
         work_available: idle workers sleep on it in worker_next.
         work_finished: broadcast by execute; joining workers wait on it in
         join_next.*)
   136 val scheduler_event = ConditionVar.conditionVar ();
   137 val work_available = ConditionVar.conditionVar ();
   138 val work_finished = ConditionVar.conditionVar ();
   139 
   140 local
   141   val lock = Mutex.mutex ();
   142 in
   143 
       (*Run a thunk while holding the single module lock; name is a label
         passed to Simple_Thread.synchronized (presumably for diagnostics).*)
   144 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   145 
       (*Block on cond together with the module lock, presumably releasing the
         lock while blocked, as for an ordinary condition-variable wait.*)
   146 fun wait cond = (*requires SYNCHRONIZED*)
   147   Multithreading.sync_wait NONE NONE cond lock;
   148 
       (*Like wait, but with the deadline now + timeout.*)
   149 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   150   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   151 
   152 fun signal cond = (*requires SYNCHRONIZED*)
   153   ConditionVar.signal cond;
   154 
   155 fun broadcast cond = (*requires SYNCHRONIZED*)
   156   ConditionVar.broadcast cond;
   157 
       (*Wake every thread blocked on work_available or work_finished.*)
   158 fun broadcast_work () = (*requires SYNCHRONIZED*)
   159  (ConditionVar.broadcast work_available;
   160   ConditionVar.broadcast work_finished);
   161 
   162 end;
   163 
   164 
   165 (* global state *)
   166 
       (*Global scheduler state, accessed only within SYNCHRONIZED:
           queue        -- the task queue
           next         -- counter used to name new worker threads
           scheduler    -- scheduler thread, once started by scheduler_check
           canceled     -- groups whose cancellation the scheduler retries
           do_shutdown  -- request to wind down workers and the scheduler
           max_workers  -- target size of the worker pool
           max_active   -- limit on simultaneously Working workers
           worker_trend -- signed count of consecutive scheduler ticks that
                           wanted more (positive) or fewer (negative) workers*)
   167 val queue = Unsynchronized.ref Task_Queue.empty;
   168 val next = Unsynchronized.ref 0;
   169 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   170 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
   171 val do_shutdown = Unsynchronized.ref false;
   172 val max_workers = Unsynchronized.ref 0;
   173 val max_active = Unsynchronized.ref 0;
   174 val worker_trend = Unsynchronized.ref 0;
   175 
       (*Registry of worker threads with their state.  Working: running or
         looking for a task.  Waiting: blocked in worker_wait with active =
         true (from join_next).  Sleeping: blocked in worker_wait with
         active = false (from worker_next).*)
   176 datatype worker_state = Working | Waiting | Sleeping;
   177 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   178 
       (*Number of registered workers currently in the given state.*)
   179 fun count_workers state = (*requires SYNCHRONIZED*)
   180   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   181 
   182 
   183 (* execute future jobs *)
   184 
       (*Create the result cell of a new future and the job that fills it.
         job ok evaluates e only if ok holds, otherwise it stores an interrupt.
         Evaluation uses private interrupts and the Position context captured
         when future_job was called.  The job returns the boolean result of
         assign_result.*)
   185 fun future_job group (e: unit -> 'a) =
   186   let
   187     val result = Single_Assignment.var "future" : 'a result;
   188     val pos = Position.thread_data ();
   189     fun job ok =
   190       let
   191         val res =
   192           if ok then
   193             Exn.capture (fn () =>
   194               Multithreading.with_attributes Multithreading.private_interrupts
   195                 (fn _ => Position.setmp_thread_data pos e ())) ()
   196           else Exn.interrupt_exn;
   197       in assign_result group result res end;
   198   in (result, job) end;
   199 
       (*Try to cancel group in the queue right away.  The boolean comes from
         Task_Queue.cancel; when it is false, callers fall back to
         cancel_later.*)
   200 fun cancel_now group = (*requires SYNCHRONIZED*)
   201   Task_Queue.cancel (! queue) group;
   202 
       (*Record group for another cancellation attempt by the scheduler, and
         wake the scheduler.*)
   203 fun cancel_later group = (*requires SYNCHRONIZED*)
   204  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   205   broadcast scheduler_event);
   206 
       (*Run the jobs of a dequeued task in the current thread, with its worker
         context installed.  Every job runs: job valid is evaluated before
         andalso, so the fold does not short-circuit.  ok holds only if all
         jobs succeed.  Jobs receive false if the group was already canceled.
         Afterwards, under the lock:
           - mark the task as finished;
           - cancel the group on failure, now or via the scheduler;
           - wake joiners;
           - wake one more worker unless Task_Queue.finish reports maximal.*)
   207 fun execute (task, group, jobs) =
   208   let
   209     val valid = not (Task_Queue.is_canceled group);
   210     val ok =
   211       Task_Queue.running task (fn () =>
   212         setmp_thread_data (task, group) (fn () =>
   213           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
   214     val _ = Multithreading.tracing 1 (fn () =>
   215       let
   216         val s = Task_Queue.str_of_task task;
   217         fun micros time = string_of_int (Time.toNanoseconds time div 1000);
   218         val (run, wait, deps) = Task_Queue.timing_of_task task;
   219       in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
   220     val _ = SYNCHRONIZED "finish" (fn () =>
   221       let
   222         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   223         val _ =
   224           if ok then ()
   225           else if cancel_now group then ()
   226           else cancel_later group;
   227         val _ = broadcast work_finished;
   228         val _ = if maximal then () else signal work_available;
   229       in () end);
   230   in () end;
   231 
   232 
   233 (* worker threads *)
   234 
       (*Block the current worker on cond, recording it as Waiting (active) or
         Sleeping (not active) meanwhile, and as Working afterwards.  Raises
         Fail if the thread is not registered in workers.*)
   235 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   236   let
   237     val state =
   238       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   239         SOME state => state
   240       | NONE => raise Fail "Unregistered worker thread");
   241     val _ = state := (if active then Waiting else Sleeping);
   242     val _ = wait cond;
   243     val _ = state := Working;
   244   in () end;
   245 
       (*Find the next task for the calling worker.
           - Pool larger than max_workers: deregister the thread and return
             NONE, which ends its worker_loop.
           - Too many Working threads, or nothing to dequeue: sleep on
             work_available and retry.
           - Successful dequeue: signal work_available to pass the wakeup on.*)
   246 fun worker_next () = (*requires SYNCHRONIZED*)
   247   if length (! workers) > ! max_workers then
   248     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   249      signal work_available;
   250      NONE)
   251   else if count_workers Working > ! max_active then
   252     (worker_wait false work_available; worker_next ())
   253   else
   254     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   255       NONE => (worker_wait false work_available; worker_next ())
   256     | some => (signal work_available; some));
   257 
       (*Main loop of a worker thread: dequeue under the lock, execute outside
         of it, until worker_next returns NONE.*)
   258 fun worker_loop name =
   259   (case SYNCHRONIZED name (fn () => worker_next ()) of
   260     NONE => ()
   261   | SOME work => (execute work; worker_loop name));
   262 
       (*Fork a new worker thread and register it as Working.*)
   263 fun worker_start name = (*requires SYNCHRONIZED*)
   264   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   265     Unsynchronized.ref Working));
   266 
   267 
   268 (* scheduler *)
   269 
       (*Scheduler timing: ticks are at least next_round (0.05 s) apart, and
         status_ticks counts ticks modulo 10 for periodic status tracing.*)
   270 val status_ticks = Unsynchronized.ref 0;
   271 
   272 val last_round = Unsynchronized.ref Time.zeroTime;
   273 val next_round = seconds 0.05;
   274 
       (*One scheduler round; returns false when the scheduler should stop.
         Steps: periodic status tracing, disposal of dead workers, recomputing
         max_active and max_workers, starting missing workers, retrying
         pending cancellations, waiting up to next_round for scheduler_event,
         and checking for shutdown.  On an interrupt, the groups returned by
         Task_Queue.cancel_all are handed to cancel_later, all workers are
         woken, and the scheduler continues.*)
   275 fun scheduler_next () = (*requires SYNCHRONIZED*)
   276   let
   277     val now = Time.now ();
   278     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   279     val _ = if tick then last_round := now else ();
   280 
   281 
   282     (* queue and worker status *)
   283 
   284     val _ =
   285       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
   286     val _ =
   287       if tick andalso ! status_ticks = 0 then
   288         Multithreading.tracing 1 (fn () =>
   289           let
   290             val {ready, pending, running, passive} = Task_Queue.status (! queue);
   291             val total = length (! workers);
   292             val active = count_workers Working;
   293             val waiting = count_workers Waiting;
   294           in
   295             "SCHEDULE " ^ Time.toString now ^ ": " ^
   296               string_of_int ready ^ " ready, " ^
   297               string_of_int pending ^ " pending, " ^
   298               string_of_int running ^ " running, " ^
   299               string_of_int passive ^ " passive; " ^
   300               string_of_int total ^ " workers, " ^
   301               string_of_int active ^ " active, " ^
   302               string_of_int waiting ^ " waiting "
   303           end)
   304       else ();
   305 
       (*Drop workers whose threads are no longer active.*)
   306     val _ =
   307       if forall (Thread.isActive o #1) (! workers) then ()
   308       else
   309         let
   310           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   311           val _ = workers := alive;
   312         in
   313           Multithreading.tracing 0 (fn () =>
   314             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   315         end;
   316 
   317 
   318     (* worker pool adjustments *)
   319 
   320     val max_active0 = ! max_active;
   321     val max_workers0 = ! max_workers;
   322 
   323     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   324     val _ = max_active := m;
   325 
       (*mm: desired pool size, counting Waiting workers twice and clamped to
         [m, 4 * m].  NOTE(review): the m = 9999 case is a special value of
         Multithreading.max_threads_value whose meaning is not visible here.*)
   326     val mm =
   327       if ! do_shutdown then 0
   328       else if m = 9999 then 1
   329       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
       (*Track how long the demand has pointed in one direction; the count
         resets to 0 when the direction changes.*)
   330     val _ =
   331       if tick andalso mm > ! max_workers then
   332         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   333       else if tick andalso mm < ! max_workers then
   334         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   335       else ();
       (*Adopt mm at once when it is 0 (e.g. on shutdown) or when the trend
         has lasted more than 50 ticks.  A shorter upward trend (more than 5
         ticks) grows the pool, capped at 2 * m.*)
   336     val _ =
   337       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   338         max_workers := mm
   339       else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
   340         max_workers := Int.min (mm, 2 * m)
   341       else ();
   342 
   343     val missing = ! max_workers - length (! workers);
   344     val _ =
   345       if missing > 0 then
   346         funpow missing (fn () =>
   347           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   348       else ();
   349 
   350     val _ =
   351       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   352       else signal work_available;
   353 
   354 
   355     (* canceled groups *)
   356 
       (*Retry cancellation; groups for which cancel_now succeeds are dropped.*)
   357     val _ =
   358       if null (! canceled) then ()
   359       else
   360        (Multithreading.tracing 1 (fn () =>
   361           string_of_int (length (! canceled)) ^ " canceled groups");
   362         Unsynchronized.change canceled (filter_out cancel_now);
   363         broadcast_work ());
   364 
   365 
   366     (* delay loop *)
   367 
   368     val _ = Exn.release (wait_timeout next_round scheduler_event);
   369 
   370 
   371     (* shutdown *)
   372 
       (*Request shutdown when Task_Queue.all_passive holds.  Since mm becomes
         0, the workers drain away; the scheduler stops once none are left.*)
   373     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   374     val continue = not (! do_shutdown andalso null (! workers));
   375     val _ = if continue then () else scheduler := NONE;
   376 
   377     val _ = broadcast scheduler_event;
   378   in continue end
   379   handle exn =>
   380     if Exn.is_interrupt exn then
   381      (Multithreading.tracing 1 (fn () => "Interrupt");
   382       List.app cancel_later (Task_Queue.cancel_all (! queue));
   383       broadcast_work (); true)
   384     else reraise exn;
   385 
       (*Body of the scheduler thread: run scheduler rounds, each under the
         lock and with public interrupts, until scheduler_next returns false.*)
   386 fun scheduler_loop () =
   387   while
   388     Multithreading.with_attributes
   389       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   390       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   391   do ();
   392 
   393 fun scheduler_active () = (*requires SYNCHRONIZED*)
   394   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   395 
       (*Withdraw any shutdown request and start the scheduler thread if it is
         not running.*)
   396 fun scheduler_check () = (*requires SYNCHRONIZED*)
   397  (do_shutdown := false;
   398   if scheduler_active () then ()
   399   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   400 
   401 
   402 
   403 (** futures **)
   404 
   405 (* fork *)
   406 
       (*Enqueue one task per thunk in es, all in the same group.  The group
         defaults to worker_subgroup (); all tasks share the given deps and
         pri.  A worker is signalled if Task_Queue.enqueue reports any task
         as minimal, and the scheduler is (re)started.  Returns the futures
         in the order of es; empty es yields [] without taking the lock.*)
   407 fun forks {name, group, deps, pri} es =
   408   if null es then []
   409   else
   410     let
   411       val grp =
   412         (case group of
   413           NONE => worker_subgroup ()
   414         | SOME grp => grp);
       (*Thread the queue value q through the enqueue operations.  The
         parameter is deliberately not called queue, which would shadow the
         global queue ref used below.*)
   415       fun enqueue e (minimal, q) =
   416         let
   417           val (result, job) = future_job grp e;
   418           val ((task, minimal'), q') = Task_Queue.enqueue name grp deps pri job q;
   419           val future = Future {promised = false, task = task, group = grp, result = result};
   420         in (future, (minimal orelse minimal', q')) end;
   421     in
   422       SYNCHRONIZED "enqueue" (fn () =>
   423         let
   424           val (futures, minimal) =
   425             Unsynchronized.change_result queue (fn q =>
   426               let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
   427               in ((futures, minimal), q') end);
   428           val _ = if minimal then signal work_available else ();
   429           val _ = scheduler_check ();
   430         in futures end)
   431     end;
   432 
       (*Single unnamed task in a new worker subgroup, without deps; fork uses
         priority 0.*)
   433 fun fork_pri pri e = singleton (forks {name = "", group = NONE, deps = [], pri = pri}) e;
   434 fun fork e = fork_pri 0 e;
   435 
   436 
   437 (* join *)
   438 
   439 local
   440 
       (*Result of a finished future.  An interrupt is replaced by the
         exceptions recorded for the future's group, if there are any.  An
         unfinished future yields an Exn (Fail) result.*)
   441 fun get_result x =
   442   (case peek x of
   443     NONE => Exn.Exn (Fail "Unfinished future")
   444   | SOME res =>
   445       if Exn.is_interrupt_exn res then
   446         (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
   447           [] => res
   448         | exns => Exn.Exn (Exn.EXCEPTIONS exns))
   449       else res);
   450 
       (*NONE once all deps are finished.  Otherwise try to dequeue a
         dependency task for the current thread, returning SOME (work, deps').
         If none can be dequeued, wait (as Waiting) for work_finished and
         retry.*)
   451 fun join_next deps = (*requires SYNCHRONIZED*)
   452   if Task_Queue.finished_deps deps then NONE
   453   else
   454     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   455       (NONE, deps') =>
   456         if Task_Queue.finished_deps deps' then NONE
   457         else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   458     | (SOME work, deps') => SOME (work, deps'));
   459 
       (*Execute dependency tasks in the joining thread (outside the lock)
         until join_next reports all deps finished.*)
   460 fun execute_work NONE = ()
   461   | execute_work (SOME (work, deps')) = (worker_joining (fn () => execute work); join_work deps')
   462 and join_work deps =
   463   execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
   464 
   465 in
   466 
       (*Wait for all futures xs, then return their results in order.
         Worker threads help by executing unfinished dependencies themselves;
         other threads block on each result cell.  Joining unfinished futures
         within a critical section fails via error.*)
   467 fun join_results xs =
   468   let
   469     val _ =
   470       if forall is_finished xs then ()
   471       else if Multithreading.self_critical () then
   472         error "Cannot join future values within critical section"
   473       else if is_worker () then
   474         join_work (Task_Queue.init_deps (map task_of xs))
   475       else List.app (ignore o Single_Assignment.await o result_of) xs;
   476   in map get_result xs end;
   477 
   478 end;
   479 
   480 fun join_result x = singleton join_results x;
   481 fun join x = Exn.release (join_result x);
   482 
   483 
   484 (* fast-path versions -- bypassing full task management *)
   485 
       (*Already finished future holding x, with Task_Queue.dummy_task and a
         group created from NONE.*)
   486 fun value (x: 'a) =
   487   let
   488     val group = Task_Queue.new_group NONE;
   489     val result = Single_Assignment.var "value" : 'a result;
   490     val _ = assign_result group result (Exn.Result x);
   491   in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
   492 
       (*Future of f applied to the value of x, in a new group derived from
         x's group.  First tries Task_Queue.extend to attach the job to x's
         own task.  Otherwise it forks a separate task named Future.map,
         depending on x's task and with the same priority.*)
   493 fun map_future f x =
   494   let
   495     val task = task_of x;
   496     val group = Task_Queue.new_group (SOME (group_of x));
   497     val (result, job) = future_job group (fn () => f (join x));
   498 
   499     val extended = SYNCHRONIZED "extend" (fn () =>
   500       (case Task_Queue.extend task job (! queue) of
   501         SOME queue' => (queue := queue'; true)
   502       | NONE => false));
   503   in
   504     if extended then Future {promised = false, task = task, group = group, result = result}
   505     else
   506       singleton
   507         (forks {name = "Future.map", group = SOME group,
   508           deps = [task], pri = Task_Queue.pri_of_task task})
   509         (fn () => f (join x))
   510   end;
   511 
   512 
   513 (* promised futures -- fulfilled by external means *)
   514 
       (*Future that is fulfilled by fulfill_result instead of an evaluation
         job; it is registered as a passive task of group.  The abort function
         given to the queue stores an interrupt into the result.  If the cell
         is already assigned it returns true; if assign_result re-raises an
         interrupt, it raises Fail instead.*)
   515 fun promise_group group : 'a future =
   516   let
   517     val result = Single_Assignment.var "promise" : 'a result;
   518     fun abort () = assign_result group result Exn.interrupt_exn
   519       handle Fail _ => true
   520         | exn =>
   521             if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
   522             else reraise exn;
   523     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   524       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
   525   in Future {promised = true, task = task, group = group, result = result} end;
   526 
   527 fun promise () = promise_group (worker_subgroup ());
   528 
       (*Fulfill a promised future with res; raises Fail for other futures.
         If the task is still passive, it is dequeued and executed in this
         thread with interrupts disabled.  In any case, the call then waits
         until the result cell is assigned.*)
   529 fun fulfill_result (Future {promised, task, group, result}) res =
   530   if not promised then raise Fail "Not a promised future"
   531   else
   532     let
   533       fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
   534       val _ =
   535         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   536           let
   537             val still_passive =
   538               SYNCHRONIZED "fulfill_result" (fn () =>
   539                 Unsynchronized.change_result queue
   540                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   541           in if still_passive then execute (task, group, [job]) else () end);
   542       val _ =
   543         worker_waiting (Task_Queue.init_deps [task])
   544           (fn () => Single_Assignment.await result);
   545     in () end;
   546 
   547 fun fulfill x res = fulfill_result x (Exn.Result res);
   548 
   549 
   550 (* cancellation *)
   551 
       (*Run f x with interrupts enabled: private interrupts for worker
         threads, public ones otherwise.  Without multithreading, fall back to
         interruptible.*)
   552 fun interruptible_task f x =
   553   if Multithreading.available then
   554     Multithreading.with_attributes
   555       (if is_worker ()
   556        then Multithreading.private_interrupts
   557        else Multithreading.public_interrupts)
   558       (fn _ => f x)
   559   else interruptible f x;
   560 
   561 (*cancel: present and future group members will be interrupted eventually*)
       (*Cancellation is tried at once, else deferred to the scheduler; a
         worker is woken and the scheduler is (re)started.*)
   562 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
   563  (if cancel_now group then () else cancel_later group;
   564   signal work_available; scheduler_check ()));
   565 
   566 fun cancel x = cancel_group (group_of x);
   567 
   568 
   569 (* shutdown *)
   570 
       (*Block until the scheduler thread is no longer active, waking the
         workers after each scheduler_event; no-op without multithreading.*)
   571 fun shutdown () =
   572   if Multithreading.available then
   573     SYNCHRONIZED "shutdown" (fn () =>
   574      while scheduler_active () do
   575       (wait scheduler_event; broadcast_work ()))
   576   else ();
   577 
   578 
   579 (* status markup *)
   580 
       (*Evaluate e between forked and joined status messages, tagged with
         the current worker task, if any.*)
   581 fun status e =
   582   let
   583     val task_props =
   584       (case worker_task () of
   585         NONE => I
   586       | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
   587     val _ = Output.status (Markup.markup (task_props Markup.forked) "");
   588     val x = e ();  (*sic -- report "joined" only for success*)
   589     val _ = Output.status (Markup.markup (task_props Markup.joined) "");
   590   in x end;
   591 
   592 
   593 (*final declarations of this structure!*)
       (*Defined last because it shadows List map, which is used above.*)
   594 val map = map_future;
   595 
   596 end;
   597 
   598 type 'res future = 'res Future.future;  (*toplevel abbreviation for Future.future*)
   599