src/Pure/Concurrent/future.ML
changeset 55298 99b9249b3e05
parent 55286 db3d3d99c69d
child 56013 d64a4ef26edb
equal deleted inserted replaced
55297:f38b113697a2 55298:99b9249b3e05
    46   val new_group: group option -> group
    46   val new_group: group option -> group
    47   val worker_task: unit -> task option
    47   val worker_task: unit -> task option
    48   val worker_group: unit -> group option
    48   val worker_group: unit -> group option
    49   val the_worker_group: unit -> group
    49   val the_worker_group: unit -> group
    50   val worker_subgroup: unit -> group
    50   val worker_subgroup: unit -> group
    51   val worker_context: string -> group -> ('a -> 'b) -> 'a -> 'b
       
    52   type 'a future
    51   type 'a future
    53   val task_of: 'a future -> task
    52   val task_of: 'a future -> task
    54   val peek: 'a future -> 'a Exn.result option
    53   val peek: 'a future -> 'a Exn.result option
    55   val is_finished: 'a future -> bool
    54   val is_finished: 'a future -> bool
    56   val ML_statistics: bool Unsynchronized.ref
    55   val ML_statistics: bool Unsynchronized.ref
    66   val join_results: 'a future list -> 'a Exn.result list
    65   val join_results: 'a future list -> 'a Exn.result list
    67   val join_result: 'a future -> 'a Exn.result
    66   val join_result: 'a future -> 'a Exn.result
    68   val joins: 'a future list -> 'a list
    67   val joins: 'a future list -> 'a list
    69   val join: 'a future -> 'a
    68   val join: 'a future -> 'a
    70   val join_tasks: task list -> unit
    69   val join_tasks: task list -> unit
       
    70   val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
    71   val value_result: 'a Exn.result -> 'a future
    71   val value_result: 'a Exn.result -> 'a future
    72   val value: 'a -> 'a future
    72   val value: 'a -> 'a future
    73   val cond_forks: params -> (unit -> 'a) list -> 'a future list
    73   val cond_forks: params -> (unit -> 'a) list -> 'a future list
    74   val map: ('a -> 'b) -> 'a future -> 'b future
    74   val map: ('a -> 'b) -> 'a future -> 'b future
    75   val promise_group: group -> (unit -> unit) -> 'a future
    75   val promise_group: group -> (unit -> unit) -> 'a future
   106   (case worker_group () of
   106   (case worker_group () of
   107     SOME group => group
   107     SOME group => group
   108   | NONE => raise Fail "Missing worker thread context");
   108   | NONE => raise Fail "Missing worker thread context");
   109 
   109 
   110 fun worker_subgroup () = new_group (worker_group ());
   110 fun worker_subgroup () = new_group (worker_group ());
   111 
       
   112 fun worker_context name group f x =
       
   113   setmp_worker_task (Task_Queue.new_task group name NONE) f x;
       
   114 
   111 
   115 fun worker_joining e =
   112 fun worker_joining e =
   116   (case worker_task () of
   113   (case worker_task () of
   117     NONE => e ()
   114     NONE => e ()
   118   | SOME task => Task_Queue.joining task e);
   115   | SOME task => Task_Queue.joining task e);
   469   in ok end;
   466   in ok end;
   470 
   467 
   471 
   468 
   472 (* future jobs *)
   469 (* future jobs *)
   473 
   470 
   474 fun future_job group interrupts (e: unit -> 'a) =
   471 fun future_job group atts (e: unit -> 'a) =
   475   let
   472   let
   476     val result = Single_Assignment.var "future" : 'a result;
   473     val result = Single_Assignment.var "future" : 'a result;
   477     val pos = Position.thread_data ();
   474     val pos = Position.thread_data ();
   478     fun job ok =
   475     fun job ok =
   479       let
   476       let
   480         val res =
   477         val res =
   481           if ok then
   478           if ok then
   482             Exn.capture (fn () =>
   479             Exn.capture (fn () =>
   483               Multithreading.with_attributes
   480               Multithreading.with_attributes atts (fn _ => Position.setmp_thread_data pos e ())) ()
   484                 (if interrupts
       
   485                  then Multithreading.private_interrupts else Multithreading.no_interrupts)
       
   486                 (fn _ => Position.setmp_thread_data pos e ())) ()
       
   487           else Exn.interrupt_exn;
   481           else Exn.interrupt_exn;
   488       in assign_result group result (identify_result pos res) end;
   482       in assign_result group result (identify_result pos res) end;
   489   in (result, job) end;
   483   in (result, job) end;
   490 
   484 
   491 
   485 
   502         (case group of
   496         (case group of
   503           NONE => worker_subgroup ()
   497           NONE => worker_subgroup ()
   504         | SOME grp => grp);
   498         | SOME grp => grp);
   505       fun enqueue e queue =
   499       fun enqueue e queue =
   506         let
   500         let
   507           val (result, job) = future_job grp interrupts e;
   501           val atts =
       
   502             if interrupts
       
   503             then Multithreading.private_interrupts
       
   504             else Multithreading.no_interrupts;
       
   505           val (result, job) = future_job grp atts e;
   508           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   506           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   509           val future = Future {promised = false, task = task, result = result};
   507           val future = Future {promised = false, task = task, result = result};
   510         in (future, queue') end;
   508         in (future, queue') end;
   511     in
   509     in
   512       SYNCHRONIZED "enqueue" (fn () =>
   510       SYNCHRONIZED "enqueue" (fn () =>
   578       {name = "join_tasks", group = SOME (new_group NONE),
   576       {name = "join_tasks", group = SOME (new_group NONE),
   579         deps = tasks, pri = 0, interrupts = false} I
   577         deps = tasks, pri = 0, interrupts = false} I
   580     |> join;
   578     |> join;
   581 
   579 
   582 
   580 
       
   581 (* task context for running thread *)
       
   582 
       
   583 fun task_context name group f x =
       
   584   Multithreading.with_attributes Multithreading.no_interrupts (fn orig_atts =>
       
   585     let
       
   586       val (result, job) = future_job group orig_atts (fn () => f x);
       
   587       val task =
       
   588         SYNCHRONIZED "enroll" (fn () =>
       
   589           Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group));
       
   590       val _ = worker_exec (task, [job]);
       
   591     in
       
   592       (case Single_Assignment.peek result of
       
   593         NONE => raise Fail "Missing task context result"
       
   594       | SOME res => Exn.release res)
       
   595     end);
       
   596 
       
   597 
   583 (* fast-path operations -- bypass task queue if possible *)
   598 (* fast-path operations -- bypass task queue if possible *)
   584 
   599 
   585 fun value_result (res: 'a Exn.result) =
   600 fun value_result (res: 'a Exn.result) =
   586   let
   601   let
   587     val task = Task_Queue.dummy_task;
   602     val task = Task_Queue.dummy_task;
   600   if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
   615   if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
   601   else
   616   else
   602     let
   617     let
   603       val task = task_of x;
   618       val task = task_of x;
   604       val group = Task_Queue.group_of_task task;
   619       val group = Task_Queue.group_of_task task;
   605       val (result, job) = future_job group true (fn () => f (join x));
   620       val (result, job) =
       
   621         future_job group Multithreading.private_interrupts (fn () => f (join x));
   606 
   622 
   607       val extended = SYNCHRONIZED "extend" (fn () =>
   623       val extended = SYNCHRONIZED "extend" (fn () =>
   608         (case Task_Queue.extend task job (! queue) of
   624         (case Task_Queue.extend task job (! queue) of
   609           SOME queue' => (queue := queue'; true)
   625           SOME queue' => (queue := queue'; true)
   610         | NONE => false));
   626         | NONE => false));