src/Pure/Concurrent/task_queue.ML
changeset 28185 0f20cbce4935
parent 28184 5ed5cb73a2e9
child 28190 0a2434cf38c9
     1.1 --- a/src/Pure/Concurrent/task_queue.ML	Tue Sep 09 20:22:40 2008 +0200
     1.2 +++ b/src/Pure/Concurrent/task_queue.ML	Tue Sep 09 23:30:00 2008 +0200
     1.3 @@ -15,8 +15,10 @@
     1.4    type queue
     1.5    val empty: queue
     1.6    val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue
     1.7 -  val dequeue: Thread.thread -> queue -> (task * group * (unit -> bool)) option * queue
     1.8 -  val cancel: group -> queue -> Thread.thread list * queue
     1.9 +  val depend: task list -> task -> queue -> queue
    1.10 +  val dequeue: queue -> (task * group * (unit -> bool)) option * queue
    1.11 +  val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue
    1.12 +  val cancel: group -> queue -> bool * queue
    1.13    val finish: task -> queue -> queue
    1.14  end;
    1.15  
    1.16 @@ -42,9 +44,12 @@
    1.17  
    1.18  type jobs = (group * job) IntGraph.T;
    1.19  
    1.20 +fun defined_job (jobs: jobs) (Task id) = can (IntGraph.get_node jobs) id;
    1.21  fun get_group (jobs: jobs) (Task id) = #1 (IntGraph.get_node jobs id);
    1.22  fun get_job (jobs: jobs) (Task id) = #2 (IntGraph.get_node jobs id);
    1.23  fun map_job (Task id) f (jobs: jobs) = IntGraph.map_node id (apsnd f) jobs;
    1.24 +fun add_job (Task id) (Task dep) (jobs: jobs) =
    1.25 +  IntGraph.add_edge_acyclic (dep, id) jobs handle IntGraph.UNDEF _ => jobs;
    1.26  
    1.27  
    1.28  (* queue of grouped jobs *)
    1.29 @@ -57,31 +62,43 @@
    1.30  val empty = make_queue Inttab.empty IntGraph.empty;
    1.31  
    1.32  
    1.33 -(* queue operations *)
    1.34 +(* enqueue *)
    1.35  
    1.36  fun enqueue (group as Group gid) deps job (Queue {groups, jobs}) =
    1.37    let
    1.38      val id = serial ();
    1.39      val task = Task id;
    1.40      val groups' = Inttab.cons_list (gid, task) groups;
    1.41 -
    1.42 -    fun add_dep (Task dep) G = IntGraph.add_edge_acyclic (dep, id) G
    1.43 -      handle IntGraph.UNDEF _ => G;
    1.44 -    val jobs' = jobs |> IntGraph.new_node (id, (group, Job (true, job))) |> fold add_dep deps;
    1.45 +    val jobs' = jobs
    1.46 +      |> IntGraph.new_node (id, (group, Job (true, job))) |> fold (add_job task) deps;
    1.47    in (task, make_queue groups' jobs') end;
    1.48  
    1.49 -fun dequeue thread (queue as Queue {groups, jobs}) =
    1.50 +fun depend deps task (Queue {groups, jobs}) =
    1.51 +  make_queue groups (fold (add_job task) deps jobs);
    1.52 +
    1.53 +
    1.54 +(* dequeue *)
    1.55 +
    1.56 +fun dequeue_if P (queue as Queue {groups, jobs}) =
    1.57    let
    1.58 -    fun ready (id, ((group, Job (ok, job)), ([], _))) = SOME (Task id, group, (fn () => job ok))
    1.59 +    fun ready (id, ((group, Job (ok, job)), ([], _))) =
    1.60 +          if P id then SOME (Task id, group, (fn () => job ok)) else NONE
    1.61        | ready _ = NONE;
    1.62    in
    1.63      (case IntGraph.get_first ready jobs of
    1.64        NONE => (NONE, queue)
    1.65      | SOME result =>
    1.66 -        let val jobs' = map_job (#1 result) (K (Running thread)) jobs
    1.67 +        let val jobs' = map_job (#1 result) (K (Running (Thread.self ()))) jobs
    1.68          in (SOME result, make_queue groups jobs') end)
    1.69    end;
    1.70  
    1.71 +val dequeue = dequeue_if (K true);
    1.72 +
    1.73 +fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
    1.74 +  let val ids = tasks
    1.75 +    |> map_filter (fn task as Task id => if defined_job jobs task then SOME id else NONE)
    1.76 +  in dequeue_if (member (op =) (IntGraph.all_preds jobs ids)) queue end;
    1.77 +
    1.78  
    1.79  (* termination *)
    1.80  
    1.81 @@ -93,7 +110,8 @@
    1.82          (case get_job jobs task of
    1.83            Job (true, job) => map_job task (K (Job (false, job)))
    1.84          | _ => I)) tasks jobs;
    1.85 -  in (running, make_queue groups jobs') end;
    1.86 +    val _ = List.app (fn thread => Thread.interrupt thread handle Thread _ => ()) running;
    1.87 +  in (null running, make_queue groups jobs') end;
    1.88  
    1.89  fun finish (task as Task id) (Queue {groups, jobs}) =
    1.90    let