1.1 --- a/src/Pure/Concurrent/task_queue.ML Tue Sep 09 20:22:40 2008 +0200
1.2 +++ b/src/Pure/Concurrent/task_queue.ML Tue Sep 09 23:30:00 2008 +0200
1.3 @@ -15,8 +15,10 @@
1.4 type queue
1.5 val empty: queue
1.6 val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue
1.7 - val dequeue: Thread.thread -> queue -> (task * group * (unit -> bool)) option * queue
1.8 - val cancel: group -> queue -> Thread.thread list * queue
1.9 + val depend: task list -> task -> queue -> queue
1.10 + val dequeue: queue -> (task * group * (unit -> bool)) option * queue
1.11 + val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue
1.12 + val cancel: group -> queue -> bool * queue
1.13 val finish: task -> queue -> queue
1.14 end;
1.15
1.16 @@ -42,9 +44,12 @@
1.17
1.18 type jobs = (group * job) IntGraph.T;
1.19
1.20 +fun defined_job (jobs: jobs) (Task id) = can (IntGraph.get_node jobs) id;
1.21 fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
1.22 fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
1.23 fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
1.24 +fun add_job (Task id) (Task dep) (jobs: jobs) =
1.25 + IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
1.26
1.27
1.28 (* queue of grouped jobs *)
1.29 @@ -57,31 +62,43 @@
1.30 val empty = make_queue Inttab.empty IntGraph.empty;
1.31
1.32
1.33 -(* queue operations *)
1.34 +(* enqueue *)
1.35
1.36 fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) =
1.37 let
1.38 val id = serial ();
1.39 val task = Task id;
1.40 val groups' = Inttab.cons_list (gid, task) groups;
1.41 -
1.42 - fun add_dep (Task dep) G = IntGraph.add_edge_acyclic (dep, id) G
1.43 - handle IntGraph.UNDEF _ => G;
1.44 - val jobs' = jobs |> IntGraph.new_node (id, (group, Job (true, job))) |> fold add_dep deps;
1.45 + val jobs' = jobs
1.46 + |> IntGraph.new_node (id, (group, Job (true, job))) |> fold (add_job task) deps;
1.47 in (task, make_queue groups' jobs') end;
1.48
1.49 -fun dequeue thread (queue as Queue {groups, jobs}) =
1.50 +fun depend deps task (Queue {groups, jobs}) =
1.51 + make_queue groups (fold (add_job task) deps jobs);
1.52 +
1.53 +
1.54 +(* dequeue *)
1.55 +
1.56 +fun dequeue_if P (queue as Queue {groups, jobs}) =
1.57 let
1.58 - fun ready (id, ((group, Job (ok, job)), ([], _))) = SOME (Task id, group, (fn () => job ok))
1.59 + fun ready (id, ((group, Job (ok, job)), ([], _))) =
1.60 + if P id then SOME (Task id, group, (fn () => job ok)) else NONE
1.61 | ready _ = NONE;
1.62 in
1.63 (case IntGraph.get_first ready jobs of
1.64 NONE => (NONE, queue)
1.65 | SOME result =>
1.66 - let val jobs' = map_job (#1 result) (K (Running thread)) jobs
1.67 + let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs
1.68 in (SOME result, make_queue groups jobs') end)
1.69 end;
1.70
1.71 +val dequeue = dequeue_if (K true);
1.72 +
1.73 +fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
1.74 + let val ids = tasks
1.75 + |> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE)
1.76 + in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end;
1.77 +
1.78
1.79 (* termination *)
1.80
1.81 @@ -93,7 +110,8 @@
1.82 (case get_job jobs task of
1.83 Job (true, job) => map_job task (K (Job (false, job)))
1.84 | _ => I)) tasks jobs;
1.85 - in (running, make_queue groups jobs') end;
1.86 + val _ = List.app (fn thread => Thread.interrupt thread handle Thread _ => ()) running;
1.87 + in (null running, make_queue groups jobs') end;
1.88
1.89 fun finish (task as Task id) (Queue {groups, jobs}) =
1.90 let