1.1 --- a/src/Pure/Concurrent/future.ML Wed Feb 02 18:22:13 2011 +0100
1.2 +++ b/src/Pure/Concurrent/future.ML Wed Feb 02 20:32:50 2011 +0100
1.3 @@ -440,12 +440,12 @@
1.4 else res);
1.5
1.6 fun join_next deps = (*requires SYNCHRONIZED*)
1.7 - if Task_Queue.finished_deps deps then NONE
1.8 + if null deps then NONE
1.9 else
1.10 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
1.11 - (NONE, deps') =>
1.12 - if Task_Queue.finished_deps deps' then NONE
1.13 - else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
1.14 + (NONE, []) => NONE
1.15 + | (NONE, deps') =>
1.16 + (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
1.17 | (SOME work, deps') => SOME (work, deps'));
1.18
1.19 fun execute_work NONE = ()
1.20 @@ -461,8 +461,7 @@
1.21 if forall is_finished xs then ()
1.22 else if Multithreading.self_critical () then
1.23 error "Cannot join future values within critical section"
1.24 - else if is_some (worker_task ()) then
1.25 - join_work (Task_Queue.init_deps (map task_of xs))
1.26 + else if is_some (worker_task ()) then join_work (map task_of xs)
1.27 else List.app (ignore o Single_Assignment.await o result_of) xs;
1.28 in map get_result xs end;
1.29
1.30 @@ -533,8 +532,8 @@
1.31 (Task_Queue.dequeue_passive (Thread.self ()) task));
1.32 in if still_passive then execute (task, [job]) else () end);
1.33 val _ =
1.34 - worker_waiting (Task_Queue.init_deps [task])
1.35 - (fn () => Single_Assignment.await result);
1.36 + if is_some (Single_Assignment.peek result) then ()
1.37 + else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
1.38 in () end;
1.39
1.40 fun fulfill x res = fulfill_result x (Exn.Result res);
2.1 --- a/src/Pure/Concurrent/task_queue.ML Wed Feb 02 18:22:13 2011 +0100
2.2 +++ b/src/Pure/Concurrent/task_queue.ML Wed Feb 02 20:32:50 2011 +0100
2.3 @@ -21,6 +21,9 @@
2.4 val pri_of_task: task -> int
2.5 val str_of_task: task -> string
2.6 val timing_of_task: task -> Time.time * Time.time * string list
2.7 + val running: task -> (unit -> 'a) -> 'a
2.8 + val joining: task -> (unit -> 'a) -> 'a
2.9 + val waiting: task -> task list -> (unit -> 'a) -> 'a
2.10 type queue
2.11 val empty: queue
2.12 val all_passive: queue -> bool
2.13 @@ -34,14 +37,8 @@
2.14 val extend: task -> (bool -> bool) -> queue -> queue option
2.15 val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
2.16 val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue
2.17 - type deps
2.18 - val init_deps: task list -> deps
2.19 - val finished_deps: deps -> bool
2.20 - val dequeue_deps: Thread.thread -> deps -> queue ->
2.21 - (((task * (bool -> bool) list) option * deps) * queue)
2.22 - val running: task -> (unit -> 'a) -> 'a
2.23 - val joining: task -> (unit -> 'a) -> 'a
2.24 - val waiting: task -> deps -> (unit -> 'a) -> 'a
2.25 + val dequeue_deps: Thread.thread -> task list -> queue ->
2.26 + (((task * (bool -> bool) list) option * task list) * queue)
2.27 end;
2.28
2.29 structure Task_Queue: TASK_QUEUE =
2.30 @@ -140,6 +137,19 @@
2.31 structure Task_Graph = Graph(type key = task val ord = task_ord);
2.32
2.33
2.34 +(* timing *)
2.35 +
2.36 +fun running task =
2.37 + update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task;
2.38 +
2.39 +fun joining task =
2.40 + update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task;
2.41 +
2.42 +fun waiting task deps =
2.43 + update_timing (fn t => fn (a, b, ds) =>
2.44 + (Time.- (a, t), Time.+ (b, t), fold (insert (op =) o name_of_task) deps ds)) task;
2.45 +
2.46 +
2.47
2.48 (** queue of jobs and groups **)
2.49
2.50 @@ -165,7 +175,7 @@
2.51 (* queue *)
2.52
2.53 datatype queue = Queue of
2.54 - {groups: task list Inttab.table, (*groups with presently known members*)
2.55 + {groups: task list Inttab.table, (*presently known group members*)
2.56 jobs: jobs}; (*job dependency graph*)
2.57
2.58 fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
2.59 @@ -291,15 +301,7 @@
2.60
2.61 (* dequeue wrt. dynamic dependencies *)
2.62
2.63 -abstype deps = Deps of task list
2.64 -with
2.65 -
2.66 -fun init_deps tasks = Deps tasks;
2.67 -fun finished_deps (Deps tasks) = null tasks;
2.68 -
2.69 -fun insert_deps (Deps tasks) = fold (insert (op =) o name_of_task) tasks;
2.70 -
2.71 -fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) =
2.72 +fun dequeue_deps thread deps (queue as Queue {groups, jobs}) =
2.73 let
2.74 fun ready [] rest = (NONE, rev rest)
2.75 | ready (task :: tasks) rest =
2.76 @@ -322,28 +324,14 @@
2.77
2.78 fun result (res as (task, _)) deps' =
2.79 let val jobs' = set_job task (Running thread) jobs
2.80 - in ((SOME res, Deps deps'), make_queue groups jobs') end;
2.81 + in ((SOME res, deps'), make_queue groups jobs') end;
2.82 in
2.83 (case ready deps [] of
2.84 (SOME res, deps') => result res deps'
2.85 | (NONE, deps') =>
2.86 (case ready_dep [] deps' of
2.87 SOME res => result res deps'
2.88 - | NONE => ((NONE, Deps deps'), queue)))
2.89 + | NONE => ((NONE, deps'), queue)))
2.90 end;
2.91
2.92 end;
2.93 -
2.94 -
2.95 -(* timing *)
2.96 -
2.97 -fun running task =
2.98 - update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task;
2.99 -
2.100 -fun joining task =
2.101 - update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task;
2.102 -
2.103 -fun waiting task deps =
2.104 - update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), Time.+ (b, t), insert_deps deps ds)) task;
2.105 -
2.106 -end;