src/Pure/Concurrent/future.ML
author wenzelm
Mon, 31 Jan 2011 23:02:53 +0100
changeset 42545 7da257539a8d
parent 42544 1c191a39549f
child 42559 79716cb61bfd
permissions -rw-r--r--
tuned signature;
tuned vacuous forks;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join (and variants).  The idea is to model
    12     parallel value-oriented computations, but *not* communicating
    13     processes.
    14 
    15   * Futures are grouped; failure of one group member causes the whole
    16     group to be interrupted eventually.  Groups are block-structured.
    17 
    18   * Forked futures are evaluated spontaneously by a farm of worker
    19     threads in the background; join resynchronizes the computation and
    20     delivers results (values or exceptions).
    21 
    22   * The pool of worker threads is limited, usually in correlation with
    23     the number of physical cores on the machine.  Note that allocation
    24     of runtime resources is distorted either if workers yield CPU time
    25     (e.g. via system sleep or wait operations), or if non-worker
    26     threads contend for significant runtime resources independently.
    27 
    28   * Promised futures are fulfilled by external means.  There is no
    29     associated evaluation task, but other futures can depend on them
    30     as usual.
    31 *)
    32 
(* Public interface of future values: forking and joining evaluation tasks,
   promises fulfilled by external means, cancellation of groups, and
   shutdown of the worker farm. *)
signature FUTURE =
sig
  (* identifiers, shared with the underlying task queue *)
  type task = Task_Queue.task
  type group = Task_Queue.group
  (* information about the current thread, if it executes a task as a worker *)
  val is_worker: unit -> bool
  val worker_task: unit -> Task_Queue.task option
  val worker_group: unit -> Task_Queue.group option
  val worker_subgroup: unit -> Task_Queue.group
  val worker_waiting: (unit -> 'a) -> 'a
  (* future values and non-blocking inspection of their state *)
  type 'a future
  val task_of: 'a future -> task
  val group_of: 'a future -> group
  val peek: 'a future -> 'a Exn.result option
  val is_finished: 'a future -> bool
  (* forking new evaluation tasks *)
  val forks: {name: string, group: group option, deps: task list, pri: int} ->
    (unit -> 'a) list -> 'a future list
  val fork_pri: int -> (unit -> 'a) -> 'a future
  val fork: (unit -> 'a) -> 'a future
  (* joining: wait for results, delivered as values or exceptions *)
  val join_results: 'a future list -> 'a Exn.result list
  val join_result: 'a future -> 'a Exn.result
  val join: 'a future -> 'a
  (* fast-path versions that bypass full task management where possible *)
  val value: 'a -> 'a future
  val map: ('a -> 'b) -> 'a future -> 'b future
  (* promised futures, fulfilled by external means *)
  val promise_group: group -> 'a future
  val promise: unit -> 'a future
  val fulfill_result: 'a future -> 'a Exn.result -> unit
  val fulfill: 'a future -> 'a -> unit
  (* interrupts, cancellation and shutdown *)
  val interruptible_task: ('a -> 'b) -> 'a -> 'b
  val cancel_group: group -> unit
  val cancel: 'a future -> unit
  val shutdown: unit -> unit
  (* status markup around an evaluation *)
  val status: (unit -> 'a) -> 'a
end;
    66 
    67 structure Future: FUTURE =
    68 struct
    69 
    70 (** future values **)
    71 
    72 (* identifiers *)
    73 
    74 type task = Task_Queue.task;
    75 type group = Task_Queue.group;
    76 
    77 local
    78   val tag = Universal.tag () : (task * group) option Universal.tag;
    79 in
    80   fun thread_data () = the_default NONE (Thread.getLocal tag);
    81   fun setmp_thread_data data f x =
    82     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    83 end;
    84 
    85 val is_worker = is_some o thread_data;
    86 val worker_task = Option.map #1 o thread_data;
    87 val worker_group = Option.map #2 o thread_data;
    88 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
    89 
    90 fun worker_waiting e =
    91   (case worker_task () of
    92     NONE => e ()
    93   | SOME task => Task_Queue.waiting task e);
    94 
    95 
    96 (* datatype future *)
    97 
    98 type 'a result = 'a Exn.result Single_Assignment.var;
    99 
   100 datatype 'a future = Future of
   101  {promised: bool,
   102   task: task,
   103   group: group,
   104   result: 'a result};
   105 
   106 fun task_of (Future {task, ...}) = task;
   107 fun group_of (Future {group, ...}) = group;
   108 fun result_of (Future {result, ...}) = result;
   109 
   110 fun peek x = Single_Assignment.peek (result_of x);
   111 fun is_finished x = is_some (peek x);
   112 
(* Store res in the result cell of a future belonging to group.
   If Single_Assignment.assign fails with Fail (presumably because the cell
   is already assigned), an interrupt already stored in the cell is re-raised
   in preference to the Fail exception; anything else re-raises the Fail.
   After a successful assignment, a stored exception cancels the whole group
   via Task_Queue.cancel_group.
   Returns true iff the stored outcome is a regular value. *)
fun assign_result group result res =
  let
    val _ = Single_Assignment.assign result res
      handle exn as Fail _ =>
        (case Single_Assignment.peek result of
          SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
        | _ => reraise exn);
    (*the cell is known to be assigned here, so the is total*)
    val ok =
      (case the (Single_Assignment.peek result) of
        Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
      | Exn.Result _ => true);
  in ok end;
   125 
   126 
   127 
   128 (** scheduling **)
   129 
   130 (* synchronization *)
   131 
   132 val scheduler_event = ConditionVar.conditionVar ();
   133 val work_available = ConditionVar.conditionVar ();
   134 val work_finished = ConditionVar.conditionVar ();
   135 
   136 local
   137   val lock = Mutex.mutex ();
   138 in
   139 
   140 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   141 
   142 fun wait cond = (*requires SYNCHRONIZED*)
   143   Multithreading.sync_wait NONE NONE cond lock;
   144 
   145 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   146   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   147 
   148 fun signal cond = (*requires SYNCHRONIZED*)
   149   ConditionVar.signal cond;
   150 
   151 fun broadcast cond = (*requires SYNCHRONIZED*)
   152   ConditionVar.broadcast cond;
   153 
   154 fun broadcast_work () = (*requires SYNCHRONIZED*)
   155  (ConditionVar.broadcast work_available;
   156   ConditionVar.broadcast work_finished);
   157 
   158 end;
   159 
   160 
   161 (* global state *)
   162 
   163 val queue = Unsynchronized.ref Task_Queue.empty;
   164 val next = Unsynchronized.ref 0;
   165 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   166 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
   167 val do_shutdown = Unsynchronized.ref false;
   168 val max_workers = Unsynchronized.ref 0;
   169 val max_active = Unsynchronized.ref 0;
   170 val worker_trend = Unsynchronized.ref 0;
   171 
   172 datatype worker_state = Working | Waiting | Sleeping;
   173 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   174 
   175 fun count_workers state = (*requires SYNCHRONIZED*)
   176   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   177 
   178 
   179 (* execute future jobs *)
   180 
   181 fun future_job group (e: unit -> 'a) =
   182   let
   183     val result = Single_Assignment.var "future" : 'a result;
   184     val pos = Position.thread_data ();
   185     fun job ok =
   186       let
   187         val res =
   188           if ok then
   189             Exn.capture (fn () =>
   190               Multithreading.with_attributes Multithreading.private_interrupts
   191                 (fn _ => Position.setmp_thread_data pos e ())) ()
   192           else Exn.interrupt_exn;
   193       in assign_result group result res end;
   194   in (result, job) end;
   195 
   196 fun cancel_now group = (*requires SYNCHRONIZED*)
   197   Task_Queue.cancel (! queue) group;
   198 
   199 fun cancel_later group = (*requires SYNCHRONIZED*)
   200  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   201   broadcast scheduler_event);
   202 
(* Run the jobs of a dequeued task in the current thread, with the
   thread-local worker data set to (task, group).  Every job receives valid,
   which is false if the group was already canceled beforehand.  Afterwards
   the task is finished in the queue under the lock; if some job reported
   failure, the group is canceled (immediately or deferred to the scheduler),
   and waiting threads are woken up. *)
fun execute (task, group, jobs) =
  let
    val valid = not (Task_Queue.is_canceled group);
    val ok =
      Task_Queue.running task (fn () =>
        setmp_thread_data (task, group) (fn () =>
          (*each job is run before andalso inspects ok, so no job is skipped*)
          fold (fn job => fn ok => job valid andalso ok) jobs true) ());
    (*trace run and wait time of the task in microseconds*)
    val _ = Multithreading.tracing 1 (fn () =>
      let
        val s = Task_Queue.str_of_task task;
        fun micros time = string_of_int (Time.toNanoseconds time div 1000);
        val (run, wait) = pairself micros (Task_Queue.timing_of_task task);
      in "TASK " ^ s ^ " " ^ run ^ " " ^ wait end);
    val _ = SYNCHRONIZED "finish" (fn () =>
      let
        val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
        (*on failure: cancel now if possible, otherwise leave it to the scheduler*)
        val _ =
          if ok then ()
          else if cancel_now group then ()
          else cancel_later group;
        val _ = broadcast work_finished;
        (*NOTE(review): meaning of maximal is defined by Task_Queue.finish;
          when it is false, one more worker is woken up*)
        val _ = if maximal then () else signal work_available;
      in () end);
  in () end;
   227 
   228 
   229 (* worker threads *)
   230 
   231 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   232   let
   233     val state =
   234       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   235         SOME state => state
   236       | NONE => raise Fail "Unregistered worker thread");
   237     val _ = state := (if active then Waiting else Sleeping);
   238     val _ = wait cond;
   239     val _ = state := Working;
   240   in () end;
   241 
(* Obtain the next work item for the current worker thread, sleeping on
   work_available while none can be taken.
   Returns NONE when the pool exceeds max_workers: the thread has then removed
   itself from workers, and worker_loop lets it terminate.
   Sleeps while more than max_active workers are Working. *)
fun worker_next () = (*requires SYNCHRONIZED*)
  if length (! workers) > ! max_workers then
    (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
     (*presumably passes the wake-up on to another worker*)
     signal work_available;
     NONE)
  else if count_workers Working > ! max_active then
    (worker_wait false work_available; worker_next ())
  else
    (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
      NONE => (worker_wait false work_available; worker_next ())
      (*got work: wake another worker to look for more*)
    | some => (signal work_available; some));
   253 
   254 fun worker_loop name =
   255   (case SYNCHRONIZED name (fn () => worker_next ()) of
   256     NONE => ()
   257   | SOME work => (execute work; worker_loop name));
   258 
   259 fun worker_start name = (*requires SYNCHRONIZED*)
   260   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   261     Unsynchronized.ref Working));
   262 
   263 
   264 (* scheduler *)
   265 
   266 val status_ticks = Unsynchronized.ref 0;
   267 
   268 val last_round = Unsynchronized.ref Time.zeroTime;
   269 val next_round = seconds 0.05;
   270 
(* One round of the scheduler, run under the global lock by scheduler_loop:
   trace status, dispose of dead workers, adjust the worker pool, process
   deferred group cancellations, wait up to next_round for scheduler_event,
   and decide about shutdown.
   Returns false once shutdown is requested and no workers remain, after
   resetting scheduler to NONE.  An interrupt cancels the whole queue (the
   groups returned by Task_Queue.cancel_all are passed to cancel_later) and
   yields true. *)
fun scheduler_next () = (*requires SYNCHRONIZED*)
  let
    val now = Time.now ();
    (*a tick happens at most once per next_round interval*)
    val tick = Time.<= (Time.+ (! last_round, next_round), now);
    val _ = if tick then last_round := now else ();


    (* queue and worker status *)

    (*status tracing on every 10th tick*)
    val _ =
      if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
    val _ =
      if tick andalso ! status_ticks = 0 then
        Multithreading.tracing 1 (fn () =>
          let
            val {ready, pending, running, passive} = Task_Queue.status (! queue);
            val total = length (! workers);
            val active = count_workers Working;
            val waiting = count_workers Waiting;
          in
            "SCHEDULE " ^ Time.toString now ^ ": " ^
              string_of_int ready ^ " ready, " ^
              string_of_int pending ^ " pending, " ^
              string_of_int running ^ " running, " ^
              string_of_int passive ^ " passive; " ^
              string_of_int total ^ " workers, " ^
              string_of_int active ^ " active, " ^
              string_of_int waiting ^ " waiting "
          end)
      else ();

    (*forget worker threads that are no longer active*)
    val _ =
      if forall (Thread.isActive o #1) (! workers) then ()
      else
        let
          val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
          val _ = workers := alive;
        in
          Multithreading.tracing 0 (fn () =>
            "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
        end;


    (* worker pool adjustments *)

    (*previous limits, to detect changes below*)
    val max_active0 = ! max_active;
    val max_workers0 = ! max_workers;

    (*m: limit of concurrently Working threads; 0 during shutdown*)
    val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
    val _ = max_active := m;

    (*mm: desired pool size, from Working plus twice Waiting workers, kept
      between m and 4 * m.  NOTE(review): m = 9999 looks like a sentinel of
      Multithreading.max_threads_value that leads to a single worker -- confirm*)
    val mm =
      if ! do_shutdown then 0
      else if m = 9999 then 1
      else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
    (*worker_trend counts consecutive ticks with mm above (positive) or
      below (negative) the current max_workers*)
    val _ =
      if tick andalso mm > ! max_workers then
        Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
      else if tick andalso mm < ! max_workers then
        Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
      else ();
    (*adopt mm at once when it is 0 or the trend is persistent; a shorter
      upward trend only allows growth up to 2 * m*)
    val _ =
      if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
        max_workers := mm
      else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
        max_workers := Int.min (mm, 2 * m)
      else ();

    (*fork missing workers; surplus workers retire themselves in worker_next*)
    val missing = ! max_workers - length (! workers);
    val _ =
      if missing > 0 then
        funpow missing (fn () =>
          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
      else ();

    (*limits changed: wake a worker to re-examine them*)
    val _ =
      if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
      else signal work_available;


    (* canceled groups *)

    (*retry deferred cancellations; groups for which cancel_now succeeds are dropped*)
    val _ =
      if null (! canceled) then ()
      else
       (Multithreading.tracing 1 (fn () =>
          string_of_int (length (! canceled)) ^ " canceled groups");
        Unsynchronized.change canceled (filter_out cancel_now);
        broadcast_work ());


    (* delay loop *)

    val _ = Exn.release (wait_timeout next_round scheduler_event);


    (* shutdown *)

    (*only passive tasks left: request shutdown*)
    val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
    val continue = not (! do_shutdown andalso null (! workers));
    val _ = if continue then () else scheduler := NONE;

    val _ = broadcast scheduler_event;
  in continue end
  handle exn =>
    if Exn.is_interrupt exn then
     (Multithreading.tracing 1 (fn () => "Interrupt");
      List.app cancel_later (Task_Queue.cancel_all (! queue));
      broadcast_work (); true)
    else reraise exn;
   381 
   382 fun scheduler_loop () =
   383   while
   384     Multithreading.with_attributes
   385       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   386       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   387   do ();
   388 
   389 fun scheduler_active () = (*requires SYNCHRONIZED*)
   390   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   391 
   392 fun scheduler_check () = (*requires SYNCHRONIZED*)
   393  (do_shutdown := false;
   394   if scheduler_active () then ()
   395   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   396 
   397 
   398 
   399 (** futures **)
   400 
   401 (* fork *)
   402 
   403 fun forks {name, group, deps, pri} es =
   404   if null es then []
   405   else
   406     let
   407       val grp =
   408         (case group of
   409           NONE => worker_subgroup ()
   410         | SOME grp => grp);
   411       fun enqueue e (minimal, queue) =
   412         let
   413           val (result, job) = future_job grp e;
   414           val ((task, minimal'), queue') = Task_Queue.enqueue name grp deps pri job queue;
   415           val future = Future {promised = false, task = task, group = grp, result = result};
   416         in (future, (minimal orelse minimal', queue')) end;
   417     in
   418       SYNCHRONIZED "enqueue" (fn () =>
   419         let
   420           val (futures, minimal) =
   421             Unsynchronized.change_result queue (fn q =>
   422               let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
   423               in ((futures, minimal), q') end);
   424           val _ = if minimal then signal work_available else ();
   425           val _ = scheduler_check ();
   426         in futures end)
   427     end;
   428 
   429 fun fork_pri pri e = singleton (forks {name = "", group = NONE, deps = [], pri = pri}) e;
   430 fun fork e = fork_pri 0 e;
   431 
   432 
   433 (* join *)
   434 
local

(* Outcome of a finished future (a Fail result if it is unfinished).
   An interrupt result is replaced by the exceptions that
   Task_Queue.group_status reports for the future's group, if there are any. *)
fun get_result x =
  (case peek x of
    NONE => Exn.Exn (Fail "Unfinished future")
  | SOME res =>
      if Exn.is_interrupt_exn res then
        (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
          [] => res
        | exns => Exn.Exn (Exn.EXCEPTIONS exns))
      else res);

(* Dequeue work towards the given dependencies for the current thread.
   While nothing can be taken but dependencies remain, wait on work_finished
   as an active (Waiting) worker.  NONE when no dependencies are left. *)
fun join_next deps = (*requires SYNCHRONIZED*)
  if null deps then NONE
  else
    (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
      (NONE, []) => NONE
    | (NONE, deps') => (worker_wait true work_finished; join_next deps')
    | (SOME work, deps') => SOME (work, deps'));

(* Execute dequeued work in the joining thread itself, then continue with
   the remaining dependencies. *)
fun execute_work NONE = ()
  | execute_work (SOME (work, deps')) = (execute work; join_work deps')
and join_work deps =
  execute_work (SYNCHRONIZED "join" (fn () => join_next deps));

(* Record that task depends on deps, then help evaluate them. *)
fun join_depend task deps =
  execute_work (SYNCHRONIZED "join" (fn () =>
    (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));

in

(* Wait for all futures xs and return their outcomes, in order.
   A worker thread records the dependencies of its current task and works on
   them itself; other threads block on the result cells.  Joining unfinished
   futures within a critical section is an error. *)
fun join_results xs =
  if forall is_finished xs then map get_result xs
  else if Multithreading.self_critical () then
    error "Cannot join future values within critical section"
  else
    (*the case ends at the semicolon: results are collected for both branches*)
    worker_waiting (fn () =>
      (case worker_task () of
        SOME task => join_depend task (map task_of xs)
      | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
      map get_result xs));

end;
   478 
   479 fun join_result x = singleton join_results x;
   480 fun join x = Exn.release (join_result x);
   481 
   482 
   483 (* fast-path versions -- bypassing full task management *)
   484 
   485 fun value (x: 'a) =
   486   let
   487     val group = Task_Queue.new_group NONE;
   488     val result = Single_Assignment.var "value" : 'a result;
   489     val _ = assign_result group result (Exn.Result x);
   490   in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
   491 
   492 fun map_future f x =
   493   let
   494     val task = task_of x;
   495     val group = Task_Queue.new_group (SOME (group_of x));
   496     val (result, job) = future_job group (fn () => f (join x));
   497 
   498     val extended = SYNCHRONIZED "extend" (fn () =>
   499       (case Task_Queue.extend task job (! queue) of
   500         SOME queue' => (queue := queue'; true)
   501       | NONE => false));
   502   in
   503     if extended then Future {promised = false, task = task, group = group, result = result}
   504     else
   505       singleton
   506         (forks {name = "Future.map", group = SOME group,
   507           deps = [task], pri = Task_Queue.pri_of_task task})
   508         (fn () => f (join x))
   509   end;
   510 
   511 
   512 (* promised futures -- fulfilled by external means *)
   513 
(* Create a promised future in group: a passive task without evaluation,
   to be fulfilled later by fulfill_result.
   The abort function handed to Task_Queue.enqueue_passive tries to store an
   interrupt in the result cell.  If that assignment raises Fail (presumably
   because the promise already holds a non-interrupt outcome), abort yields
   true.  If an interrupt escapes (an interrupt was already stored, re-raised
   by assign_result), this is reported as Fail "Concurrent attempt to
   fulfill promise". *)
fun promise_group group : 'a future =
  let
    val result = Single_Assignment.var "promise" : 'a result;
    fun abort () = assign_result group result Exn.interrupt_exn
      handle Fail _ => true
        | exn =>
            if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
            else reraise exn;
    val task = SYNCHRONIZED "enqueue_passive" (fn () =>
      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
  in Future {promised = true, task = task, group = group, result = result} end;
   525 
   526 fun promise () = promise_group (worker_subgroup ());
   527 
(* Fulfill a promised future with res; raises Fail for futures that are not
   promises.  With interrupts disabled, try to dequeue the passive task for
   the current thread.  If that succeeds, the task is executed with a single
   job storing res, or an interrupt if the group was already canceled.
   In either case, wait until the result cell has been assigned. *)
fun fulfill_result (Future {promised, task, group, result}) res =
  if not promised then raise Fail "Not a promised future"
  else
    let
      fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
      val _ =
        Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
          let
            val still_passive =
              SYNCHRONIZED "fulfill_result" (fn () =>
                Unsynchronized.change_result queue
                  (Task_Queue.dequeue_passive (Thread.self ()) task));
          in if still_passive then execute (task, group, [job]) else () end);
      (*the cell may also be assigned by someone else, e.g. an abort*)
      val _ = worker_waiting (fn () => Single_Assignment.await result);
    in () end;
   543 
   544 fun fulfill x res = fulfill_result x (Exn.Result res);
   545 
   546 
   547 (* cancellation *)
   548 
   549 fun interruptible_task f x =
   550   if Multithreading.available then
   551     Multithreading.with_attributes
   552       (if is_worker ()
   553        then Multithreading.private_interrupts
   554        else Multithreading.public_interrupts)
   555       (fn _ => f x)
   556   else interruptible f x;
   557 
   558 (*cancel: present and future group members will be interrupted eventually*)
   559 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
   560  (if cancel_now group then () else cancel_later group;
   561   signal work_available; scheduler_check ()));
   562 
   563 fun cancel x = cancel_group (group_of x);
   564 
   565 
   566 (* shutdown *)
   567 
   568 fun shutdown () =
   569   if Multithreading.available then
   570     SYNCHRONIZED "shutdown" (fn () =>
   571      while scheduler_active () do
   572       (wait scheduler_event; broadcast_work ()))
   573   else ();
   574 
   575 
   576 (* status markup *)
   577 
   578 fun status e =
   579   let
   580     val task_props =
   581       (case worker_task () of
   582         NONE => I
   583       | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
   584     val _ = Output.status (Markup.markup (task_props Markup.forked) "");
   585     val x = e ();  (*sic -- report "joined" only for success*)
   586     val _ = Output.status (Markup.markup (task_props Markup.joined) "");
   587   in x end;
   588 
   589 
   590 (*final declarations of this structure!*)
   591 val map = map_future;
   592 
   593 end;
   594 
(* Make the future type available at the top level, without the Future
   qualifier. *)
type 'a future = 'a Future.future;
   596