src/Pure/Concurrent/task_queue.ML
author wenzelm
Wed, 06 Jan 2010 15:07:56 +0100
changeset 34279 02936e77a07c
parent 34277 7325a5e3587f
child 34280 16bf3e9786a3
permissions -rw-r--r--
tasks of canceled groups are considered "ready" -- enables to purge the queue from tasks depending on unfinished promises (also improves general reactivity);
shutdown: back to synchronous wait, which means no asynchronous interrupts within the loop;
     1 (*  Title:      Pure/Concurrent/task_queue.ML
     2     Author:     Makarius
     3 
     4 Ordered queue of grouped tasks.
     5 *)
     6 
     7 signature TASK_QUEUE =
     8 sig
     9   type task
    10   val dummy_task: task
    11   val pri_of_task: task -> int
    12   val str_of_task: task -> string
    13   type group
    14   val new_group: group option -> group
    15   val group_id: group -> int
    16   val eq_group: group * group -> bool
    17   val cancel_group: group -> exn -> unit
    18   val is_canceled: group -> bool
    19   val group_status: group -> exn list
    20   val str_of_group: group -> string
    21   type queue
    22   val empty: queue
    23   val all_passive: queue -> bool
    24   val status: queue -> {ready: int, pending: int, running: int, passive: int}
    25   val cancel: group -> queue -> bool * queue
    26   val cancel_all: queue -> group list * queue
    27   val enqueue_passive: group -> queue -> task * queue
    28   val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
    29   val extend: task -> (bool -> bool) -> queue -> queue option
    30   val dequeue: Thread.thread -> queue -> (task * group * (bool -> bool) list) option * queue
    31   val depend: task -> task list -> queue -> queue
    32   val dequeue_towards: Thread.thread -> task list -> queue ->
    33     (((task * group * (bool -> bool) list) option * task list) * queue)
    34   val finish: task -> queue -> bool * queue
    35 end;
    36 
    37 structure Task_Queue:> TASK_QUEUE =
    38 struct
    39 
    40 (* tasks *)
    41 
    42 datatype task = Task of int option * serial;
    43 val dummy_task = Task (NONE, ~1);
    44 fun new_task pri = Task (pri, serial ());
    45 
    46 fun pri_of_task (Task (pri, _)) = the_default 0 pri;
    47 fun str_of_task (Task (_, i)) = string_of_int i;
    48 
    49 fun task_ord (Task t1, Task t2) = prod_ord (rev_order o option_ord int_ord) int_ord (t1, t2);
    50 structure Task_Graph = Graph(type key = task val ord = task_ord);
    51 
    52 
    53 (* nested groups *)
    54 
    55 datatype group = Group of
    56  {parent: group option,
    57   id: serial,
    58   status: exn list Synchronized.var};
    59 
    60 fun make_group (parent, id, status) = Group {parent = parent, id = id, status = status};
    61 
    62 fun new_group parent = make_group (parent, serial (), Synchronized.var "group" []);
    63 
    64 fun group_id (Group {id, ...}) = id;
    65 fun eq_group (group1, group2) = group_id group1 = group_id group2;
    66 
    67 fun group_ancestry (Group {parent, id, ...}) =
    68   id :: (case parent of NONE => [] | SOME group => group_ancestry group);
    69 
    70 
    71 (* group status *)
    72 
    73 fun cancel_group (Group {status, ...}) exn =
    74   Synchronized.change status
    75     (fn exns =>
    76       (case exn of
    77         Exn.Interrupt => if null exns then [exn] else exns
    78       | _ => exn :: exns));
    79 
    80 fun is_canceled (Group {parent, status, ...}) =
    81   not (null (Synchronized.value status)) orelse
    82     (case parent of NONE => false | SOME group => is_canceled group);
    83 
    84 fun is_ready deps group = null deps orelse is_canceled group;
    85 
    86 fun group_status (Group {parent, status, ...}) =
    87   Synchronized.value status @
    88     (case parent of NONE => [] | SOME group => group_status group);
    89 
    90 fun str_of_group group =
    91   (is_canceled group ? enclose "(" ")") (string_of_int (group_id group));
    92 
    93 
    94 (* jobs *)
    95 
    96 datatype job =
    97   Job of (bool -> bool) list |
    98   Running of Thread.thread |
    99   Passive;
   100 
   101 type jobs = (group * job) Task_Graph.T;
   102 
   103 fun get_group (jobs: jobs) task = #1 (Task_Graph.get_node jobs task);
   104 fun get_job (jobs: jobs) task = #2 (Task_Graph.get_node jobs task);
   105 fun set_job task job (jobs: jobs) = Task_Graph.map_node task (fn (group, _) => (group, job)) jobs;
   106 
   107 fun add_job task dep (jobs: jobs) =
   108   Task_Graph.add_edge (dep, task) jobs handle Task_Graph.UNDEF _ => jobs;
   109 
   110 fun add_dep task dep (jobs: jobs) =
   111   if Task_Graph.is_edge jobs (task, dep) then
   112     raise Fail "Cyclic dependency of future tasks"
   113   else add_job task dep jobs;
   114 
   115 fun get_deps (jobs: jobs) task =
   116   Task_Graph.imm_preds jobs task handle Task_Graph.UNDEF _ => [];
   117 
   118 
   119 (* queue of grouped jobs *)
   120 
   121 datatype result = Unknown | Result of task | No_Result;
   122 
   123 datatype queue = Queue of
   124  {groups: task list Inttab.table,   (*groups with presently active members*)
   125   jobs: jobs,                       (*job dependency graph*)
   126   cache: result};                   (*last dequeue result*)
   127 
   128 fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
   129 
   130 val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
   131 
   132 fun all_passive (Queue {jobs, ...}) =
   133   Task_Graph.get_first NONE
   134     ((fn Job _ => SOME () | Running _ => SOME () | Passive => NONE) o #2 o #1 o #2) jobs |> is_none;
   135 
   136 
   137 (* queue status *)
   138 
   139 fun status (Queue {jobs, ...}) =
   140   let
   141     val (x, y, z, w) =
   142       Task_Graph.fold (fn (_, ((group, job), (deps, _))) => fn (x, y, z, w) =>
   143           (case job of
   144             Job _ => if is_ready deps group then (x + 1, y, z, w) else (x, y + 1, z, w)
   145           | Running _ => (x, y, z + 1, w)
   146           | Passive => (x, y, z, w + 1)))
   147         jobs (0, 0, 0, 0);
   148   in {ready = x, pending = y, running = z, passive = w} end;
   149 
   150 
   151 (* cancel -- peers and sub-groups *)
   152 
   153 fun cancel group (Queue {groups, jobs, ...}) =
   154   let
   155     val _ = cancel_group group Exn.Interrupt;
   156     val tasks = Inttab.lookup_list groups (group_id group);
   157     val running =
   158       fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
   159     val _ = List.app SimpleThread.interrupt running;
   160   in (null running, make_queue groups jobs Unknown) end;
   161 
   162 fun cancel_all (Queue {groups, jobs, ...}) =
   163   let
   164     fun cancel_job (group, job) (groups, running) =
   165       (cancel_group group Exn.Interrupt;
   166         (case job of
   167           Running t => (insert eq_group group groups, insert Thread.equal t running)
   168         | _ => (groups, running)));
   169     val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
   170     val _ = List.app SimpleThread.interrupt running;
   171   in (running_groups, make_queue groups jobs Unknown) end;
   172 
   173 
   174 (* enqueue *)
   175 
   176 fun enqueue_passive group (Queue {groups, jobs, cache}) =
   177   let
   178     val task = new_task NONE;
   179     val groups' = groups
   180       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
   181     val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive));
   182   in (task, make_queue groups' jobs' cache) end;
   183 
   184 fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
   185   let
   186     val task = new_task (SOME pri);
   187     val groups' = groups
   188       |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
   189     val jobs' = jobs
   190       |> Task_Graph.new_node (task, (group, Job [job]))
   191       |> fold (add_job task) deps
   192       |> fold (fold (add_job task) o get_deps jobs) deps;
   193     val minimal = null (get_deps jobs' task);
   194     val cache' =
   195       (case cache of
   196         Result last =>
   197           if task_ord (last, task) = LESS
   198           then cache else Unknown
   199       | _ => Unknown);
   200   in ((task, minimal), make_queue groups' jobs' cache') end;
   201 
   202 fun extend task job (Queue {groups, jobs, cache}) =
   203   (case try (get_job jobs) task of
   204     SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)
   205   | _ => NONE);
   206 
   207 
   208 (* dequeue *)
   209 
   210 fun dequeue thread (queue as Queue {groups, jobs, cache}) =
   211   let
   212     fun ready (task, ((group, Job list), (deps, _))) =
   213           if is_ready deps group then SOME (task, group, rev list) else NONE
   214       | ready _ = NONE;
   215     fun deq boundary =
   216       (case Task_Graph.get_first boundary ready jobs of
   217         NONE => (NONE, make_queue groups jobs No_Result)
   218       | SOME (result as (task, _, _)) =>
   219           let
   220             val jobs' = set_job task (Running thread) jobs;
   221             val cache' = Result task;
   222           in (SOME result, make_queue groups jobs' cache') end);
   223   in
   224     (case cache of
   225       Unknown => deq NONE
   226     | Result last => deq (SOME last)
   227     | No_Result => (NONE, queue))
   228   end;
   229 
   230 
   231 (* dequeue_towards -- adhoc dependencies *)
   232 
   233 fun depend task deps (Queue {groups, jobs, ...}) =
   234   make_queue groups (fold (add_dep task) deps jobs) Unknown;
   235 
   236 fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
   237   let
   238     fun ready task =
   239       (case Task_Graph.get_node jobs task of
   240         (group, Job list) =>
   241           if is_ready (get_deps jobs task) group
   242           then SOME (task, group, rev list)
   243           else NONE
   244       | _ => NONE);
   245     val tasks = filter (can (Task_Graph.get_node jobs)) deps;
   246     fun result (res as (task, _, _)) =
   247       let
   248         val jobs' = set_job task (Running thread) jobs;
   249         val cache' = Unknown;
   250       in ((SOME res, tasks), make_queue groups jobs' cache') end;
   251   in
   252     (case get_first ready tasks of
   253       SOME res => result res
   254     | NONE =>
   255         (case get_first (get_first ready o get_deps jobs) tasks of
   256           SOME res => result res
   257         | NONE => ((NONE, tasks), queue)))
   258   end;
   259 
   260 
   261 (* finish *)
   262 
   263 fun finish task (Queue {groups, jobs, cache}) =
   264   let
   265     val group = get_group jobs task;
   266     val groups' = groups
   267       |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
   268     val jobs' = Task_Graph.del_node task jobs;
   269     val maximal = null (Task_Graph.imm_succs jobs task);
   270     val cache' = if maximal then cache else Unknown;
   271   in (maximal, make_queue groups' jobs' cache') end;
   272 
   273 end;