eliminated cache, which complicates the code without making a real difference (NB: deque_towards often disrupts caching, and daisy-chaining of workers already reduces queue overhead);
authorwenzelm
Wed, 06 Jan 2010 18:14:16 +0100
changeset 3428016bf3e9786a3
parent 34279 02936e77a07c
child 34281 eedea6f0b37e
eliminated cache, which complicates the code without making a real difference (NB: deque_towards often disrupts caching, and daisy-chaining of workers already reduces queue overhead);
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Wed Jan 06 15:07:56 2010 +0100
     1.2 +++ b/src/Pure/Concurrent/future.ML	Wed Jan 06 18:14:16 2010 +0100
     1.3 @@ -179,7 +179,7 @@
     1.4    in (result, job) end;
     1.5  
     1.6  fun cancel_now group = (*requires SYNCHRONIZED*)
     1.7 -  Unsynchronized.change_result queue (Task_Queue.cancel group);
     1.8 +  Task_Queue.cancel (! queue) group;
     1.9  
    1.10  fun cancel_later group = (*requires SYNCHRONIZED*)
    1.11   (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
    1.12 @@ -351,7 +351,7 @@
    1.13    in continue end
    1.14    handle Exn.Interrupt =>
    1.15     (Multithreading.tracing 1 (fn () => "Interrupt");
    1.16 -    List.app cancel_later (Unsynchronized.change_result queue Task_Queue.cancel_all);
    1.17 +    List.app cancel_later (Task_Queue.cancel_all (! queue));
    1.18      broadcast_work (); true);
    1.19  
    1.20  fun scheduler_loop () =
     2.1 --- a/src/Pure/Concurrent/task_queue.ML	Wed Jan 06 15:07:56 2010 +0100
     2.2 +++ b/src/Pure/Concurrent/task_queue.ML	Wed Jan 06 18:14:16 2010 +0100
     2.3 @@ -22,8 +22,8 @@
     2.4    val empty: queue
     2.5    val all_passive: queue -> bool
     2.6    val status: queue -> {ready: int, pending: int, running: int, passive: int}
     2.7 -  val cancel: group -> queue -> bool * queue
     2.8 -  val cancel_all: queue -> group list * queue
     2.9 +  val cancel: queue -> group -> bool
    2.10 +  val cancel_all: queue -> group list
    2.11    val enqueue_passive: group -> queue -> task * queue
    2.12    val enqueue: group -> task list -> int -> (bool -> bool) -> queue -> (task * bool) * queue
    2.13    val extend: task -> (bool -> bool) -> queue -> queue option
    2.14 @@ -118,16 +118,13 @@
    2.15  
    2.16  (* queue of grouped jobs *)
    2.17  
    2.18 -datatype result = Unknown | Result of task | No_Result;
    2.19 -
    2.20  datatype queue = Queue of
    2.21   {groups: task list Inttab.table,   (*groups with presently active members*)
    2.22 -  jobs: jobs,                       (*job dependency graph*)
    2.23 -  cache: result};                   (*last dequeue result*)
    2.24 +  jobs: jobs};                      (*job dependency graph*)
    2.25  
    2.26 -fun make_queue groups jobs cache = Queue {groups = groups, jobs = jobs, cache = cache};
    2.27 +fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
    2.28  
    2.29 -val empty = make_queue Inttab.empty Task_Graph.empty No_Result;
    2.30 +val empty = make_queue Inttab.empty Task_Graph.empty;
    2.31  
    2.32  fun all_passive (Queue {jobs, ...}) =
    2.33    Task_Graph.get_first NONE
    2.34 @@ -150,16 +147,16 @@
    2.35  
    2.36  (* cancel -- peers and sub-groups *)
    2.37  
    2.38 -fun cancel group (Queue {groups, jobs, ...}) =
    2.39 +fun cancel (Queue {groups, jobs}) group =
    2.40    let
    2.41      val _ = cancel_group group Exn.Interrupt;
    2.42      val tasks = Inttab.lookup_list groups (group_id group);
    2.43      val running =
    2.44        fold (get_job jobs #> (fn Running t => insert Thread.equal t | _ => I)) tasks [];
    2.45      val _ = List.app SimpleThread.interrupt running;
    2.46 -  in (null running, make_queue groups jobs Unknown) end;
    2.47 +  in null running end;
    2.48  
    2.49 -fun cancel_all (Queue {groups, jobs, ...}) =
    2.50 +fun cancel_all (Queue {groups, jobs}) =
    2.51    let
    2.52      fun cancel_job (group, job) (groups, running) =
    2.53        (cancel_group group Exn.Interrupt;
    2.54 @@ -168,20 +165,20 @@
    2.55          | _ => (groups, running)));
    2.56      val (running_groups, running) = Task_Graph.fold (cancel_job o #1 o #2) jobs ([], []);
    2.57      val _ = List.app SimpleThread.interrupt running;
    2.58 -  in (running_groups, make_queue groups jobs Unknown) end;
    2.59 +  in running_groups end;
    2.60  
    2.61  
    2.62  (* enqueue *)
    2.63  
    2.64 -fun enqueue_passive group (Queue {groups, jobs, cache}) =
    2.65 +fun enqueue_passive group (Queue {groups, jobs}) =
    2.66    let
    2.67      val task = new_task NONE;
    2.68      val groups' = groups
    2.69        |> fold (fn gid => Inttab.cons_list (gid, task)) (group_ancestry group);
    2.70      val jobs' = jobs |> Task_Graph.new_node (task, (group, Passive));
    2.71 -  in (task, make_queue groups' jobs' cache) end;
    2.72 +  in (task, make_queue groups' jobs') end;
    2.73  
    2.74 -fun enqueue group deps pri job (Queue {groups, jobs, cache}) =
    2.75 +fun enqueue group deps pri job (Queue {groups, jobs}) =
    2.76    let
    2.77      val task = new_task (SOME pri);
    2.78      val groups' = groups
    2.79 @@ -191,49 +188,36 @@
    2.80        |> fold (add_job task) deps
    2.81        |> fold (fold (add_job task) o get_deps jobs) deps;
    2.82      val minimal = null (get_deps jobs' task);
    2.83 -    val cache' =
    2.84 -      (case cache of
    2.85 -        Result last =>
    2.86 -          if task_ord (last, task) = LESS
    2.87 -          then cache else Unknown
    2.88 -      | _ => Unknown);
    2.89 -  in ((task, minimal), make_queue groups' jobs' cache') end;
    2.90 +  in ((task, minimal), make_queue groups' jobs') end;
    2.91  
    2.92 -fun extend task job (Queue {groups, jobs, cache}) =
    2.93 +fun extend task job (Queue {groups, jobs}) =
    2.94    (case try (get_job jobs) task of
    2.95 -    SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs) cache)
    2.96 +    SOME (Job list) => SOME (make_queue groups (set_job task (Job (job :: list)) jobs))
    2.97    | _ => NONE);
    2.98  
    2.99  
   2.100  (* dequeue *)
   2.101  
   2.102 -fun dequeue thread (queue as Queue {groups, jobs, cache}) =
   2.103 +fun dequeue thread (queue as Queue {groups, jobs}) =
   2.104    let
   2.105      fun ready (task, ((group, Job list), (deps, _))) =
   2.106            if is_ready deps group then SOME (task, group, rev list) else NONE
   2.107        | ready _ = NONE;
   2.108 -    fun deq boundary =
   2.109 -      (case Task_Graph.get_first boundary ready jobs of
   2.110 -        NONE => (NONE, make_queue groups jobs No_Result)
   2.111 -      | SOME (result as (task, _, _)) =>
   2.112 -          let
   2.113 -            val jobs' = set_job task (Running thread) jobs;
   2.114 -            val cache' = Result task;
   2.115 -          in (SOME result, make_queue groups jobs' cache') end);
   2.116    in
   2.117 -    (case cache of
   2.118 -      Unknown => deq NONE
   2.119 -    | Result last => deq (SOME last)
   2.120 -    | No_Result => (NONE, queue))
   2.121 +    (case Task_Graph.get_first NONE ready jobs of
   2.122 +      NONE => (NONE, queue)
   2.123 +    | SOME (result as (task, _, _)) =>
   2.124 +        let val jobs' = set_job task (Running thread) jobs
   2.125 +        in (SOME result, make_queue groups jobs') end)
   2.126    end;
   2.127  
   2.128  
   2.129  (* dequeue_towards -- adhoc dependencies *)
   2.130  
   2.131 -fun depend task deps (Queue {groups, jobs, ...}) =
   2.132 -  make_queue groups (fold (add_dep task) deps jobs) Unknown;
   2.133 +fun depend task deps (Queue {groups, jobs}) =
   2.134 +  make_queue groups (fold (add_dep task) deps jobs);
   2.135  
   2.136 -fun dequeue_towards thread deps (queue as Queue {groups, jobs, ...}) =
   2.137 +fun dequeue_towards thread deps (queue as Queue {groups, jobs}) =
   2.138    let
   2.139      fun ready task =
   2.140        (case Task_Graph.get_node jobs task of
   2.141 @@ -244,10 +228,8 @@
   2.142        | _ => NONE);
   2.143      val tasks = filter (can (Task_Graph.get_node jobs)) deps;
   2.144      fun result (res as (task, _, _)) =
   2.145 -      let
   2.146 -        val jobs' = set_job task (Running thread) jobs;
   2.147 -        val cache' = Unknown;
   2.148 -      in ((SOME res, tasks), make_queue groups jobs' cache') end;
   2.149 +      let val jobs' = set_job task (Running thread) jobs
   2.150 +      in ((SOME res, tasks), make_queue groups jobs') end;
   2.151    in
   2.152      (case get_first ready tasks of
   2.153        SOME res => result res
   2.154 @@ -260,14 +242,13 @@
   2.155  
   2.156  (* finish *)
   2.157  
   2.158 -fun finish task (Queue {groups, jobs, cache}) =
   2.159 +fun finish task (Queue {groups, jobs}) =
   2.160    let
   2.161      val group = get_group jobs task;
   2.162      val groups' = groups
   2.163        |> fold (fn gid => Inttab.remove_list (op =) (gid, task)) (group_ancestry group);
   2.164      val jobs' = Task_Graph.del_node task jobs;
   2.165      val maximal = null (Task_Graph.imm_succs jobs task);
   2.166 -    val cache' = if maximal then cache else Unknown;
   2.167 -  in (maximal, make_queue groups' jobs' cache') end;
   2.168 +  in (maximal, make_queue groups' jobs') end;
   2.169  
   2.170  end;