src/Pure/Concurrent/task_queue.ML
author wenzelm
Tue, 23 Sep 2008 15:48:54 +0200
changeset 28332 c33c8ad8de70
parent 28304 4b0477452943
child 28384 70abca69247b
permissions -rw-r--r--
IntGraph.del_node;
     1 (*  Title:      Pure/Concurrent/task_queue.ML
     2     ID:         $Id$
     3     Author:     Makarius
     4 
     5 Ordered queue of grouped tasks.
     6 *)
     7 
     signature TASK_QUEUE =
     sig
       (*identifiers of tasks and task groups, with printable form*)
       eqtype task
       val str_of_task: task -> string
       eqtype group
       val new_group: unit -> group
       val str_of_group: group -> string

       (*the queue itself*)
       type queue
       val empty: queue
       val is_empty: queue -> bool

       (*enqueue group deps pri job queue: register a fresh task for job within
         group, depending on deps; pri is the priority flag inspected by dequeue;
         the result is the new task together with the updated queue*)
       val enqueue:
         group -> task list -> bool -> (bool -> bool) -> queue -> task * queue

       (*depend deps task queue: make task depend on each of deps;
         dependencies that are not in the job graph are ignored*)
       val depend: task list -> task -> queue -> queue

       (*focus tasks queue: replace the focus by those tasks that are still
         present in the queue*)
       val focus: task list -> queue -> queue

       (*dequeue: pick a job without pending dependencies -- first among the
         jobs leading to the focus, then among jobs with priority flag true,
         then among those with priority flag false; the chosen task is marked
         as running in the calling thread; the returned thunk applies the job
         to the group flag as read at dequeue time*)
       val dequeue:
         queue -> (task * group * (unit -> bool)) option * queue

       (*dequeue_towards tasks: like the first stage of dequeue, but relative to
         the given tasks (restricted to those present) instead of the focus*)
       val dequeue_towards:
         task list -> queue -> (task * group * (unit -> bool)) option * queue

       (*interrupt the thread of a running task; anything else is a no-op;
         the external variant reads the task number via Int.fromString*)
       val interrupt: queue -> task -> unit
       val interrupt_external: queue -> string -> unit

       (*finish task queue: remove task from job graph, group table and focus*)
       val finish: task -> queue -> queue

       (*cancel queue group: reset the group flag and interrupt all running
         members; the result is true iff no member was running*)
       val cancel: queue -> group -> bool
     end;
    28 
    structure TaskQueue: TASK_QUEUE =
    struct

    (* identifiers *)

    (*a task is named by the number drawn from serial () when it is enqueued*)
    datatype task = Task of serial;
    fun str_of_task (Task id) = string_of_int id;

    (*a group pairs its number with a mutable flag; cancel resets the flag*)
    datatype group = Group of serial * bool ref;
    fun new_group () = Group (serial (), ref true);

    (*the number of a group whose flag is false is printed in parentheses*)
    fun str_of_group (Group (gid, valid)) =
      let val name = string_of_int gid
      in if ! valid then name else enclose "(" ")" name end;


    (* jobs *)

    datatype job =
      Job of bool * (bool -> bool) |   (*priority flag, job: status -> status*)
      Running of Thread.thread;        (*thread that has dequeued the job*)

    (*graph nodes are task numbers; add_job inserts an edge from a dependency
      to the task that depends on it*)
    type jobs = (group * job) IntGraph.T;

    fun get_group (graph: jobs) (Task id) =
      let val (group, _) = IntGraph.get_node graph id in group end;

    fun get_job (graph: jobs) (Task id) =
      let val (_, job) = IntGraph.get_node graph id in job end;

    fun map_job (Task id) f (graph: jobs) =
      IntGraph.map_node id (fn (group, job) => (group, f job)) graph;

    (*add the edge dep -> task; if IntGraph reports an undefined node the
      graph is returned unchanged*)
    fun add_job (Task target) (Task source) (graph: jobs) =
      IntGraph.add_edge_acyclic (source, target) graph
        handle IntGraph.UNDEF _ => graph;

    (*keep a task only if the graph still has a node for it*)
    fun check_job (graph: jobs) (task as Task id) =
      (case try (IntGraph.get_node graph) id of
        SOME _ => SOME task
      | NONE => NONE);


    (* queue of grouped jobs *)

    datatype queue = Queue of
     {groups: task list Inttab.table,   (*members of each group, by group number*)
      jobs: jobs,                       (*dependency graph of jobs*)
      focus: task list};                (*tasks that dequeue serves first*)

    fun make_queue groups jobs focus =
      Queue {groups = groups, jobs = jobs, focus = focus};

    val empty = Queue {groups = Inttab.empty, jobs = IntGraph.empty, focus = []};

    fun is_empty (Queue {jobs = graph, ...}) = IntGraph.is_empty graph;


    (* enqueue *)

    fun enqueue (group as Group (gid, _)) deps pri job (Queue {groups, jobs, focus}) =
      let
        val id = serial ();
        val task = Task id;
        val groups' = Inttab.cons_list (gid, task) groups;
        val with_node = IntGraph.new_node (id, (group, Job (pri, job))) jobs;
        val jobs' = fold (add_job task) deps with_node;
      in (task, make_queue groups' jobs' focus) end;

    fun depend deps task (Queue {groups, jobs, focus}) =
      let val jobs' = fold (add_job task) deps jobs
      in make_queue groups jobs' focus end;

    fun focus tasks (Queue {groups, jobs, ...}) =
      let val present = map_filter (check_job jobs) tasks
      in make_queue groups jobs present end;


    (* dequeue *)

    local

    (*record the calling thread as the one running the selected task*)
    fun dequeue_result found (queue as Queue {groups, jobs, focus}) =
      (case found of
        NONE => (NONE, queue)
      | SOME (result as (task, _, _)) =>
          let
            val running = Running (Thread.self ());
            val jobs' = map_job task (fn _ => running) jobs;
          in (SOME result, make_queue groups jobs' focus) end);

    (*first pending job in the whole graph that has no predecessors and carries
      the requested priority flag*)
    fun dequeue_global req_pri (queue as Queue {jobs, ...}) =
      let
        fun ready (id, ((group, entry), (preds, _))) =
          (case (entry, preds) of
            (Job (pri, job), []) =>
              if pri = req_pri then
                (*the group flag is read here, not when the thunk is run*)
                let val Group (_, ref ok) = group
                in SOME (Task id, group, (fn () => job ok)) end
              else NONE
          | _ => NONE);
      in dequeue_result (IntGraph.get_first ready jobs) queue end;

    (*first pending job without immediate predecessors among the nodes that
      IntGraph.all_preds yields for the given tasks; the priority flag is
      not consulted*)
    fun dequeue_local tasks (queue as Queue {jobs, ...}) =
      let
        fun task_id (Task id) = id;
        fun ready id =
          (case IntGraph.get_node jobs id of
            (_, Running _) => NONE
          | (group as Group (_, ref ok), Job (_, job)) =>
              (case IntGraph.imm_preds jobs id of
                [] => SOME (Task id, group, (fn () => job ok))
              | _ => NONE));
        val candidates = IntGraph.all_preds jobs (map task_id tasks);
      in dequeue_result (get_first ready candidates) queue end;

    in

    (*focus first, then priority flag true, then priority flag false*)
    fun dequeue (queue as Queue {focus, ...}) =
      (case dequeue_local focus queue of
        (res as (SOME _, _)) => res
      | (NONE, _) =>
          (case dequeue_global true queue of
            (res as (SOME _, _)) => res
          | (NONE, _) => dequeue_global false queue));

    fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
      let val present = map_filter (check_job jobs) tasks
      in dequeue_local present queue end;

    end;


    (* sporadic interrupts *)

    (*a Thread exception raised by the interrupt attempt is ignored*)
    fun interrupt_thread target =
      Thread.interrupt target
        handle Thread _ => ();

    (*only a task that is present and running gets interrupted*)
    fun interrupt (Queue {jobs, ...}) task =
      (case try (get_job jobs) task of
        SOME (Running target) => interrupt_thread target
      | _ => ());

    (*variant for a task number given as a string; unparsable input is ignored*)
    fun interrupt_external queue str =
      (case Int.fromString str of
        NONE => ()
      | SOME id => interrupt queue (Task id));


    (* misc operations *)

    (*remove the task from group table, job graph and focus*)
    fun finish (task as Task id) (Queue {groups, jobs, focus}) =
      let
        val Group (gid, _) = get_group jobs task;
        val groups' = Inttab.remove_list (op =) (gid, task) groups;
        val jobs' = IntGraph.del_node id jobs;
        val focus' = remove (op =) task focus;
      in make_queue groups' jobs' focus' end;

    (*reset the group flag, then interrupt every running member of the group;
      the result is true iff there was no running member*)
    fun cancel (Queue {groups, jobs, ...}) (Group (gid, valid)) =
      let
        val _ = valid := false;  (*seen by jobs of this group dequeued later*)
        val members = Inttab.lookup_list groups gid;
        fun collect task threads =
          (case get_job jobs task of
            Running target => target :: threads
          | _ => threads);
        val running = fold collect members [];
      in (List.app interrupt_thread running; null running) end;

    end;