1.1 --- a/src/Pure/Concurrent/future.ML Mon Jan 31 15:28:48 2011 +0100
1.2 +++ b/src/Pure/Concurrent/future.ML Mon Jan 31 16:34:10 2011 +0100
1.3 @@ -38,6 +38,7 @@
1.4 val worker_task: unit -> Task_Queue.task option
1.5 val worker_group: unit -> Task_Queue.group option
1.6 val worker_subgroup: unit -> Task_Queue.group
1.7 + val worker_waiting: (unit -> 'a) -> 'a
1.8 type 'a future
1.9 val task_of: 'a future -> task
1.10 val group_of: 'a future -> group
1.11 @@ -87,6 +88,11 @@
1.12 val worker_group = Option.map #2 o thread_data;
1.13 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
1.14
1.15 +fun worker_waiting e =
1.16 + (case worker_task () of
1.17 + NONE => e ()
1.18 + | SOME task => Task_Queue.waiting task e);
1.19 +
1.20
1.21 (* datatype future *)
1.22
1.23 @@ -198,8 +204,16 @@
1.24 fun execute (task, group, jobs) =
1.25 let
1.26 val valid = not (Task_Queue.is_canceled group);
1.27 - val ok = setmp_thread_data (task, group) (fn () =>
1.28 - fold (fn job => fn ok => job valid andalso ok) jobs true) ();
1.29 + val ok =
1.30 + Task_Queue.running task (fn () =>
1.31 + setmp_thread_data (task, group) (fn () =>
1.32 + fold (fn job => fn ok => job valid andalso ok) jobs true) ());
1.33 + val _ = Multithreading.tracing 1 (fn () =>
1.34 + let
1.35 + val s = Task_Queue.str_of_task task;
1.36 + fun micros time = string_of_int (Time.toNanoseconds time div 1000);
1.37 + val (run, wait) = pairself micros (Task_Queue.timing_of_task task);
1.38 + in "TASK " ^ s ^ " " ^ run ^ " " ^ wait end);
1.39 val _ = SYNCHRONIZED "finish" (fn () =>
1.40 let
1.41 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
1.42 @@ -448,10 +462,11 @@
1.43 else if Multithreading.self_critical () then
1.44 error "Cannot join future values within critical section"
1.45 else
1.46 - (case worker_task () of
1.47 - SOME task => join_depend task (map task_of xs)
1.48 - | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
1.49 - map get_result xs);
1.50 + worker_waiting (fn () =>
1.51 + (case worker_task () of
1.52 + SOME task => join_depend task (map task_of xs)
1.53 + | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
1.54 + map get_result xs));
1.55
1.56 end;
1.57
1.58 @@ -513,7 +528,7 @@
1.59 Unsynchronized.change_result queue
1.60 (Task_Queue.dequeue_passive (Thread.self ()) task));
1.61 in if still_passive then execute (task, group, [job]) else () end);
1.62 - val _ = Single_Assignment.await result;
1.63 + val _ = worker_waiting (fn () => Single_Assignment.await result);
1.64 in () end;
1.65
1.66 fun fulfill x res = fulfill_result x (Exn.Result res);