eliminated cache, which complicates the code without making a real difference (NB: deque_towards often disrupts caching, and daisy-chaining of workers already reduces queue overhead);
1.1 --- a/src/Pure/Concurrent/future.ML Wed Jan 06 15:07:56 2010 +0100
1.2 +++ b/src/Pure/Concurrent/future.ML Wed Jan 06 18:14:16 2010 +0100
1.3 @@ -179,7 +179,7 @@
1.4 in (result, job) end;
1.5
1.6 fun cancel_now group = (*requires SYNCHRONIZED*)
1.7 - Unsynchronized.change_result queue (Task_Queue.cancel group);
1.8 + Task_Queue.cancel (! queue) group;
1.9
1.10 fun cancel_later group = (*requires SYNCHRONIZED*)
1.11 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
1.12 @@ -351,7 +351,7 @@
1.13 in continue end
1.14 handle Exn.Interrupt =>
1.15 (Multithreading.tracing 1 (fn () => "Interrupt");
1.16 - List.app cancel_later (Unsynchronized.change_result queue Task_Queue.cancel_all);
1.17 + List.app cancel_later (Task_Queue.cancel_all (! queue));
1.18 broadcast_work (); true);
1.19
1.20 fun scheduler_loop () =
2.1 --- a/src/Pure/Concurrent/task_queue.ML Wed Jan 06 15:07:56 2010 +0100
2.2 +++ b/src/Pure/Concurrent/task_queue.ML Wed Jan 06 18:14:16 2010 +0100
2.3 @@ -22,8 +22,8 @@
2.4 val empty: queue
2.5 val all_passive: queue -> bool
2.6 val status: queue -> {ready: int, pending: int, running: int, passive: int}
2.7 - val cancel: group -> queue -> bool * queue
2.8 - val cancel_all: queue -> group list * queue
2.9 + val cancel: queue -> group -> bool
2.10 + val cancel_all: queue -> group list
2.11 val enqueue_passive: group -> queue -> task * queue
2.12 val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
2.13 val extend: task -> (bool -> bool) -> queue -> queue option
2.14 @@ -118,16 +118,13 @@
2.15
2.16 (* queue of grouped jobs *)
2.17
2.18 -datatype result = Unknown | Result of task | No_Result;
2.19 -
2.20 datatype queue = Queue of
2.21 {groups: task list Inttab.table, (*groups with presently active members*)
2.22 - jobs: jobs, (*job dependency graph*)
2.23 - cache: result}; (*last dequeue result*)
2.24 + jobs: jobs}; (*job dependency graph*)
2.25
2.26 -fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
2.27 +fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
2.28
2.29 -val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
2.30 +val empty = make_queue Inttab.empty Task_Graph.empty;
2.31
2.32 fun all_passive (Queue {jobs, ...}) =
2.33 Task_Graph.get_first NONE
2.34 @@ -150,16 +147,16 @@
2.35
2.36 (* cancel -- peers and sub-groups *)
2.37
2.38 -fun cancel group (Queue {groups, jobs, ...}) =
2.39 +fun cancel (Queue {groups, jobs}) group =
2.40 let
2.41 val _ = cancel_group group Exn.Interrupt;
2.42 val tasks = Inttab.lookup_list groups (group_id group);
2.43 val running =
2.44 fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
2.45 val _ = List.app SimpleThread.interrupt running;
2.46 - in (null running, make_queue groups jobs Unknown) end;
2.47 + in null running end;
2.48
2.49 -fun cancel_all (Queue {groups, jobs, ...}) =
2.50 +fun cancel_all (Queue {groups, jobs}) =
2.51 let
2.52 fun cancel_job (group, job) (groups, running) =
2.53 (cancel_group group Exn.Interrupt;
2.54 @@ -168,20 +165,20 @@
2.55 | _ => (groups, running)));
2.56 val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
2.57 val _ = List.app SimpleThread.interrupt running;
2.58 - in (running_groups, make_queue groups jobs Unknown) end;
2.59 + in running_groups end;
2.60
2.61
2.62 (* enqueue *)
2.63
2.64 -fun enqueue_passive group (Queue {groups, jobs, cache}) =
2.65 +fun enqueue_passive group (Queue {groups, jobs}) =
2.66 let
2.67 val task = new_task NONE;
2.68 val groups' = groups
2.69 |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
2.70 val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive));
2.71 - in (task, make_queue groups' jobs' cache) end;
2.72 + in (task, make_queue groups' jobs') end;
2.73
2.74 -fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
2.75 +fun enqueue group deps pri job (Queue {groups, jobs}) =
2.76 let
2.77 val task = new_task (SOME pri);
2.78 val groups' = groups
2.79 @@ -191,49 +188,36 @@
2.80 |> fold (add_job task) deps
2.81 |> fold (fold (add_job task) o get_deps jobs) deps;
2.82 val minimal = null (get_deps jobs' task);
2.83 - val cache' =
2.84 - (case cache of
2.85 - Result last =>
2.86 - if task_ord (last, task) = LESS
2.87 - then cache else Unknown
2.88 - | _ => Unknown);
2.89 - in ((task, minimal), make_queue groups' jobs' cache') end;
2.90 + in ((task, minimal), make_queue groups' jobs') end;
2.91
2.92 -fun extend task job (Queue {groups, jobs, cache}) =
2.93 +fun extend task job (Queue {groups, jobs}) =
2.94 (case try (get_job jobs) task of
2.95 - SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)
2.96 + SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
2.97 | _ => NONE);
2.98
2.99
2.100 (* dequeue *)
2.101
2.102 -fun dequeue thread (queue as Queue {groups, jobs, cache}) =
2.103 +fun dequeue thread (queue as Queue {groups, jobs}) =
2.104 let
2.105 fun ready (task, ((group, Job list), (deps, _))) =
2.106 if is_ready deps group then SOME (task, group, rev list) else NONE
2.107 | ready _ = NONE;
2.108 - fun deq boundary =
2.109 - (case Task_Graph.get_first boundary ready jobs of
2.110 - NONE => (NONE, make_queue groups jobs No_Result)
2.111 - | SOME (result as (task, _, _)) =>
2.112 - let
2.113 - val jobs' = set_job task (Running thread) jobs;
2.114 - val cache' = Result task;
2.115 - in (SOME result, make_queue groups jobs' cache') end);
2.116 in
2.117 - (case cache of
2.118 - Unknown => deq NONE
2.119 - | Result last => deq (SOME last)
2.120 - | No_Result => (NONE, queue))
2.121 + (case Task_Graph.get_first NONE ready jobs of
2.122 + NONE => (NONE, queue)
2.123 + | SOME (result as (task, _, _)) =>
2.124 + let val jobs' = set_job task (Running thread) jobs
2.125 + in (SOME result, make_queue groups jobs') end)
2.126 end;
2.127
2.128
2.129 (* dequeue_towards -- adhoc dependencies *)
2.130
2.131 -fun depend task deps (Queue {groups, jobs, ...}) =
2.132 - make_queue groups (fold (add_dep task) deps jobs) Unknown;
2.133 +fun depend task deps (Queue {groups, jobs}) =
2.134 + make_queue groups (fold (add_dep task) deps jobs);
2.135
2.136 -fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
2.137 +fun dequeue_towards thread deps (queue as Queue {groups, jobs}) =
2.138 let
2.139 fun ready task =
2.140 (case Task_Graph.get_node jobs task of
2.141 @@ -244,10 +228,8 @@
2.142 | _ => NONE);
2.143 val tasks = filter (can (Task_Graph.get_node jobs)) deps;
2.144 fun result (res as (task, _, _)) =
2.145 - let
2.146 - val jobs' = set_job task (Running thread) jobs;
2.147 - val cache' = Unknown;
2.148 - in ((SOME res, tasks), make_queue groups jobs' cache') end;
2.149 + let val jobs' = set_job task (Running thread) jobs
2.150 + in ((SOME res, tasks), make_queue groups jobs') end;
2.151 in
2.152 (case get_first ready tasks of
2.153 SOME res => result res
2.154 @@ -260,14 +242,13 @@
2.155
2.156 (* finish *)
2.157
2.158 -fun finish task (Queue {groups, jobs, cache}) =
2.159 +fun finish task (Queue {groups, jobs}) =
2.160 let
2.161 val group = get_group jobs task;
2.162 val groups' = groups
2.163 |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
2.164 val jobs' = Task_Graph.del_node task jobs;
2.165 val maximal = null (Task_Graph.imm_succs jobs task);
2.166 - val cache' = if maximal then cache else Unknown;
2.167 - in (maximal, make_queue groups' jobs' cache') end;
2.168 + in (maximal, make_queue groups' jobs') end;
2.169
2.170 end;