src/Pure/Concurrent/future.ML
author wenzelm
Sat, 23 Jul 2011 21:29:56 +0200
changeset 44823 804783d6ee26
parent 44633 e72ba84ae58f
child 44824 318ca53e3fbb
permissions -rw-r--r--
more detailed tracing;
tuned;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join (and variants).  The idea is to model
    12     parallel value-oriented computations, but *not* communicating
    13     processes.
    14 
    15   * Futures are grouped; failure of one group member causes the whole
    16     group to be interrupted eventually.  Groups are block-structured.
    17 
    18   * Forked futures are evaluated spontaneously by a farm of worker
    19     threads in the background; join resynchronizes the computation and
    20     delivers results (values or exceptions).
    21 
    22   * The pool of worker threads is limited, usually in correlation with
    23     the number of physical cores on the machine.  Note that allocation
    24     of runtime resources is distorted either if workers yield CPU time
    25     (e.g. via system sleep or wait operations), or if non-worker
    26     threads contend for significant runtime resources independently.
    27 
    28   * Promised futures are fulfilled by external means.  There is no
    29     associated evaluation task, but other futures can depend on them
    30     as usual.
    31 *)
    32 
signature FUTURE =
sig
  (*task currently evaluated by this thread (thread-local), if any*)
  val worker_task: unit -> Task_Queue.task option
  (*group of the current worker task, if any*)
  val worker_group: unit -> Task_Queue.group option
  (*fresh group obtained from Task_Queue.new_group on the current worker group*)
  val worker_subgroup: unit -> Task_Queue.group
  type 'a future
  (*task associated with a future*)
  val task_of: 'a future -> Task_Queue.task
  (*non-blocking: the result if it has already been assigned*)
  val peek: 'a future -> 'a Exn.result option
  val is_finished: 'a future -> bool
  (*enqueue one future per closure; all share name, group, deps and priority;
    group NONE means a fresh worker_subgroup*)
  val forks:
    {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
      (unit -> 'a) list -> 'a future list
  (*fork a single closure with the given priority*)
  val fork_pri: int -> (unit -> 'a) -> 'a future
  (*fork_pri with priority 0*)
  val fork: (unit -> 'a) -> 'a future
  (*wait for all futures and return their results (values or exceptions);
    fails via error inside a critical section if some future is unfinished*)
  val join_results: 'a future list -> 'a Exn.result list
  val join_result: 'a future -> 'a Exn.result
  (*like join_result, but re-raises a captured exception*)
  val join: 'a future -> 'a
  (*already finished future holding the given value*)
  val value: 'a -> 'a future
  (*future applying a function to the joined result of another future*)
  val map: ('a -> 'b) -> 'a future -> 'b future
  (*forks when multithreading is enabled, otherwise evaluates the closures
    sequentially right away*)
  val cond_forks:
    {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
      (unit -> 'a) list -> 'a future list
  (*future without evaluation task, to be fulfilled by external means*)
  val promise_group: Task_Queue.group -> 'a future
  (*promise_group with a fresh worker_subgroup*)
  val promise: unit -> 'a future
  (*assign the outcome of a promised future; raises Fail for other futures*)
  val fulfill_result: 'a future -> 'a Exn.result -> unit
  val fulfill: 'a future -> 'a -> unit
  (*apply f with interrupts enabled: private interrupts within worker threads,
    public interrupts otherwise*)
  val interruptible_task: ('a -> 'b) -> 'a -> 'b
  (*present and future group members will be interrupted eventually*)
  val cancel_group: Task_Queue.group -> unit
  (*cancel the whole group of the given future's task*)
  val cancel: 'a future -> unit
  (*block until the scheduler thread is no longer active*)
  val shutdown: unit -> unit
  (*evaluate a closure, emitting "forked" and (on success) "joined" status markup*)
  val status: (unit -> 'a) -> 'a
end;
    65 
    66 structure Future: FUTURE =
    67 struct
    68 
    69 (** future values **)
    70 
    71 (* identifiers *)
    72 
    73 local
    74   val tag = Universal.tag () : Task_Queue.task option Universal.tag;
    75 in
    76   fun worker_task () = the_default NONE (Thread.getLocal tag);
    77   fun setmp_worker_task data f x =
    78     Library.setmp_thread_data tag (worker_task ()) (SOME data) f x;
    79 end;
    80 
    81 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
    82 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
    83 
    84 fun worker_joining e =
    85   (case worker_task () of
    86     NONE => e ()
    87   | SOME task => Task_Queue.joining task e);
    88 
    89 fun worker_waiting deps e =
    90   (case worker_task () of
    91     NONE => e ()
    92   | SOME task => Task_Queue.waiting task deps e);
    93 
    94 
    95 (* datatype future *)
    96 
(*single-assignment variable holding either a value or an exception*)
type 'a result = 'a Exn.result Single_Assignment.var;

(*promised: true only for futures created by promise_group (checked by fulfill_result);
  task: associated task (a dummy task for Future.value, a passive queue task for promises);
  result: the outcome -- presumably a second assignment fails with Fail, cf. assign_result*)
datatype 'a future = Future of
 {promised: bool,
  task: Task_Queue.task,
  result: 'a result};
   103 
   104 fun task_of (Future {task, ...}) = task;
   105 fun result_of (Future {result, ...}) = result;
   106 
   107 fun peek x = Single_Assignment.peek (result_of x);
   108 fun is_finished x = is_some (peek x);
   109 
   110 fun assign_result group result res =
   111   let
   112     val _ = Single_Assignment.assign result res
   113       handle exn as Fail _ =>
   114         (case Single_Assignment.peek result of
   115           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   116         | _ => reraise exn);
   117     val ok =
   118       (case the (Single_Assignment.peek result) of
   119         Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   120       | Exn.Res _ => true);
   121   in ok end;
   122 
   123 
   124 
   125 (** scheduling **)
   126 
   127 (* synchronization *)
   128 
   129 val scheduler_event = ConditionVar.conditionVar ();
   130 val work_available = ConditionVar.conditionVar ();
   131 val work_finished = ConditionVar.conditionVar ();
   132 
   133 local
   134   val lock = Mutex.mutex ();
   135 in
   136 
   137 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   138 
   139 fun wait cond = (*requires SYNCHRONIZED*)
   140   Multithreading.sync_wait NONE NONE cond lock;
   141 
   142 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   143   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   144 
   145 fun signal cond = (*requires SYNCHRONIZED*)
   146   ConditionVar.signal cond;
   147 
   148 fun broadcast cond = (*requires SYNCHRONIZED*)
   149   ConditionVar.broadcast cond;
   150 
   151 fun broadcast_work () = (*requires SYNCHRONIZED*)
   152  (ConditionVar.broadcast work_available;
   153   ConditionVar.broadcast work_finished);
   154 
   155 end;
   156 
   157 
   158 (* global state *)
   159 
(*task queue; accessed under SYNCHRONIZED throughout this structure*)
val queue = Unsynchronized.ref Task_Queue.empty;
(*counter used to name worker threads ("worker N")*)
val next = Unsynchronized.ref 0;
(*the scheduler thread, if started; reset to NONE when the scheduler terminates*)
val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
(*groups whose cancellation is retried by the scheduler (see cancel_later)*)
val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
(*set by the scheduler when all queued tasks are passive; cleared by scheduler_check*)
val do_shutdown = Unsynchronized.ref false;
(*target size of the worker pool, adjusted by the scheduler*)
val max_workers = Unsynchronized.ref 0;
(*maximum number of workers in state Working before further workers go to sleep*)
val max_active = Unsynchronized.ref 0;
(*signed count of consecutive ticks in which the pool should grow (> 0) or shrink (< 0)*)
val worker_trend = Unsynchronized.ref 0;

(*Working: running or looking for tasks; Waiting: blocked while joining
  (worker_wait true); Sleeping: idle (worker_wait false)*)
datatype worker_state = Working | Waiting | Sleeping;
(*registered worker threads together with their current state*)
val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   171 
   172 fun count_workers state = (*requires SYNCHRONIZED*)
   173   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   174 
   175 
   176 (* execute future jobs *)
   177 
   178 fun future_job group (e: unit -> 'a) =
   179   let
   180     val result = Single_Assignment.var "future" : 'a result;
   181     val pos = Position.thread_data ();
   182     fun job ok =
   183       let
   184         val res =
   185           if ok then
   186             Exn.capture (fn () =>
   187               Multithreading.with_attributes Multithreading.private_interrupts
   188                 (fn _ => Position.setmp_thread_data pos e ()) before
   189               Multithreading.interrupted ()) ()
   190           else Exn.interrupt_exn;
   191       in assign_result group result res end;
   192   in (result, job) end;
   193 
   194 fun cancel_now group = (*requires SYNCHRONIZED*)
   195   Task_Queue.cancel (! queue) group;
   196 
   197 fun cancel_later group = (*requires SYNCHRONIZED*)
   198  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   199   broadcast scheduler_event);
   200 
(*Run the jobs of a dequeued task on the current thread, then record its completion.
  Each job receives "valid", i.e. whether the task's group was not yet canceled.*)
fun execute (task, jobs) =
  let
    val group = Task_Queue.group_of_task task;
    val valid = not (Task_Queue.is_canceled group);
    (*"job valid" is evaluated before "andalso", so every job runs even after an
      earlier failure; ok holds only if all jobs reported success*)
    val ok =
      Task_Queue.running task (fn () =>
        setmp_worker_task task (fn () =>
          fold (fn job => fn ok => job valid andalso ok) jobs true) ());
    (*level-2 trace: task and groups, run/wait times in microseconds, dependencies*)
    val _ = Multithreading.tracing 2 (fn () =>
      let
        val s = Task_Queue.str_of_task_groups task;
        fun micros time = string_of_int (Time.toNanoseconds time div 1000);
        val (run, wait, deps) = Task_Queue.timing_of_task task;
      in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
    val _ = SYNCHRONIZED "finish" (fn () =>
      let
        val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
        (*on failure: cancel the group now, or hand it to the scheduler when
          cancel_now returns false*)
        val _ =
          if ok then ()
          else if cancel_now group then ()
          else cancel_later group;
        val _ = broadcast work_finished;
        (*NOTE(review): "maximal" presumably means no queued task depends on this one;
          otherwise one more worker is woken for the newly enabled work -- confirm*)
        val _ = if maximal then () else signal work_available;
      in () end);
  in () end;
   226 
   227 
   228 (* worker threads *)
   229 
   230 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   231   let
   232     val state =
   233       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   234         SOME state => state
   235       | NONE => raise Fail "Unregistered worker thread");
   236     val _ = state := (if active then Waiting else Sleeping);
   237     val _ = wait cond;
   238     val _ = state := Working;
   239   in () end;
   240 
   241 fun worker_next () = (*requires SYNCHRONIZED*)
   242   if length (! workers) > ! max_workers then
   243     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   244      signal work_available;
   245      NONE)
   246   else if count_workers Working > ! max_active then
   247     (worker_wait false work_available; worker_next ())
   248   else
   249     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   250       NONE => (worker_wait false work_available; worker_next ())
   251     | some => (signal work_available; some));
   252 
   253 fun worker_loop name =
   254   (case SYNCHRONIZED name (fn () => worker_next ()) of
   255     NONE => ()
   256   | SOME work => (execute work; worker_loop name));
   257 
   258 fun worker_start name = (*requires SYNCHRONIZED*)
   259   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   260     Unsynchronized.ref Working));
   261 
   262 
   263 (* scheduler *)
   264 
(*tick counter modulo 10: queue status is traced on every 10th tick*)
val status_ticks = Unsynchronized.ref 0;

(*time of the last tick; the next tick happens once next_round has elapsed*)
val last_round = Unsynchronized.ref Time.zeroTime;
(*tick interval, also used as the scheduler's wait timeout per round*)
val next_round = seconds 0.05;
   269 
(*One round of the scheduler: status tracing, disposal of dead workers, worker
  pool adjustment, deferred cancellation, a bounded wait on scheduler_event,
  and shutdown detection.  Returns false once the scheduler should terminate.*)
fun scheduler_next () = (*requires SYNCHRONIZED*)
  let
    val now = Time.now ();
    val tick = Time.<= (Time.+ (! last_round, next_round), now);
    val _ = if tick then last_round := now else ();


    (* queue and worker status *)

    val _ =
      if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
    val _ =
      if tick andalso ! status_ticks = 0 then
        Multithreading.tracing 1 (fn () =>
          let
            val {ready, pending, running, passive} = Task_Queue.status (! queue);
            val total = length (! workers);
            val active = count_workers Working;
            val waiting = count_workers Waiting;
          in
            "SCHEDULE " ^ Time.toString now ^ ": " ^
              string_of_int ready ^ " ready, " ^
              string_of_int pending ^ " pending, " ^
              string_of_int running ^ " running, " ^
              string_of_int passive ^ " passive; " ^
              string_of_int total ^ " workers, " ^
              string_of_int active ^ " active, " ^
              string_of_int waiting ^ " waiting "
          end)
      else ();

    (*forget worker threads that are no longer active*)
    val _ =
      if forall (Thread.isActive o #1) (! workers) then ()
      else
        let
          val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
          val _ = workers := alive;
        in
          Multithreading.tracing 0 (fn () =>
            "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
        end;


    (* worker pool adjustments *)

    val max_active0 = ! max_active;
    val max_workers0 = ! max_workers;

    val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
    val _ = max_active := m;

    (*desired pool size: Working + 2 * Waiting, clamped to [m, 4 * m];
      NOTE(review): 9999 looks like a sentinel from max_threads_value -- confirm*)
    val mm =
      if ! do_shutdown then 0
      else if m = 9999 then 1
      else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
    (*on each tick, count consecutive rounds in the same direction; a change of
      direction restarts the count at 0*)
    val _ =
      if tick andalso mm > ! max_workers then
        Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
      else if tick andalso mm < ! max_workers then
        Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
      else ();
    (*adopt mm immediately when shutting down (mm = 0) or after a long trend;
      after a short upward trend, grow up to at most 2 * m*)
    val _ =
      if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
        max_workers := mm
      else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
        max_workers := Int.min (mm, 2 * m)
      else ();

    val missing = ! max_workers - length (! workers);
    val _ =
      if missing > 0 then
        funpow missing (fn () =>
          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
      else ();

    (*limits changed: let a worker re-check them*)
    val _ =
      if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
      else signal work_available;


    (* canceled groups *)

    (*retry deferred cancellations; groups for which cancel_now succeeds are dropped*)
    val _ =
      if null (! canceled) then ()
      else
       (Multithreading.tracing 1 (fn () =>
          string_of_int (length (! canceled)) ^ " canceled groups");
        Unsynchronized.change canceled (filter_out cancel_now);
        broadcast_work ());


    (* delay loop *)

    val _ = Exn.release (wait_timeout next_round scheduler_event);


    (* shutdown *)

    (*terminate only once shutdown is requested and all workers are gone*)
    val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
    val continue = not (! do_shutdown andalso null (! workers));
    val _ = if continue then () else scheduler := NONE;

    val _ = broadcast scheduler_event;
  in continue end
  (*an interrupt of the scheduler cancels all queued work but keeps it running*)
  handle exn =>
    if Exn.is_interrupt exn then
     (Multithreading.tracing 1 (fn () => "Interrupt");
      List.app cancel_later (Task_Queue.cancel_all (! queue));
      broadcast_work (); true)
    else reraise exn;
   380 
   381 fun scheduler_loop () =
   382   while
   383     Multithreading.with_attributes
   384       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   385       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   386   do ();
   387 
   388 fun scheduler_active () = (*requires SYNCHRONIZED*)
   389   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   390 
   391 fun scheduler_check () = (*requires SYNCHRONIZED*)
   392  (do_shutdown := false;
   393   if scheduler_active () then ()
   394   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   395 
   396 
   397 
   398 (** futures **)
   399 
   400 (* fork *)
   401 
   402 fun forks {name, group, deps, pri} es =
   403   if null es then []
   404   else
   405     let
   406       val grp =
   407         (case group of
   408           NONE => worker_subgroup ()
   409         | SOME grp => grp);
   410       fun enqueue e queue =
   411         let
   412           val (result, job) = future_job grp e;
   413           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   414           val future = Future {promised = false, task = task, result = result};
   415         in (future, queue') end;
   416     in
   417       SYNCHRONIZED "enqueue" (fn () =>
   418         let
   419           val (futures, queue') = fold_map enqueue es (! queue);
   420           val _ = queue := queue';
   421           val minimal = forall (not o Task_Queue.known_task queue') deps;
   422           val _ = if minimal then signal work_available else ();
   423           val _ = scheduler_check ();
   424         in futures end)
   425     end;
   426 
   427 fun fork_pri pri e = singleton (forks {name = "", group = NONE, deps = [], pri = pri}) e;
   428 fun fork e = fork_pri 0 e;
   429 
   430 
   431 (* join *)
   432 
local

(*Result of a finished future.  For an interrupt result, the exceptions recorded
  in the status of the future's group (if any) are reported instead.*)
fun get_result x =
  (case peek x of
    NONE => Exn.Exn (Fail "Unfinished future")
  | SOME res =>
      if Exn.is_interrupt_exn res then
        (case Exn.flatten_list (Task_Queue.group_status (Task_Queue.group_of_task (task_of x))) of
          [] => res
        | exns => Exn.Exn (Exn.EXCEPTIONS exns))
      else res);

(*Obtain the next work item relevant to deps via Task_Queue.dequeue_deps, which
  also returns the remaining dependencies.  Without a work item but with remaining
  deps, wait for work_finished and retry; NONE once no deps remain.*)
fun join_next deps = (*requires SYNCHRONIZED*)
  if null deps then NONE
  else
    (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
      (NONE, []) => NONE
    | (NONE, deps') =>
        (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
    | (SOME work, deps') => SOME (work, deps'));

(*A joining worker thread executes dependency work itself until join_next returns
  NONE; dequeuing runs under Multithreading.no_interrupts.*)
fun execute_work NONE = ()
  | execute_work (SOME (work, deps')) = (worker_joining (fn () => execute work); join_work deps')
and join_work deps =
  Multithreading.with_attributes Multithreading.no_interrupts
    (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));

in

(*Wait for all futures: worker threads help via join_work, other threads block on
  each result variable.  Joining unfinished futures in a critical section is an error.*)
fun join_results xs =
  let
    val _ =
      if forall is_finished xs then ()
      else if Multithreading.self_critical () then
        error "Cannot join future values within critical section"
      else if is_some (worker_task ()) then join_work (map task_of xs)
      else List.app (ignore o Single_Assignment.await o result_of) xs;
  in map get_result xs end;

end;
   473 
   474 fun join_result x = singleton join_results x;
   475 fun join x = Exn.release (join_result x);
   476 
   477 
   478 (* fast-path versions -- bypassing full task management *)
   479 
   480 fun value (x: 'a) =
   481   let
   482     val task = Task_Queue.dummy_task ();
   483     val group = Task_Queue.group_of_task task;
   484     val result = Single_Assignment.var "value" : 'a result;
   485     val _ = assign_result group result (Exn.Res x);
   486   in Future {promised = false, task = task, result = result} end;
   487 
   488 fun map_future f x =
   489   let
   490     val task = task_of x;
   491     val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
   492     val (result, job) = future_job group (fn () => f (join x));
   493 
   494     val extended = SYNCHRONIZED "extend" (fn () =>
   495       (case Task_Queue.extend task job (! queue) of
   496         SOME queue' => (queue := queue'; true)
   497       | NONE => false));
   498   in
   499     if extended then Future {promised = false, task = task, result = result}
   500     else
   501       singleton
   502         (forks {name = "Future.map", group = SOME group,
   503           deps = [task], pri = Task_Queue.pri_of_task task})
   504         (fn () => f (join x))
   505   end;
   506 
   507 fun cond_forks args es =
   508   if Multithreading.enabled () then forks args es
   509   else map (fn e => value (e ())) es;
   510 
   511 
   512 (* promised futures -- fulfilled by external means *)
   513 
   514 fun promise_group group : 'a future =
   515   let
   516     val result = Single_Assignment.var "promise" : 'a result;
   517     fun abort () = assign_result group result Exn.interrupt_exn
   518       handle Fail _ => true
   519         | exn =>
   520             if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
   521             else reraise exn;
   522     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   523       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
   524   in Future {promised = true, task = task, result = result} end;
   525 
   526 fun promise () = promise_group (worker_subgroup ());
   527 
(*Assign the outcome of a promised future; raises Fail for non-promised futures.*)
fun fulfill_result (Future {promised, task, result}) res =
  if not promised then raise Fail "Not a promised future"
  else
    let
      val group = Task_Queue.group_of_task task;
      (*ok = false (group already canceled, see execute) stores an interrupt instead*)
      fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
      (*if the passive task is still queued, take it over and run the assignment as
        an ordinary task via execute; all of this happens without interrupts*)
      val _ =
        Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
          let
            val still_passive =
              SYNCHRONIZED "fulfill_result" (fn () =>
                Unsynchronized.change_result queue
                  (Task_Queue.dequeue_passive (Thread.self ()) task));
          in if still_passive then execute (task, [job]) else () end);
      (*otherwise the result is (or will be) assigned elsewhere, e.g. by the abort
        function of promise_group: wait for it*)
      val _ =
        if is_some (Single_Assignment.peek result) then ()
        else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
    in () end;
   546 
   547 fun fulfill x res = fulfill_result x (Exn.Res res);
   548 
   549 
   550 (* cancellation *)
   551 
   552 fun interruptible_task f x =
   553   if Multithreading.available then
   554     Multithreading.with_attributes
   555       (if is_some (worker_task ())
   556        then Multithreading.private_interrupts
   557        else Multithreading.public_interrupts)
   558       (fn _ => f x)
   559   else interruptible f x;
   560 
   561 (*cancel: present and future group members will be interrupted eventually*)
   562 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
   563  (if cancel_now group then () else cancel_later group;
   564   signal work_available; scheduler_check ()));
   565 
   566 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
   567 
   568 
   569 (* shutdown *)
   570 
   571 fun shutdown () =
   572   if Multithreading.available then
   573     SYNCHRONIZED "shutdown" (fn () =>
   574      while scheduler_active () do
   575       (wait scheduler_event; broadcast_work ()))
   576   else ();
   577 
   578 
   579 (* status markup *)
   580 
   581 fun status e =
   582   let
   583     val task_props =
   584       (case worker_task () of
   585         NONE => I
   586       | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
   587     val _ = Output.status (Markup.markup_only (task_props Markup.forked));
   588     val x = e ();  (*sic -- report "joined" only for success*)
   589     val _ = Output.status (Markup.markup_only (task_props Markup.joined));
   590   in x end;
   591 
   592 
(*final declarations of this structure!*)
(*"map" is bound last because it shadows the list function that earlier code
  in this structure relies on (e.g. join_results, cond_forks)*)
val map = map_future;
   595 
   596 end;
   597 
(*make the future type available at top level*)
type 'a future = 'a Future.future;
   599