src/Pure/Concurrent/future.ML
author wenzelm
Thu, 04 Dec 2008 23:46:20 +0100
changeset 28979 3ce619d8d432
parent 28972 cb8a2c3e188f
child 29002 1b99dcae2156
permissions -rw-r--r--
fork/map: no inheritance of group (structure is nested, not parallel);
removed group thread_data;
refined Future.fork interfaces, no longer export Future.future;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     ID:         $Id$
     3     Author:     Makarius
     4 
     5 Future values.
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 *)
    27 
    28 signature FUTURE =
    29 sig
    30   val enabled: unit -> bool
    31   type task = TaskQueue.task
    32   type group = TaskQueue.group
    33   val thread_data: unit -> (string * task) option
    34   type 'a future
    35   val task_of: 'a future -> task
    36   val group_of: 'a future -> group
    37   val peek: 'a future -> 'a Exn.result option
    38   val is_finished: 'a future -> bool
    39   val fork: (unit -> 'a) -> 'a future
    40   val fork_group: group -> (unit -> 'a) -> 'a future
    41   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    42   val fork_background: (unit -> 'a) -> 'a future
    43   val join_results: 'a future list -> 'a Exn.result list
    44   val join_result: 'a future -> 'a Exn.result
    45   val join: 'a future -> 'a
    46   val map: ('a -> 'b) -> 'a future -> 'b future
    47   val focus: task list -> unit
    48   val interrupt_task: string -> unit
    49   val cancel: 'a future -> unit
    50   val shutdown: unit -> unit
    51 end;
    52 
    53 structure Future: FUTURE =
    54 struct
    55 
    56 (** future values **)
    57 
    58 fun enabled () =
    59   ! future_scheduler andalso Multithreading.enabled () andalso
    60     not (Multithreading.self_critical ());
    61 
    62 
    63 (* identifiers *)
    64 
    65 type task = TaskQueue.task;
    66 type group = TaskQueue.group;
    67 
    68 local val tag = Universal.tag () : (string * task) option Universal.tag in
    69   fun thread_data () = the_default NONE (Thread.getLocal tag);
    70   fun setmp_thread_data data f x = Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    71 end;
    72 
    73 
    74 (* datatype future *)
    75 
    76 datatype 'a future = Future of
    77  {task: task,
    78   group: group,
    79   result: 'a Exn.result option ref};
    80 
    81 fun task_of (Future {task, ...}) = task;
    82 fun group_of (Future {group, ...}) = group;
    83 
    84 fun peek (Future {result, ...}) = ! result;
    85 fun is_finished x = is_some (peek x);
    86 
    87 
    88 
    89 (** scheduling **)
    90 
    91 (* global state *)
    92 
    93 val queue = ref TaskQueue.empty;
    94 val next = ref 0;
    95 val workers = ref ([]: (Thread.thread * bool) list);
    96 val scheduler = ref (NONE: Thread.thread option);
    97 val excessive = ref 0;
    98 val canceled = ref ([]: TaskQueue.group list);
    99 val do_shutdown = ref false;
   100 
   101 
   102 (* synchronization *)
   103 
   104 local
   105   val lock = Mutex.mutex ();
   106   val cond = ConditionVar.conditionVar ();
   107 in
   108 
   109 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
   110 
   111 fun wait name = (*requires SYNCHRONIZED*)
   112  (Multithreading.tracing 3 (fn () => name ^ ": wait ...");
   113   ConditionVar.wait (cond, lock);
   114   Multithreading.tracing 3 (fn () => name ^ ": ... continue"));
   115 
   116 fun wait_timeout name timeout = (*requires SYNCHRONIZED*)
   117  (Multithreading.tracing 3 (fn () => name ^ ": wait ...");
   118   ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout));
   119   Multithreading.tracing 3 (fn () => name ^ ": ... continue"));
   120 
   121 fun notify_all () = (*requires SYNCHRONIZED*)
   122   ConditionVar.broadcast cond;
   123 
   124 end;
   125 
   126 
   127 (* worker activity *)
   128 
   129 fun trace_active () =
   130   let
   131     val ws = ! workers;
   132     val m = string_of_int (length ws);
   133     val n = string_of_int (length (filter #2 ws));
   134   in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
   135 
   136 fun change_active active = (*requires SYNCHRONIZED*)
   137   change workers (AList.update Thread.equal (Thread.self (), active));
   138 
   139 
   140 (* execute *)
   141 
   142 fun execute name (task, group, run) =
   143   let
   144     val _ = trace_active ();
   145     val ok = setmp_thread_data (name, task) run ();
   146     val _ = SYNCHRONIZED "execute" (fn () =>
   147      (change queue (TaskQueue.finish task);
   148       if ok then ()
   149       else if TaskQueue.cancel (! queue) group then ()
   150       else change canceled (cons group);
   151       notify_all ()));
   152   in () end;
   153 
   154 
   155 (* worker threads *)
   156 
   157 fun worker_wait name = (*requires SYNCHRONIZED*)
   158   (change_active false; wait name; change_active true);
   159 
   160 fun worker_next name = (*requires SYNCHRONIZED*)
   161   if ! excessive > 0 then
   162     (dec excessive;
   163      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   164      notify_all ();
   165      NONE)
   166   else
   167     (case change_result queue TaskQueue.dequeue of
   168       NONE => (worker_wait name; worker_next name)
   169     | some => some);
   170 
   171 fun worker_loop name =
   172   (case SYNCHRONIZED name (fn () => worker_next name) of
   173     NONE => Multithreading.tracing 3 (fn () => name ^ ": exit")
   174   | SOME work => (execute name work; worker_loop name));
   175 
   176 fun worker_start name = (*requires SYNCHRONIZED*)
   177   change workers (cons (SimpleThread.fork false (fn () => worker_loop name), true));
   178 
   179 
   180 (* scheduler *)
   181 
   182 fun scheduler_next () = (*requires SYNCHRONIZED*)
   183   let
   184     (*worker threads*)
   185     val _ =
   186       (case List.partition (Thread.isActive o #1) (! workers) of
   187         (_, []) => ()
   188       | (active, inactive) =>
   189           (workers := active; Multithreading.tracing 0 (fn () =>
   190             "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
   191     val _ = trace_active ();
   192 
   193     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   194     val l = length (! workers);
   195     val _ = excessive := l - m;
   196     val _ =
   197       if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
   198       else ();
   199 
   200     (*canceled groups*)
   201     val _ =  change canceled (filter_out (TaskQueue.cancel (! queue)));
   202 
   203     (*shutdown*)
   204     val continue = not (! do_shutdown andalso null (! workers));
   205     val _ = if continue then () else scheduler := NONE;
   206 
   207     val _ = notify_all ();
   208     val _ = wait_timeout "scheduler" (Time.fromSeconds 3);
   209   in continue end;
   210 
   211 fun scheduler_loop () =
   212  (while SYNCHRONIZED "scheduler" scheduler_next do ();
   213   Multithreading.tracing 3 (fn () => "scheduler: exit"));
   214 
   215 fun scheduler_active () = (*requires SYNCHRONIZED*)
   216   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   217 
   218 fun scheduler_check name = SYNCHRONIZED name (fn () =>
   219   if not (scheduler_active ()) then
   220     (Multithreading.tracing 3 (fn () => "scheduler: fork");
   221      do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
   222   else if ! do_shutdown then error "Scheduler shutdown in progress"
   223   else ());
   224 
   225 
   226 (* future values: fork independent computation *)
   227 
   228 fun future opt_group deps pri (e: unit -> 'a) =
   229   let
   230     val _ = scheduler_check "future check";
   231 
   232     val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ());
   233 
   234     val result = ref (NONE: 'a Exn.result option);
   235     val run = Multithreading.with_attributes (Thread.getAttributes ())
   236       (fn _ => fn ok =>
   237         let
   238           val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt;
   239           val _ = result := SOME res;
   240           val res_ok =
   241             (case res of
   242               Exn.Result _ => true
   243             | Exn.Exn Exn.Interrupt => (TaskQueue.invalidate_group group; true)
   244             | _ => false);
   245         in res_ok end);
   246 
   247     val task = SYNCHRONIZED "future" (fn () =>
   248       change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ());
   249   in Future {task = task, group = group, result = result} end;
   250 
   251 fun fork e = future NONE [] true e;
   252 fun fork_group group e = future (SOME group) [] true e;
   253 fun fork_deps deps e = future NONE (map task_of deps) true e;
   254 fun fork_background e = future NONE [] false e;
   255 
   256 
   257 (* join: retrieve results *)
   258 
   259 fun join_results [] = []
   260   | join_results xs = uninterruptible (fn _ => fn () =>
   261       let
   262         val _ = scheduler_check "join check";
   263         val _ = Multithreading.self_critical () andalso
   264           exists (not o is_finished) xs andalso
   265           error "Cannot join future values within critical section";
   266 
   267         fun join_loop _ [] = ()
   268           | join_loop name tasks =
   269               (case SYNCHRONIZED name (fn () =>
   270                   change_result queue (TaskQueue.dequeue_towards tasks)) of
   271                 NONE => ()
   272               | SOME (work, tasks') => (execute name work; join_loop name tasks'));
   273         val _ =
   274           (case thread_data () of
   275             NONE =>
   276               (*alien thread -- refrain from contending for resources*)
   277               while exists (not o is_finished) xs
   278               do SYNCHRONIZED "join_thread" (fn () => wait "join_thread")
   279           | SOME (name, task) =>
   280               (*proper task -- actively work towards results*)
   281               let
   282                 val unfinished = xs |> map_filter
   283                   (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
   284                 val _ = SYNCHRONIZED "join" (fn () =>
   285                   (change queue (TaskQueue.depend unfinished task); notify_all ()));
   286                 val _ = join_loop ("join_loop: " ^ name) unfinished;
   287                 val _ =
   288                   while exists (not o is_finished) xs
   289                   do SYNCHRONIZED "join_task" (fn () => worker_wait "join_task");
   290               in () end);
   291 
   292       in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) ();
   293 
   294 fun join_result x = singleton join_results x;
   295 fun join x = Exn.release (join_result x);
   296 
   297 fun map f x = fork_deps [x] (fn () => f (join x));
   298 
   299 
   300 (* misc operations *)
   301 
   302 (*focus: collection of high-priority task*)
   303 fun focus tasks = SYNCHRONIZED "focus" (fn () =>
   304   change queue (TaskQueue.focus tasks));
   305 
   306 (*interrupt: permissive signal, may get ignored*)
   307 fun interrupt_task id = SYNCHRONIZED "interrupt"
   308   (fn () => TaskQueue.interrupt_external (! queue) id);
   309 
   310 (*cancel: present and future group members will be interrupted eventually*)
   311 fun cancel x =
   312  (scheduler_check "cancel check";
   313   SYNCHRONIZED "cancel" (fn () => (change canceled (cons (group_of x)); notify_all ())));
   314 
   315 
   316 (*global join and shutdown*)
   317 fun shutdown () =
   318   if Multithreading.available then
   319    (scheduler_check "shutdown check";
   320     SYNCHRONIZED "shutdown" (fn () =>
   321      (while not (scheduler_active ()) do wait "shutdown: scheduler inactive";
   322       while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
   323       do_shutdown := true;
   324       notify_all ();
   325       while not (null (! workers)) do wait "shutdown: workers";
   326       while scheduler_active () do wait "shutdown: scheduler still active";
   327       OS.Process.sleep (Time.fromMilliseconds 300))))
   328   else ();
   329 
   330 end;
   331 
   332 type 'a future = 'a Future.future;
   333