src/Pure/Concurrent/future.ML
author wenzelm
Wed, 04 Nov 2009 11:37:06 +0100
changeset 33430 0a1c0c1209ec
parent 33429 a69ddd7dce95
child 33431 e351f4c1f18c
permissions -rw-r--r--
tuned;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.  Groups are block-structured.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 *)
    27 
    28 signature FUTURE =
    29 sig
    30   type task = Task_Queue.task
    31   type group = Task_Queue.group
    32   val is_worker: unit -> bool
    33   val worker_task: unit -> Task_Queue.task option
    34   val worker_group: unit -> Task_Queue.group option
    35   type 'a future
    36   val task_of: 'a future -> task
    37   val group_of: 'a future -> group
    38   val peek: 'a future -> 'a Exn.result option
    39   val is_finished: 'a future -> bool
    40   val value: 'a -> 'a future
    41   val fork_group: group -> (unit -> 'a) -> 'a future
    42   val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
    43   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    44   val fork_pri: int -> (unit -> 'a) -> 'a future
    45   val fork: (unit -> 'a) -> 'a future
    46   val join_results: 'a future list -> 'a Exn.result list
    47   val join_result: 'a future -> 'a Exn.result
    48   val join: 'a future -> 'a
    49   val map: ('a -> 'b) -> 'a future -> 'b future
    50   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    51   val cancel_group: group -> unit
    52   val cancel: 'a future -> unit
    53   val shutdown: unit -> unit
    54 end;
    55 
    56 structure Future: FUTURE =
    57 struct
    58 
    59 (** future values **)
    60 
    61 (* identifiers *)
    62 
    63 type task = Task_Queue.task;
    64 type group = Task_Queue.group;
    65 
    66 local
    67   val tag = Universal.tag () : (task * group) option Universal.tag;
    68 in
    69   fun thread_data () = the_default NONE (Thread.getLocal tag);
    70   fun setmp_thread_data data f x =
    71     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    72 end;
    73 
    74 val is_worker = is_some o thread_data;
    75 val worker_task = Option.map #1 o thread_data;
    76 val worker_group = Option.map #2 o thread_data;
    77 
    78 
    79 (* datatype future *)
    80 
    81 datatype 'a future = Future of
    82  {task: task,
    83   group: group,
    84   result: 'a Exn.result option Synchronized.var};
    85 
    86 fun task_of (Future {task, ...}) = task;
    87 fun group_of (Future {group, ...}) = group;
    88 fun result_of (Future {result, ...}) = result;
    89 
    90 fun peek x = Synchronized.value (result_of x);
    91 fun is_finished x = is_some (peek x);
    92 
    93 fun value x = Future
    94  {task = Task_Queue.new_task 0,
    95   group = Task_Queue.new_group NONE,
    96   result = Synchronized.var "future" (SOME (Exn.Result x))};
    97 
    98 
    99 
   100 (** scheduling **)
   101 
   102 (* global state *)
   103 
   104 val queue = Unsynchronized.ref Task_Queue.empty;
   105 val next = Unsynchronized.ref 0;
   106 val max_workers = Unsynchronized.ref 0;
   107 val max_active = Unsynchronized.ref 0;
   108 val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list);
   109 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   110 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
   111 val do_shutdown = Unsynchronized.ref false;
   112 
   113 
   114 (* synchronization *)
   115 
   116 val scheduler_event = ConditionVar.conditionVar ();
   117 val work_available = ConditionVar.conditionVar ();
   118 val work_finished = ConditionVar.conditionVar ();
   119 
   120 local
   121   val lock = Mutex.mutex ();
   122 in
   123 
   124 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
   125 
   126 fun wait cond = (*requires SYNCHRONIZED*)
   127   Multithreading.sync_wait NONE NONE cond lock;
   128 
   129 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   130   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   131 
   132 fun signal cond = (*requires SYNCHRONIZED*)
   133   ConditionVar.signal cond;
   134 
   135 fun broadcast cond = (*requires SYNCHRONIZED*)
   136   ConditionVar.broadcast cond;
   137 
   138 fun broadcast_work () = (*requires SYNCHRONIZED*)
   139  (ConditionVar.broadcast work_available;
   140   ConditionVar.broadcast work_finished);
   141 
   142 end;
   143 
   144 
   145 (* execute future jobs *)
   146 
   147 fun future_job group (e: unit -> 'a) =
   148   let
   149     val result = Synchronized.var "future" (NONE: 'a Exn.result option);
   150     fun job ok =
   151       let
   152         val res =
   153           if ok then
   154             Exn.capture (fn () =>
   155               Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
   156           else Exn.Exn Exn.Interrupt;
   157         val _ = Synchronized.assign result (K (SOME res));
   158       in
   159         (case res of
   160           Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   161         | Exn.Result _ => true)
   162       end;
   163   in (result, job) end;
   164 
   165 fun do_cancel group = (*requires SYNCHRONIZED*)
   166  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   167   broadcast scheduler_event);
   168 
   169 fun execute (task, group, jobs) =
   170   let
   171     val valid = not (Task_Queue.is_canceled group);
   172     val ok = setmp_thread_data (task, group) (fn () =>
   173       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   174     val _ = SYNCHRONIZED "finish" (fn () =>
   175       let
   176         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   177         val _ =
   178           if ok then ()
   179           else if Task_Queue.cancel (! queue) group then ()
   180           else do_cancel group;
   181         val _ = broadcast work_finished;
   182         val _ = if maximal then () else broadcast work_available;
   183       in () end);
   184   in () end;
   185 
   186 
   187 (* worker activity *)
   188 
   189 fun count_active () = (*requires SYNCHRONIZED*)
   190   fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0;
   191 
   192 fun find_active () = (*requires SYNCHRONIZED*)
   193   (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   194     SOME active => active
   195   | NONE => raise Fail "Unregistered worker thread");
   196 
   197 
   198 (* worker threads *)
   199 
   200 fun worker_wait cond = (*requires SYNCHRONIZED*)
   201   let
   202     val active = find_active ();
   203     val _ = active := false;
   204     val _ = wait cond;
   205     val _ = active := true;
   206   in () end;
   207 
   208 fun worker_next have_work = (*requires SYNCHRONIZED*)
   209   if length (! workers) > ! max_workers then
   210     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   211      if have_work then signal work_available else ();
   212      broadcast scheduler_event;
   213      NONE)
   214   else if count_active () > ! max_active then
   215     (if have_work then signal work_available else ();
   216      worker_wait scheduler_event;
   217      worker_next false)
   218   else
   219     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   220       NONE => (worker_wait work_available; worker_next true)
   221     | some => some);
   222 
   223 fun worker_loop name =
   224   (case SYNCHRONIZED name (fn () => worker_next false) of
   225     NONE => ()
   226   | SOME work => (execute work; worker_loop name));
   227 
   228 fun worker_start name = (*requires SYNCHRONIZED*)
   229   Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
   230     Unsynchronized.ref true));
   231 
   232 
   233 (* scheduler *)
   234 
   235 val status_ticks = Unsynchronized.ref 0;
   236 
   237 val last_round = Unsynchronized.ref Time.zeroTime;
   238 val next_round = Time.fromMilliseconds 50;
   239 
   240 fun scheduler_next () = (*requires SYNCHRONIZED*)
   241   let
   242     val now = Time.now ();
   243     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   244     val _ = if tick then last_round := now else ();
   245 
   246     (*queue and worker status*)
   247     val _ =
   248       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
   249     val _ =
   250       if tick andalso ! status_ticks = 0 then
   251         Multithreading.tracing 1 (fn () =>
   252           let
   253             val {ready, pending, running} = Task_Queue.status (! queue);
   254             val total = length (! workers);
   255             val active = count_active ();
   256           in
   257             "SCHEDULE " ^ Time.toString now ^ ": " ^
   258               string_of_int ready ^ " ready, " ^
   259               string_of_int pending ^ " pending, " ^
   260               string_of_int running ^ " running; " ^
   261               string_of_int total ^ " workers, " ^
   262               string_of_int active ^ " active "
   263           end)
   264       else ();
   265 
   266     (*worker threads*)
   267     val _ =
   268       if forall (Thread.isActive o #1) (! workers) then ()
   269       else
   270         let
   271           val  (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   272           val _ = workers := alive;
   273         in
   274           Multithreading.tracing 0 (fn () =>
   275             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   276         end;
   277 
   278     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   279     val _ = max_active := m;
   280 
   281     val mm = if m = 9999 then 1 else m * 2;
   282     val _ = max_workers := mm;
   283 
   284     val missing = ! max_workers - length (! workers);
   285     val _ =
   286       if missing > 0 then
   287        (funpow missing (fn () =>
   288           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ();
   289         broadcast scheduler_event)
   290       else ();
   291 
   292     (*canceled groups*)
   293     val _ =
   294       if null (! canceled) then ()
   295       else
   296        (Multithreading.tracing 1 (fn () =>
   297           string_of_int (length (! canceled)) ^ " canceled groups");
   298         Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
   299         broadcast_work ());
   300 
   301     (*delay loop*)
   302     val _ = Exn.release (wait_timeout next_round scheduler_event);
   303 
   304     (*shutdown*)
   305     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
   306     val continue = not (! do_shutdown andalso null (! workers));
   307     val _ = if continue then () else scheduler := NONE;
   308     val _ = broadcast scheduler_event;
   309   in continue end
   310   handle Exn.Interrupt =>
   311    (Multithreading.tracing 1 (fn () => "Interrupt");
   312     uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
   313     scheduler_next ());
   314 
   315 fun scheduler_loop () =
   316   Multithreading.with_attributes
   317     (Multithreading.sync_interrupts Multithreading.public_interrupts)
   318     (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
   319 
   320 fun scheduler_active () = (*requires SYNCHRONIZED*)
   321   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   322 
   323 fun scheduler_check () = (*requires SYNCHRONIZED*)
   324  (do_shutdown := false;
   325   if scheduler_active () then ()
   326   else scheduler := SOME (SimpleThread.fork false scheduler_loop));
   327 
   328 
   329 
   330 (** futures **)
   331 
   332 (* fork *)
   333 
   334 fun fork_future opt_group deps pri e =
   335   let
   336     val group =
   337       (case opt_group of
   338         SOME group => group
   339       | NONE => Task_Queue.new_group (worker_group ()));
   340     val (result, job) = future_job group e;
   341     val task = SYNCHRONIZED "enqueue" (fn () =>
   342       let
   343         val (task, minimal) =
   344           Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
   345         val _ = if minimal then signal work_available else ();
   346         val _ = scheduler_check ();
   347       in task end);
   348   in Future {task = task, group = group, result = result} end;
   349 
   350 fun fork_group group e = fork_future (SOME group) [] 0 e;
   351 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
   352 fun fork_deps deps e = fork_deps_pri deps 0 e;
   353 fun fork_pri pri e = fork_deps_pri [] pri e;
   354 fun fork e = fork_deps [] e;
   355 
   356 
   357 (* join *)
   358 
   359 local
   360 
   361 fun get_result x =
   362   (case peek x of
   363     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   364   | SOME (Exn.Exn Exn.Interrupt) =>
   365       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   366   | SOME res => res);
   367 
   368 fun passive_wait x =
   369   Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ());
   370 
   371 fun join_next deps = (*requires SYNCHRONIZED*)
   372   if null deps then NONE
   373   else
   374     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   375       (NONE, []) => NONE
   376     | (NONE, deps') => (worker_wait work_finished; join_next deps')
   377     | (SOME work, deps') => SOME (work, deps'));
   378 
   379 fun execute_work NONE = ()
   380   | execute_work (SOME (work, deps')) = (execute work; join_work deps')
   381 and join_work deps =
   382   execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
   383 
   384 fun join_depend task deps =
   385   execute_work (SYNCHRONIZED "join" (fn () =>
   386     (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
   387 
   388 in
   389 
   390 fun join_results xs =
   391   if forall is_finished xs then map get_result xs
   392   else if Multithreading.self_critical () then
   393     error "Cannot join future values within critical section"
   394   else
   395     (case worker_task () of
   396       SOME task => join_depend task (map task_of xs)
   397     | NONE => List.app passive_wait xs;
   398     map get_result xs);
   399 
   400 end;
   401 
   402 fun join_result x = singleton join_results x;
   403 fun join x = Exn.release (join_result x);
   404 
   405 
   406 (* map *)
   407 
   408 fun map_future f x =
   409   let
   410     val task = task_of x;
   411     val group = Task_Queue.new_group (SOME (group_of x));
   412     val (result, job) = future_job group (fn () => f (join x));
   413 
   414     val extended = SYNCHRONIZED "extend" (fn () =>
   415       (case Task_Queue.extend task job (! queue) of
   416         SOME queue' => (queue := queue'; true)
   417       | NONE => false));
   418   in
   419     if extended then Future {task = task, group = group, result = result}
   420     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   421   end;
   422 
   423 
   424 (* cancellation *)
   425 
   426 fun interruptible_task f x =
   427   if Multithreading.available then
   428     Multithreading.with_attributes
   429       (if is_worker ()
   430        then Multithreading.private_interrupts
   431        else Multithreading.public_interrupts)
   432       (fn _ => f x)
   433   else interruptible f x;
   434 
   435 (*cancel: present and future group members will be interrupted eventually*)
   436 fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
   437 fun cancel x = cancel_group (group_of x);
   438 
   439 
   440 (* shutdown *)
   441 
   442 fun shutdown () =
   443   if Multithreading.available then
   444     SYNCHRONIZED "shutdown" (fn () =>
   445      while scheduler_active () do
   446       (wait scheduler_event; broadcast_work ()))
   447   else ();
   448 
   449 
   450 (*final declarations of this structure!*)
   451 val map = map_future;
   452 
   453 end;
   454 
   455 type 'a future = 'a Future.future;
   456