src/Pure/Concurrent/future.ML
author wenzelm
Thu, 22 Oct 2009 15:21:01 +0200
changeset 33068 e3e61133e0fc
parent 32823 81897d30b97f
child 33427 1ddcb8472bd2
permissions -rw-r--r--
use Synchronized.assign to achieve actual immutable results;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.  Groups are block-structured.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 *)
    27 
    28 signature FUTURE =
    29 sig
    30   type task = Task_Queue.task
    31   type group = Task_Queue.group
    32   val is_worker: unit -> bool
    33   val worker_task: unit -> Task_Queue.task option
    34   val worker_group: unit -> Task_Queue.group option
    35   type 'a future
    36   val task_of: 'a future -> task
    37   val group_of: 'a future -> group
    38   val peek: 'a future -> 'a Exn.result option
    39   val is_finished: 'a future -> bool
    40   val value: 'a -> 'a future
    41   val fork_group: group -> (unit -> 'a) -> 'a future
    42   val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
    43   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    44   val fork_pri: int -> (unit -> 'a) -> 'a future
    45   val fork: (unit -> 'a) -> 'a future
    46   val join_results: 'a future list -> 'a Exn.result list
    47   val join_result: 'a future -> 'a Exn.result
    48   val join: 'a future -> 'a
    49   val map: ('a -> 'b) -> 'a future -> 'b future
    50   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    51   val cancel_group: group -> unit
    52   val cancel: 'a future -> unit
    53   val shutdown: unit -> unit
    54 end;
    55 
    56 structure Future: FUTURE =
    57 struct
    58 
    59 (** future values **)
    60 
    61 (* identifiers *)
    62 
    63 type task = Task_Queue.task;
    64 type group = Task_Queue.group;
    65 
    66 local
    67   val tag = Universal.tag () : (string * task * group) option Universal.tag;
    68 in
    69   fun thread_data () = the_default NONE (Thread.getLocal tag);
    70   fun setmp_thread_data data f x =
    71     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    72 end;
    73 
    74 val is_worker = is_some o thread_data;
    75 val worker_task = Option.map #2 o thread_data;
    76 val worker_group = Option.map #3 o thread_data;
    77 
    78 
    79 (* datatype future *)
    80 
    81 datatype 'a future = Future of
    82  {task: task,
    83   group: group,
    84   result: 'a Exn.result option Synchronized.var};
    85 
    86 fun task_of (Future {task, ...}) = task;
    87 fun group_of (Future {group, ...}) = group;
    88 fun result_of (Future {result, ...}) = result;
    89 
    90 fun peek x = Synchronized.value (result_of x);
    91 fun is_finished x = is_some (peek x);
    92 
    93 fun value x = Future
    94  {task = Task_Queue.new_task 0,
    95   group = Task_Queue.new_group NONE,
    96   result = Synchronized.var "future" (SOME (Exn.Result x))};
    97 
    98 
    99 
   100 (** scheduling **)
   101 
   102 (* global state *)
   103 
   104 val queue = Unsynchronized.ref Task_Queue.empty;
   105 val next = Unsynchronized.ref 0;
   106 val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
   107 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   108 val excessive = Unsynchronized.ref 0;
   109 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
   110 val do_shutdown = Unsynchronized.ref false;
   111 
   112 
   113 (* synchronization *)
   114 
   115 val scheduler_event = ConditionVar.conditionVar ();
   116 val work_available = ConditionVar.conditionVar ();
   117 val work_finished = ConditionVar.conditionVar ();
   118 
   119 local
   120   val lock = Mutex.mutex ();
   121 in
   122 
   123 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
   124 
   125 fun wait cond = (*requires SYNCHRONIZED*)
   126   Multithreading.sync_wait NONE NONE cond lock;
   127 
   128 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   129   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   130 
   131 fun signal cond = (*requires SYNCHRONIZED*)
   132   ConditionVar.signal cond;
   133 
   134 fun broadcast cond = (*requires SYNCHRONIZED*)
   135   ConditionVar.broadcast cond;
   136 
   137 fun broadcast_work () = (*requires SYNCHRONIZED*)
   138  (ConditionVar.broadcast work_available;
   139   ConditionVar.broadcast work_finished);
   140 
   141 end;
   142 
   143 
   144 (* execute future jobs *)
   145 
   146 fun future_job group (e: unit -> 'a) =
   147   let
   148     val result = Synchronized.var "future" (NONE: 'a Exn.result option);
   149     fun job ok =
   150       let
   151         val res =
   152           if ok then
   153             Exn.capture (fn () =>
   154               Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
   155           else Exn.Exn Exn.Interrupt;
   156         val _ = Synchronized.assign result (K (SOME res));
   157       in
   158         (case res of
   159           Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   160         | Exn.Result _ => true)
   161       end;
   162   in (result, job) end;
   163 
   164 fun do_cancel group = (*requires SYNCHRONIZED*)
   165  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   166   broadcast scheduler_event);
   167 
   168 fun execute name (task, group, jobs) =
   169   let
   170     val valid = not (Task_Queue.is_canceled group);
   171     val ok = setmp_thread_data (name, task, group) (fn () =>
   172       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   173     val _ = SYNCHRONIZED "finish" (fn () =>
   174       let
   175         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   176         val _ =
   177           if ok then ()
   178           else if Task_Queue.cancel (! queue) group then ()
   179           else do_cancel group;
   180         val _ = broadcast work_finished;
   181         val _ = if maximal then () else broadcast work_available;
   182       in () end);
   183   in () end;
   184 
   185 
   186 (* worker activity *)
   187 
   188 fun count_active () = (*requires SYNCHRONIZED*)
   189   fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
   190 
   191 fun change_active active = (*requires SYNCHRONIZED*)
   192   Unsynchronized.change workers
   193     (AList.update Thread.equal (Thread.self (), active));
   194 
   195 
   196 (* worker threads *)
   197 
   198 fun worker_wait cond = (*requires SYNCHRONIZED*)
   199   (change_active false; wait cond; change_active true);
   200 
   201 fun worker_next () = (*requires SYNCHRONIZED*)
   202   if ! excessive > 0 then
   203     (Unsynchronized.dec excessive;
   204      Unsynchronized.change workers
   205       (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   206      broadcast scheduler_event;
   207      NONE)
   208   else if count_active () > Multithreading.max_threads_value () then
   209     (worker_wait scheduler_event; worker_next ())
   210   else
   211     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   212       NONE => (worker_wait work_available; worker_next ())
   213     | some => some);
   214 
   215 fun worker_loop name =
   216   (case SYNCHRONIZED name (fn () => worker_next ()) of
   217     NONE => ()
   218   | SOME work => (execute name work; worker_loop name));
   219 
   220 fun worker_start name = (*requires SYNCHRONIZED*)
   221   Unsynchronized.change workers (cons (SimpleThread.fork false (fn () =>
   222      (broadcast scheduler_event; worker_loop name)), true));
   223 
   224 
   225 (* scheduler *)
   226 
   227 val last_status = Unsynchronized.ref Time.zeroTime;
   228 val next_status = Time.fromMilliseconds 500;
   229 val next_round = Time.fromMilliseconds 50;
   230 
   231 fun scheduler_next () = (*requires SYNCHRONIZED*)
   232   let
   233     (*queue and worker status*)
   234     val _ =
   235       let val now = Time.now () in
   236         if Time.> (Time.+ (! last_status, next_status), now) then ()
   237         else
   238          (last_status := now; Multithreading.tracing 1 (fn () =>
   239             let
   240               val {ready, pending, running} = Task_Queue.status (! queue);
   241               val total = length (! workers);
   242               val active = count_active ();
   243             in
   244               "SCHEDULE " ^ Time.toString now ^ ": " ^
   245                 string_of_int ready ^ " ready, " ^
   246                 string_of_int pending ^ " pending, " ^
   247                 string_of_int running ^ " running; " ^
   248                 string_of_int total ^ " workers, " ^
   249                 string_of_int active ^ " active"
   250             end))
   251       end;
   252 
   253     (*worker threads*)
   254     val _ =
   255       if forall (Thread.isActive o #1) (! workers) then ()
   256       else
   257         (case List.partition (Thread.isActive o #1) (! workers) of
   258           (_, []) => ()
   259         | (alive, dead) =>
   260             (workers := alive; Multithreading.tracing 0 (fn () =>
   261               "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
   262 
   263     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   264     val mm = if m = 9999 then 1 else m * 2;
   265     val l = length (! workers);
   266     val _ = excessive := l - mm;
   267     val _ =
   268       if mm > l then
   269         funpow (mm - l) (fn () =>
   270           worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
   271       else ();
   272 
   273     (*canceled groups*)
   274     val _ =
   275       if null (! canceled) then ()
   276       else
   277        (Multithreading.tracing 1 (fn () =>
   278           string_of_int (length (! canceled)) ^ " canceled groups");
   279         Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
   280         broadcast_work ());
   281 
   282     (*delay loop*)
   283     val _ = Exn.release (wait_timeout next_round scheduler_event);
   284 
   285     (*shutdown*)
   286     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
   287     val continue = not (! do_shutdown andalso null (! workers));
   288     val _ = if continue then () else scheduler := NONE;
   289     val _ = broadcast scheduler_event;
   290   in continue end
   291   handle Exn.Interrupt =>
   292    (Multithreading.tracing 1 (fn () => "Interrupt");
   293     uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
   294     scheduler_next ());
   295 
   296 fun scheduler_loop () =
   297   Multithreading.with_attributes
   298     (Multithreading.sync_interrupts Multithreading.public_interrupts)
   299     (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
   300 
   301 fun scheduler_active () = (*requires SYNCHRONIZED*)
   302   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   303 
   304 fun scheduler_check () = (*requires SYNCHRONIZED*)
   305  (do_shutdown := false;
   306   if scheduler_active () then ()
   307   else scheduler := SOME (SimpleThread.fork false scheduler_loop));
   308 
   309 
   310 
   311 (** futures **)
   312 
   313 (* fork *)
   314 
   315 fun fork_future opt_group deps pri e =
   316   let
   317     val group =
   318       (case opt_group of
   319         SOME group => group
   320       | NONE => Task_Queue.new_group (worker_group ()));
   321     val (result, job) = future_job group e;
   322     val task = SYNCHRONIZED "enqueue" (fn () =>
   323       let
   324         val (task, minimal) =
   325           Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
   326         val _ = if minimal then signal work_available else ();
   327         val _ = scheduler_check ();
   328       in task end);
   329   in Future {task = task, group = group, result = result} end;
   330 
   331 fun fork_group group e = fork_future (SOME group) [] 0 e;
   332 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
   333 fun fork_deps deps e = fork_deps_pri deps 0 e;
   334 fun fork_pri pri e = fork_deps_pri [] pri e;
   335 fun fork e = fork_deps [] e;
   336 
   337 
   338 (* join *)
   339 
   340 local
   341 
   342 fun get_result x =
   343   (case peek x of
   344     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   345   | SOME (Exn.Exn Exn.Interrupt) =>
   346       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   347   | SOME res => res);
   348 
   349 fun join_wait x =
   350   Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ());
   351 
   352 fun join_next deps = (*requires SYNCHRONIZED*)
   353   if null deps then NONE
   354   else
   355     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   356       (NONE, []) => NONE
   357     | (NONE, deps') => (worker_wait work_finished; join_next deps')
   358     | (SOME work, deps') => SOME (work, deps'));
   359 
   360 fun execute_work NONE = ()
   361   | execute_work (SOME (work, deps')) = (execute "join" work; join_work deps')
   362 and join_work deps =
   363   execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
   364 
   365 fun join_depend task deps =
   366   execute_work (SYNCHRONIZED "join" (fn () =>
   367     (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
   368 
   369 in
   370 
   371 fun join_results xs =
   372   if forall is_finished xs then map get_result xs
   373   else if Multithreading.self_critical () then
   374     error "Cannot join future values within critical section"
   375   else
   376     (case worker_task () of
   377       SOME task => join_depend task (map task_of xs)
   378     | NONE => List.app join_wait xs;
   379     map get_result xs);
   380 
   381 end;
   382 
   383 fun join_result x = singleton join_results x;
   384 fun join x = Exn.release (join_result x);
   385 
   386 
   387 (* map *)
   388 
   389 fun map_future f x =
   390   let
   391     val task = task_of x;
   392     val group = Task_Queue.new_group (SOME (group_of x));
   393     val (result, job) = future_job group (fn () => f (join x));
   394 
   395     val extended = SYNCHRONIZED "extend" (fn () =>
   396       (case Task_Queue.extend task job (! queue) of
   397         SOME queue' => (queue := queue'; true)
   398       | NONE => false));
   399   in
   400     if extended then Future {task = task, group = group, result = result}
   401     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   402   end;
   403 
   404 
   405 (* cancellation *)
   406 
   407 fun interruptible_task f x =
   408   if Multithreading.available then
   409     Multithreading.with_attributes
   410       (if is_worker ()
   411        then Multithreading.private_interrupts
   412        else Multithreading.public_interrupts)
   413       (fn _ => f x)
   414   else interruptible f x;
   415 
   416 (*cancel: present and future group members will be interrupted eventually*)
   417 fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
   418 fun cancel x = cancel_group (group_of x);
   419 
   420 
   421 (* shutdown *)
   422 
   423 fun shutdown () =
   424   if Multithreading.available then
   425     SYNCHRONIZED "shutdown" (fn () =>
   426      while scheduler_active () do
   427       (wait scheduler_event; broadcast_work ()))
   428   else ();
   429 
   430 
   431 (*final declarations of this structure!*)
   432 val map = map_future;
   433 
   434 end;
   435 
   436 type 'a future = 'a Future.future;
   437