src/Pure/Concurrent/future.ML
changeset 42541 74010c6af0a4
parent 40705 f9347a30d1b2
child 42543 2f70b1ddd09f
     1.1 --- a/src/Pure/Concurrent/future.ML	Mon Jan 31 15:28:48 2011 +0100
     1.2 +++ b/src/Pure/Concurrent/future.ML	Mon Jan 31 16:34:10 2011 +0100
     1.3 @@ -38,6 +38,7 @@
     1.4    val worker_task: unit -> Task_Queue.task option
     1.5    val worker_group: unit -> Task_Queue.group option
     1.6    val worker_subgroup: unit -> Task_Queue.group
     1.7 +  val worker_waiting: (unit -> 'a) -> 'a
     1.8    type 'a future
     1.9    val task_of: 'a future -> task
    1.10    val group_of: 'a future -> group
    1.11 @@ -87,6 +88,11 @@
    1.12  val worker_group = Option.map #2 o thread_data;
    1.13  fun worker_subgroup () = Task_Queue.new_group (worker_group ());
    1.14  
    1.15 +fun worker_waiting e =
    1.16 +  (case worker_task () of
    1.17 +    NONE => e ()
    1.18 +  | SOME task => Task_Queue.waiting task e);
    1.19 +
    1.20  
    1.21  (* datatype future *)
    1.22  
    1.23 @@ -198,8 +204,16 @@
    1.24  fun execute (task, group, jobs) =
    1.25    let
    1.26      val valid = not (Task_Queue.is_canceled group);
    1.27 -    val ok = setmp_thread_data (task, group) (fn () =>
    1.28 -      fold (fn job => fn ok => job valid andalso ok) jobs true) ();
    1.29 +    val ok =
    1.30 +      Task_Queue.running task (fn () =>
    1.31 +        setmp_thread_data (task, group) (fn () =>
    1.32 +          fold (fn job => fn ok => job valid andalso ok) jobs true) ());
    1.33 +    val _ = Multithreading.tracing 1 (fn () =>
    1.34 +      let
    1.35 +        val s = Task_Queue.str_of_task task;
    1.36 +        fun micros time = string_of_int (Time.toNanoseconds time div 1000);
    1.37 +        val (run, wait) = pairself micros (Task_Queue.timing_of_task task);
    1.38 +      in "TASK " ^ s ^ " " ^ run ^ " " ^ wait end);
    1.39      val _ = SYNCHRONIZED "finish" (fn () =>
    1.40        let
    1.41          val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
    1.42 @@ -448,10 +462,11 @@
    1.43    else if Multithreading.self_critical () then
    1.44      error "Cannot join future values within critical section"
    1.45    else
    1.46 -    (case worker_task () of
    1.47 -      SOME task => join_depend task (map task_of xs)
    1.48 -    | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
    1.49 -    map get_result xs);
    1.50 +    worker_waiting (fn () =>
    1.51 +      (case worker_task () of
    1.52 +        SOME task => join_depend task (map task_of xs)
    1.53 +      | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
    1.54 +      map get_result xs));
    1.55  
    1.56  end;
    1.57  
    1.58 @@ -513,7 +528,7 @@
    1.59                  Unsynchronized.change_result queue
    1.60                    (Task_Queue.dequeue_passive (Thread.self ()) task));
    1.61            in if still_passive then execute (task, group, [job]) else () end);
    1.62 -      val _ = Single_Assignment.await result;
    1.63 +      val _ = worker_waiting (fn () => Single_Assignment.await result);
    1.64      in () end;
    1.65  
    1.66  fun fulfill x res = fulfill_result x (Exn.Result res);