src/Pure/Concurrent/task_queue.ML
author wenzelm
Tue, 28 Jul 2009 14:35:27 +0200
changeset 32253 3e48bf962e05
parent 32224 a46f5e9b1718
child 32254 3c28e4e578ad
permissions -rw-r--r--
Task_Queue.dequeue: explicit thread;
     1 (*  Title:      Pure/Concurrent/task_queue.ML
     2     Author:     Makarius
     3 
     4 Ordered queue of grouped tasks.
     5 *)
     6 
     7 signature TASK_QUEUE =
     8 sig
     9   type task
    10   val new_task: int -> task
    11   val pri_of_task: task -> int
    12   val str_of_task: task -> string
    13   type group
    14   val new_group: group option -> group
    15   val group_id: group -> int
    16   val eq_group: group * group -> bool
    17   val cancel_group: group -> exn -> unit
    18   val is_canceled: group -> bool
    19   val group_status: group -> exn list
    20   val str_of_group: group -> string
    21   type queue
    22   val empty: queue
    23   val is_empty: queue -> bool
    24   val status: queue -> {ready: int, pending: int, running: int}
    25   val cancel: queue -> group -> bool
    26   val cancel_all: queue -> group list
    27   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
    28   val extend: task -> (bool -> bool) -> queue -> queue option
    29   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
    30   val dequeue_towards: Thread.thread -> task list -> queue ->
    31     (((task * group * (bool -> bool) list) option * task list) * queue)
    32   val finish: task -> queue -> bool * queue
    33 end;
    34 
    35 structure Task_Queue:> TASK_QUEUE =
    36 struct
    37 
    38 (* tasks *)
    39 
    40 datatype task = Task of int * serial;
    41 fun new_task pri = Task (pri, serial ());
    42 
    43 fun pri_of_task (Task (pri, _)) = pri;
    44 fun str_of_task (Task (_, i)) = string_of_int i;
    45 
    46 fun task_ord (Task t1, Task t2) = prod_ord (rev_order o int_ord) int_ord (t1, t2);
    47 structure Task_Graph = Graph(type key = task val ord = task_ord);
    48 
    49 
    50 (* nested groups *)
    51 
    52 datatype group = Group of
    53  {parent: group option,
    54   id: serial,
    55   status: exn list ref};
    56 
    57 fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
    58 
    59 fun new_group parent = make_group (parent, serial (), ref []);
    60 
    61 fun group_id (Group {id, ...}) = id;
    62 fun eq_group (group1, group2) = group_id group1 = group_id group2;
    63 
    64 fun group_ancestry (Group {parent, id, ...}) =
    65   id :: (case parent of NONE => [] | SOME group => group_ancestry group);
    66 
    67 
    68 (* group status *)
    69 
    70 fun cancel_group (Group {status, ...}) exn = CRITICAL (fn () =>
    71   (case exn of
    72     Exn.Interrupt => if null (! status) then status := [exn] else ()
    73   | _ => change status (cons exn)));
    74 
    75 fun is_canceled (Group {parent, status, ...}) = (*non-critical*)
    76   not (null (! status)) orelse (case parent of NONE => false | SOME group => is_canceled group);
    77 
    78 fun group_status (Group {parent, status, ...}) = (*non-critical*)
    79   ! status @ (case parent of NONE => [] | SOME group => group_status group);
    80 
    81 fun str_of_group group =
    82   (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
    83 
    84 
    85 (* jobs *)
    86 
    87 datatype job =
    88   Job of (bool -> bool) list |
    89   Running of Thread.thread;
    90 
    91 type jobs = (group * job) Task_Graph.T;
    92 
    93 fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
    94 fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
    95 fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;
    96 
    97 
    98 (* queue of grouped jobs *)
    99 
   100 datatype result = Unknown | Result of task | No_Result;
   101 
   102 datatype queue = Queue of
   103  {groups: task list Inttab.table,   (*groups with presently active members*)
   104   jobs: jobs,                       (*job dependency graph*)
   105   cache: result};                   (*last dequeue result*)
   106 
   107 fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
   108 
   109 val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
   110 fun is_empty (Queue {jobs, ...}) = Task_Graph.is_empty jobs;
   111 
   112 
   113 (* queue status *)
   114 
   115 fun status (Queue {jobs, ...}) =
   116   let
   117     val (x, y, z) =
   118       Task_Graph.fold (fn (task, ((_, job), (deps, _))) => fn (x, y, z) =>
   119           (case job of
   120             Job _ => if null deps then (x + 1, y, z) else (x, y + 1, z)
   121           | Running _ => (x, y, z + 1)))
   122         jobs (0, 0, 0);
   123   in {ready = x, pending = y, running = z} end;
   124 
   125 
   126 (* cancel -- peers and sub-groups *)
   127 
   128 fun cancel (Queue {groups, jobs, ...}) group =
   129   let
   130     val _ = cancel_group group Exn.Interrupt;
   131     val tasks = Inttab.lookup_list groups (group_id group);
   132     val running =
   133       fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
   134     val _ = List.app SimpleThread.interrupt running;
   135   in null running end;
   136 
   137 fun cancel_all (Queue {jobs, ...}) =
   138   let
   139     fun cancel_job (group, job) (groups, running) =
   140       (cancel_group group Exn.Interrupt;
   141         (case job of Running t => (insert eq_group group groups, insert Thread.equal t running)
   142         | _ => (groups, running)));
   143     val (groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
   144     val _ = List.app SimpleThread.interrupt running;
   145   in groups end;
   146 
   147 
   148 (* enqueue *)
   149 
   150 fun add_job task dep (jobs: jobs) =
   151   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
   152 
   153 fun get_deps (jobs: jobs) task =
   154   Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => [];
   155 
   156 fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
   157   let
   158     val task = new_task pri;
   159     val groups' = groups
   160       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
   161     val jobs' = jobs
   162       |> Task_Graph.new_node (task, (group, Job [job]))
   163       |> fold (add_job task) deps
   164       |> fold (fold (add_job task) o get_deps jobs) deps;
   165     val minimal = null (get_deps jobs' task);
   166     val cache' =
   167       (case cache of
   168         Result last =>
   169           if task_ord (last, task) = LESS
   170           then cache else Unknown
   171       | _ => Unknown);
   172   in ((task, minimal), make_queue groups' jobs' cache') end;
   173 
   174 fun extend task job (Queue {groups, jobs, cache}) =
   175   (case try (get_job jobs) task of
   176     SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)
   177   | _ => NONE);
   178 
   179 
   180 (* dequeue *)
   181 
   182 fun dequeue thread (queue as Queue {groups, jobs, cache}) =
   183   let
   184     fun ready (task, ((group, Job list), ([], _))) = SOME (task, group, rev list)
   185       | ready _ = NONE;
   186     fun deq boundary =
   187       (case Task_Graph.get_first boundary ready jobs of
   188         NONE => (NONE, make_queue groups jobs No_Result)
   189       | SOME (result as (task, _, _)) =>
   190           let
   191             val jobs' = set_job task (Running thread) jobs;
   192             val cache' = Result task;
   193           in (SOME result, make_queue groups jobs' cache') end);
   194   in
   195     (case cache of
   196       Unknown => deq NONE
   197     | Result last => deq (SOME last)
   198     | No_Result => (NONE, queue))
   199   end;
   200 
   201 
   202 (* dequeue_towards -- adhoc dependencies *)
   203 
   204 fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
   205   let
   206     fun ready task =
   207       (case Task_Graph.get_node jobs task of
   208         (group, Job list) =>
   209           if null (Task_Graph.imm_preds jobs task)
   210           then SOME (task, group, rev list)
   211           else NONE
   212       | _ => NONE);
   213     val tasks = filter (can (Task_Graph.get_node jobs)) deps;
   214     fun result (res as (task, _, _)) =
   215       let
   216         val jobs' = set_job task (Running thread) jobs;
   217         val cache' = Unknown;
   218       in ((SOME res, tasks), make_queue groups jobs' cache') end;
   219   in
   220     (case get_first ready tasks of
   221       SOME res => result res
   222     | NONE =>
   223         (case get_first (get_first ready o Task_Graph.imm_preds jobs) tasks of
   224           SOME res => result res
   225         | NONE => ((NONE, tasks), queue)))
   226   end;
   227 
   228 
   229 (* finish *)
   230 
   231 fun finish task (Queue {groups, jobs, cache}) =
   232   let
   233     val group = get_group jobs task;
   234     val groups' = groups
   235       |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
   236     val jobs' = Task_Graph.del_node task jobs;
   237     val maximal = null (Task_Graph.imm_succs jobs task);
   238     val cache' = if maximal then cache else Unknown;
   239   in (maximal, make_queue groups' jobs' cache') end;
   240 
   241 end;