src/Pure/Concurrent/task_queue.ML
author wenzelm
Wed, 10 Sep 2008 19:44:28 +0200
changeset 28190 0a2434cf38c9
parent 28185 0f20cbce4935
child 28196 f019dd2db561
permissions -rw-r--r--
cancel: invalidate group implicitly, via bool ref;
job: moved ok flag into group;
added interrupt, interrupt_external for tasks;
wenzelm@28165
     1
(*  Title:      Pure/Concurrent/task_queue.ML
wenzelm@28165
     2
    ID:         $Id$
wenzelm@28165
     3
    Author:     Makarius
wenzelm@28165
     4
wenzelm@28165
     5
Ordered queue of grouped tasks.
wenzelm@28165
     6
*)
wenzelm@28165
     7
wenzelm@28165
     8
signature TASK_QUEUE =
sig
  (*identifiers of tasks and task groups*)
  eqtype task
  eqtype group
  val new_group: unit -> group
  val str_of_task: task -> string
  val str_of_group: group -> string

  (*queue construction: new tasks and additional dependencies*)
  type queue
  val empty: queue
  val enqueue: group -> task list -> (bool -> bool) -> queue -> task * queue
  val depend: task list -> task -> queue -> queue

  (*selection of a ready task, optionally restricted towards given tasks*)
  val dequeue: queue -> (task * group * (unit -> bool)) option * queue
  val dequeue_towards: task list -> queue -> (task * group * (unit -> bool)) option * queue

  (*interrupt the thread of a running task; the external variant takes the task id as string*)
  val interrupt: queue -> task -> unit
  val interrupt_external: queue -> string -> unit

  (*termination: remove a finished task, cancel a whole group*)
  val finish: task -> queue -> queue
  val cancel: queue -> group -> bool
end;
wenzelm@28165
    26
wenzelm@28171
    27
structure TaskQueue: TASK_QUEUE =
wenzelm@28165
    28
struct
wenzelm@28165
    29
wenzelm@28168
    30
(* identifiers *)
wenzelm@28165
    31
wenzelm@28165
    32
(*a task is identified by a serial number, which is also its node key in the job graph*)
datatype task = Task of serial;
wenzelm@28190
    33
(*print a task as its serial number*)
fun str_of_task task =
  let val Task number = task
  in string_of_int number end;
wenzelm@28165
    34
wenzelm@28190
    35
(*a group consists of a serial number and a mutable flag; the flag starts as true
  (see new_group) and is set to false by cancel*)
datatype group = Group of serial * bool ref;
wenzelm@28190
    36
(*create a fresh group whose flag is initially true*)
fun new_group () =
  let
    val gid = serial ();
    val flag = ref true;
  in Group (gid, flag) end;
wenzelm@28165
    37
wenzelm@28190
    38
(*print a group as its serial number; the number is put in parentheses
  when the group flag is false*)
fun str_of_group (Group (gid, flag)) =
  let
    val ok = ! flag;
    val number = string_of_int gid;
  in if ok then number else enclose "(" ")" number end;
wenzelm@28179
    40
wenzelm@28165
    41
wenzelm@28176
    42
(* jobs *)
wenzelm@28165
    43
wenzelm@28165
    44
datatype job =
    Job of bool -> bool           (*pending work; dequeue_if applies it to the group flag value*)
  | Running of Thread.thread;     (*entry already dequeued, with the thread that dequeued it*)
wenzelm@28165
    47
wenzelm@28176
    48
(*graph of jobs: nodes are keyed by task serial number and carry the group and job of the task;
  add_job inserts an edge from a dependency to the task that depends on it*)
type jobs = (group * job) IntGraph.T;
wenzelm@28176
    49
wenzelm@28185
    50
(*check whether the task has a node in the job graph*)
fun defined_job (jobs: jobs) (Task id) =
  (case try (IntGraph.get_node jobs) id of
    SOME _ => true
  | NONE => false);
wenzelm@28176
    51
(*group component of the node of the given task; IntGraph.get_node fails for unknown tasks*)
fun get_group (jobs: jobs) (Task id) =
  let val (group, _) = IntGraph.get_node jobs id
  in group end;
wenzelm@28176
    52
(*job component of the node of the given task; IntGraph.get_node fails for unknown tasks*)
fun get_job (jobs: jobs) (Task id) =
  let val (_, job) = IntGraph.get_node jobs id
  in job end;
wenzelm@28176
    53
(*apply f to the job component of the node of the given task, keeping its group*)
fun map_job (Task id) f (jobs: jobs) =
  jobs |> IntGraph.map_node id (fn (group, job) => (group, f job));
wenzelm@28185
    54
(*add an edge from dependency dep to task id, rejecting cycles; if the graph
  operation raises IntGraph.UNDEF the graph is returned unchanged -- presumably
  this covers dependencies that are no longer present in the queue*)
fun add_job (Task id) (Task dep) (jobs: jobs) =
  (jobs |> IntGraph.add_edge_acyclic (dep, id))
    handle IntGraph.UNDEF _ => jobs;
wenzelm@28176
    56
wenzelm@28176
    57
wenzelm@28176
    58
(* queue of grouped jobs *)
wenzelm@28176
    59
wenzelm@28165
    60
datatype queue =
  Queue of {
    (*groups with presently active members*)
    groups: task list Inttab.table,
    (*job dependency graph*)
    jobs: jobs
  };
wenzelm@28165
    63
wenzelm@28184
    64
(*assemble a queue from its group table and job graph*)
fun make_queue group_table job_graph =
  Queue {groups = group_table, jobs = job_graph};
wenzelm@28184
    65
(*queue without groups and without jobs*)
val empty = Queue {groups = Inttab.empty, jobs = IntGraph.empty};
wenzelm@28165
    66
wenzelm@28165
    67
wenzelm@28185
    68
(* enqueue *)
wenzelm@28165
    69
wenzelm@28190
    70
(*register a new pending job: allocate a fresh task id, insert the node into the
  job graph, add an edge for each dependency in deps (via add_job), and record
  the task as member of its group; returns the new task with the updated queue*)
fun enqueue (group as Group (gid, _)) deps job (Queue {groups, jobs}) =
  let
    val id = serial ();
    val task = Task id;
    val node = (id, (group, Job job));
    val jobs' = fold (add_job task) deps (IntGraph.new_node node jobs);
    val groups' = Inttab.cons_list (gid, task) groups;
  in (task, Queue {groups = groups', jobs = jobs'}) end;
wenzelm@28165
    78
wenzelm@28185
    79
(*add further dependencies deps to an existing task; the group table is unchanged*)
fun depend deps task (Queue {groups, jobs}) =
  let val jobs' = jobs |> fold (add_job task) deps
  in Queue {groups = groups, jobs = jobs'} end;
wenzelm@28185
    81
wenzelm@28185
    82
wenzelm@28185
    83
(* dequeue *)
wenzelm@28185
    84
wenzelm@28185
    85
(*select the first pending job that satisfies predicate P on its node id and
  whose first adjacency list is empty (presumably its predecessors, i.e. the
  dependencies still present in the graph); the selected node is marked as
  Running by the current thread.  The group flag is read when the node is
  matched, and the returned thunk applies the job to that value.*)
fun dequeue_if P (queue as Queue {groups, jobs}) =
  let
    fun ready (id, ((group, job), (preds, _))) =
      (case (group, job, preds) of
        (Group (_, ref ok), Job run, []) =>
          if P id then SOME (Task id, group, (fn () => run ok)) else NONE
      | _ => NONE);
    val found = IntGraph.get_first ready jobs;
  in
    (case found of
      SOME (task, _, _) =>
        let
          val running = Running (Thread.self ());
          val jobs' = map_job task (fn _ => running) jobs;
        in (found, make_queue groups jobs') end
    | NONE => (NONE, queue))
  end;
wenzelm@28165
    97
wenzelm@28185
    98
(*dequeue any ready job, without restriction on the task*)
val dequeue = dequeue_if (fn _ => true);
wenzelm@28185
    99
wenzelm@28185
   100
(*dequeue a ready job among the nodes that IntGraph.all_preds yields for the
  given tasks; tasks without a node in the job graph are ignored*)
fun dequeue_towards tasks (queue as Queue {jobs, ...}) =
  let
    fun known_id (task as Task id) = if defined_job jobs task then SOME id else NONE;
    val candidates = IntGraph.all_preds jobs (map_filter known_id tasks);
    val relevant = member (op =) candidates;
  in dequeue_if relevant queue end;
wenzelm@28185
   104
wenzelm@28176
   105
wenzelm@28190
   106
(* sporadic interrupts *)
wenzelm@28190
   107
wenzelm@28190
   108
(*interrupt the given thread, ignoring a Thread exception raised by the attempt*)
fun interrupt_thread target =
  Thread.interrupt target
    handle Thread _ => ();
wenzelm@28190
   109
wenzelm@28190
   110
(*interrupt the thread of a running task; pending or unknown tasks are left alone*)
fun interrupt (Queue {jobs, ...}) task =
  (case try (get_job jobs) task of
    SOME (Running thread) => interrupt_thread thread
  | SOME (Job _) => ()
  | NONE => ());
wenzelm@28190
   112
wenzelm@28190
   113
(*interrupt the task whose id is given as a string; input that Int.fromString
  cannot parse is silently ignored*)
fun interrupt_external queue str =
  let val parsed = Int.fromString str in
    (case parsed of
      NONE => ()
    | SOME id => interrupt queue (Task id))
  end;
wenzelm@28190
   115
wenzelm@28190
   116
wenzelm@28176
   117
(* termination *)
wenzelm@28176
   118
wenzelm@28190
   119
(*cancel a group: clear its flag, then interrupt the threads of all members
  that are currently running; the result is true iff no member was running*)
fun cancel (Queue {groups, jobs}) (Group (gid, ok)) =
  let
    val _ = ok := false;  (*invalidate any future group members*)
    fun add_running task threads =
      (case get_job jobs task of
        Running thread => thread :: threads
      | Job _ => threads);
    val running = fold add_running (Inttab.lookup_list groups gid) [];
  in
    List.app interrupt_thread running;
    null running
  end;
wenzelm@28176
   126
wenzelm@28184
   127
(*remove a finished task from the member list of its group and from the job graph;
  the lookup of the group fails for tasks that are not in the graph*)
fun finish (task as Task id) (Queue {groups, jobs}) =
  (case get_group jobs task of
    Group (gid, _) =>
      Queue
       {groups = groups |> Inttab.remove_list (op =) (gid, task),
        jobs = jobs |> IntGraph.del_nodes [id]});
wenzelm@28165
   133
wenzelm@28165
   134
end;