src/Pure/Concurrent/future.ML
changeset 44993 058520fa03a8
parent 44824 318ca53e3fbb
child 44994 2d16c693d536
equal deleted inserted replaced
44992:7a44005dc2ec 44993:058520fa03a8
    37   val worker_subgroup: unit -> Task_Queue.group
    37   val worker_subgroup: unit -> Task_Queue.group
    38   type 'a future
    38   type 'a future
    39   val task_of: 'a future -> Task_Queue.task
    39   val task_of: 'a future -> Task_Queue.task
    40   val peek: 'a future -> 'a Exn.result option
    40   val peek: 'a future -> 'a Exn.result option
    41   val is_finished: 'a future -> bool
    41   val is_finished: 'a future -> bool
       
    42   val cancel_group: Task_Queue.group -> unit
       
    43   val cancel: 'a future -> unit
    42   val forks:
    44   val forks:
    43     {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
    45     {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
    44       (unit -> 'a) list -> 'a future list
    46       (unit -> 'a) list -> 'a future list
    45   val fork_pri: int -> (unit -> 'a) -> 'a future
    47   val fork_pri: int -> (unit -> 'a) -> 'a future
    46   val fork: (unit -> 'a) -> 'a future
    48   val fork: (unit -> 'a) -> 'a future
    55   val promise_group: Task_Queue.group -> 'a future
    57   val promise_group: Task_Queue.group -> 'a future
    56   val promise: unit -> 'a future
    58   val promise: unit -> 'a future
    57   val fulfill_result: 'a future -> 'a Exn.result -> unit
    59   val fulfill_result: 'a future -> 'a Exn.result -> unit
    58   val fulfill: 'a future -> 'a -> unit
    60   val fulfill: 'a future -> 'a -> unit
    59   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    61   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    60   val cancel_group: Task_Queue.group -> unit
       
    61   val cancel: 'a future -> unit
       
    62   val shutdown: unit -> unit
    62   val shutdown: unit -> unit
    63   val status: (unit -> 'a) -> 'a
    63   val status: (unit -> 'a) -> 'a
    64 end;
    64 end;
    65 
    65 
    66 structure Future: FUTURE =
    66 structure Future: FUTURE =
    72 
    72 
    73 local
    73 local
    74   val tag = Universal.tag () : Task_Queue.task option Universal.tag;
    74   val tag = Universal.tag () : Task_Queue.task option Universal.tag;
    75 in
    75 in
    76   fun worker_task () = the_default NONE (Thread.getLocal tag);
    76   fun worker_task () = the_default NONE (Thread.getLocal tag);
    77   fun setmp_worker_task data f x =
    77   fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
    78     Library.setmp_thread_data tag (worker_task ()) (SOME data) f x;
       
    79 end;
    78 end;
    80 
    79 
    81 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
    80 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
    82 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
    81 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
    83 
    82 
   104 fun task_of (Future {task, ...}) = task;
   103 fun task_of (Future {task, ...}) = task;
   105 fun result_of (Future {result, ...}) = result;
   104 fun result_of (Future {result, ...}) = result;
   106 
   105 
   107 fun peek x = Single_Assignment.peek (result_of x);
   106 fun peek x = Single_Assignment.peek (result_of x);
   108 fun is_finished x = is_some (peek x);
   107 fun is_finished x = is_some (peek x);
   109 
       
   110 fun assign_result group result res =
       
   111   let
       
   112     val _ = Single_Assignment.assign result res
       
   113       handle exn as Fail _ =>
       
   114         (case Single_Assignment.peek result of
       
   115           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
       
   116         | _ => reraise exn);
       
   117     val ok =
       
   118       (case the (Single_Assignment.peek result) of
       
   119         Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
       
   120       | Exn.Res _ => true);
       
   121   in ok end;
       
   122 
   108 
   123 
   109 
   124 
   110 
   125 (** scheduling **)
   111 (** scheduling **)
   126 
   112 
   171 
   157 
   172 fun count_workers state = (*requires SYNCHRONIZED*)
   158 fun count_workers state = (*requires SYNCHRONIZED*)
   173   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   159   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   174 
   160 
   175 
   161 
   176 (* execute future jobs *)
   162 (* cancellation primitives *)
   177 
   163 
   178 fun future_job group (e: unit -> 'a) =
   164 fun interruptible_task f x =
   179   let
   165   if Multithreading.available then
   180     val result = Single_Assignment.var "future" : 'a result;
   166     Multithreading.with_attributes
   181     val pos = Position.thread_data ();
   167       (if is_some (worker_task ())
   182     fun job ok =
   168        then Multithreading.private_interrupts
   183       let
   169        else Multithreading.public_interrupts)
   184         val res =
   170       (fn _ => f x)
   185           if ok then
   171   else interruptible f x;
   186             Exn.capture (fn () =>
       
   187               Multithreading.with_attributes Multithreading.private_interrupts
       
   188                 (fn _ => Position.setmp_thread_data pos e ()) before
       
   189               Multithreading.interrupted ()) ()
       
   190           else Exn.interrupt_exn;
       
   191       in assign_result group result res end;
       
   192   in (result, job) end;
       
   193 
   172 
   194 fun cancel_now group = (*requires SYNCHRONIZED*)
   173 fun cancel_now group = (*requires SYNCHRONIZED*)
   195   Task_Queue.cancel (! queue) group;
   174   Task_Queue.cancel (! queue) group;
   196 
   175 
   197 fun cancel_later group = (*requires SYNCHRONIZED*)
   176 fun cancel_later group = (*requires SYNCHRONIZED*)
   198  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   177  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   199   broadcast scheduler_event);
   178   broadcast scheduler_event);
   200 
   179 
   201 fun execute (task, jobs) =
   180 
       
   181 (* worker threads *)
       
   182 
       
   183 fun worker_exec (task, jobs) =
   202   let
   184   let
   203     val group = Task_Queue.group_of_task task;
   185     val group = Task_Queue.group_of_task task;
   204     val valid = not (Task_Queue.is_canceled group);
   186     val valid = not (Task_Queue.is_canceled group);
   205     val ok =
   187     val ok =
   206       Task_Queue.running task (fn () =>
   188       Task_Queue.running task (fn () =>
   222         val _ = broadcast work_finished;
   204         val _ = broadcast work_finished;
   223         val _ = if maximal then () else signal work_available;
   205         val _ = if maximal then () else signal work_available;
   224       in () end);
   206       in () end);
   225   in () end;
   207   in () end;
   226 
   208 
   227 
       
   228 (* worker threads *)
       
   229 
       
   230 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   209 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   231   let
   210   let
   232     val state =
   211     val state =
   233       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   212       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   234         SOME state => state
   213         SOME state => state
   251     | some => (signal work_available; some));
   230     | some => (signal work_available; some));
   252 
   231 
   253 fun worker_loop name =
   232 fun worker_loop name =
   254   (case SYNCHRONIZED name (fn () => worker_next ()) of
   233   (case SYNCHRONIZED name (fn () => worker_next ()) of
   255     NONE => ()
   234     NONE => ()
   256   | SOME work => (Exn.capture Multithreading.interrupted (); execute work; worker_loop name));
   235   | SOME work => (Exn.capture Multithreading.interrupted (); worker_exec work; worker_loop name));
   257 
   236 
   258 fun worker_start name = (*requires SYNCHRONIZED*)
   237 fun worker_start name = (*requires SYNCHRONIZED*)
   259   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   238   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   260     Unsynchronized.ref Working));
   239     Unsynchronized.ref Working));
   261 
   240 
   394   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   373   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   395 
   374 
   396 
   375 
   397 
   376 
   398 (** futures **)
   377 (** futures **)
       
   378 
       
   379 (* cancellation *)
       
   380 
       
   381 (*cancel: present and future group members will be interrupted eventually*)
       
   382 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
       
   383  (if cancel_now group then () else cancel_later group;
       
   384   signal work_available; scheduler_check ()));
       
   385 
       
   386 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
       
   387 
       
   388 
       
   389 (* future jobs *)
       
   390 
       
   391 fun assign_result group result res =
       
   392   let
       
   393     val _ = Single_Assignment.assign result res
       
   394       handle exn as Fail _ =>
       
   395         (case Single_Assignment.peek result of
       
   396           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
       
   397         | _ => reraise exn);
       
   398     val ok =
       
   399       (case the (Single_Assignment.peek result) of
       
   400         Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
       
   401       | Exn.Res _ => true);
       
   402   in ok end;
       
   403 
       
   404 fun future_job group (e: unit -> 'a) =
       
   405   let
       
   406     val result = Single_Assignment.var "future" : 'a result;
       
   407     val pos = Position.thread_data ();
       
   408     fun job ok =
       
   409       let
       
   410         val res =
       
   411           if ok then
       
   412             Exn.capture (fn () =>
       
   413               Multithreading.with_attributes Multithreading.private_interrupts
       
   414                 (fn _ => Position.setmp_thread_data pos e ()) before
       
   415               Multithreading.interrupted ()) ()
       
   416           else Exn.interrupt_exn;
       
   417       in assign_result group result res end;
       
   418   in (result, job) end;
       
   419 
   399 
   420 
   400 (* fork *)
   421 (* fork *)
   401 
   422 
   402 fun forks {name, group, deps, pri} es =
   423 fun forks {name, group, deps, pri} es =
   403   if null es then []
   424   if null es then []
   450     | (NONE, deps') =>
   471     | (NONE, deps') =>
   451         (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   472         (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   452     | (SOME work, deps') => SOME (work, deps'));
   473     | (SOME work, deps') => SOME (work, deps'));
   453 
   474 
   454 fun execute_work NONE = ()
   475 fun execute_work NONE = ()
   455   | execute_work (SOME (work, deps')) = (worker_joining (fn () => execute work); join_work deps')
   476   | execute_work (SOME (work, deps')) =
       
   477       (worker_joining (fn () => worker_exec work); join_work deps')
   456 and join_work deps =
   478 and join_work deps =
   457   Multithreading.with_attributes Multithreading.no_interrupts
   479   Multithreading.with_attributes Multithreading.no_interrupts
   458     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   480     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   459 
   481 
   460 in
   482 in
   473 
   495 
   474 fun join_result x = singleton join_results x;
   496 fun join_result x = singleton join_results x;
   475 fun join x = Exn.release (join_result x);
   497 fun join x = Exn.release (join_result x);
   476 
   498 
   477 
   499 
   478 (* fast-path versions -- bypassing full task management *)
   500 (* fast-path versions -- bypassing task queue *)
   479 
   501 
   480 fun value (x: 'a) =
   502 fun value (x: 'a) =
   481   let
   503   let
   482     val task = Task_Queue.dummy_task ();
   504     val task = Task_Queue.dummy_task ();
   483     val group = Task_Queue.group_of_task task;
   505     val group = Task_Queue.group_of_task task;
   536           let
   558           let
   537             val still_passive =
   559             val still_passive =
   538               SYNCHRONIZED "fulfill_result" (fn () =>
   560               SYNCHRONIZED "fulfill_result" (fn () =>
   539                 Unsynchronized.change_result queue
   561                 Unsynchronized.change_result queue
   540                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   562                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   541           in if still_passive then execute (task, [job]) else () end);
   563           in if still_passive then worker_exec (task, [job]) else () end);
   542       val _ =
   564       val _ =
   543         if is_some (Single_Assignment.peek result) then ()
   565         if is_some (Single_Assignment.peek result) then ()
   544         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   566         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   545     in () end;
   567     in () end;
   546 
   568 
   547 fun fulfill x res = fulfill_result x (Exn.Res res);
   569 fun fulfill x res = fulfill_result x (Exn.Res res);
   548 
       
   549 
       
   550 (* cancellation *)
       
   551 
       
   552 fun interruptible_task f x =
       
   553   if Multithreading.available then
       
   554     Multithreading.with_attributes
       
   555       (if is_some (worker_task ())
       
   556        then Multithreading.private_interrupts
       
   557        else Multithreading.public_interrupts)
       
   558       (fn _ => f x)
       
   559   else interruptible f x;
       
   560 
       
   561 (*cancel: present and future group members will be interrupted eventually*)
       
   562 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
       
   563  (if cancel_now group then () else cancel_later group;
       
   564   signal work_available; scheduler_check ()));
       
   565 
       
   566 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
       
   567 
   570 
   568 
   571 
   569 (* shutdown *)
   572 (* shutdown *)
   570 
   573 
   571 fun shutdown () =
   574 fun shutdown () =