src/Pure/Concurrent/future.ML
changeset 33430 0a1c0c1209ec
parent 33429 a69ddd7dce95
child 33431 e351f4c1f18c
equal deleted inserted replaced
33429:a69ddd7dce95 33430:0a1c0c1209ec
   203     val _ = active := false;
   203     val _ = active := false;
   204     val _ = wait cond;
   204     val _ = wait cond;
   205     val _ = active := true;
   205     val _ = active := true;
   206   in () end;
   206   in () end;
   207 
   207 
   208 fun worker_next has_work = (*requires SYNCHRONIZED*)
   208 fun worker_next have_work = (*requires SYNCHRONIZED*)
   209   if length (! workers) > ! max_workers then
   209   if length (! workers) > ! max_workers then
   210     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   210     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
       
   211      if have_work then signal work_available else ();
   211      broadcast scheduler_event;
   212      broadcast scheduler_event;
   212      if has_work then signal work_available else ();
       
   213      NONE)
   213      NONE)
   214   else if count_active () > ! max_active then
   214   else if count_active () > ! max_active then
   215     (if has_work then signal work_available else ();
   215     (if have_work then signal work_available else ();
   216      worker_wait scheduler_event;
   216      worker_wait scheduler_event;
   217      worker_next false)
   217      worker_next false)
   218   else
   218   else
   219     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   219     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   220       NONE => (worker_wait work_available; worker_next true)
   220       NONE => (worker_wait work_available; worker_next true)
   265 
   265 
   266     (*worker threads*)
   266     (*worker threads*)
   267     val _ =
   267     val _ =
   268       if forall (Thread.isActive o #1) (! workers) then ()
   268       if forall (Thread.isActive o #1) (! workers) then ()
   269       else
   269       else
   270         (case List.partition (Thread.isActive o #1) (! workers) of
   270         let
   271           (_, []) => ()
   271           val  (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   272         | (alive, dead) =>
   272           val _ = workers := alive;
   273             (workers := alive; Multithreading.tracing 0 (fn () =>
   273         in
   274               "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
   274           Multithreading.tracing 0 (fn () =>
       
   275             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
       
   276         end;
   275 
   277 
   276     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   278     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   277     val _ = max_active := m;
   279     val _ = max_active := m;
   278 
   280 
   279     val mm = if m = 9999 then 1 else m * 2;
   281     val mm = if m = 9999 then 1 else m * 2;
   361     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   363     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   362   | SOME (Exn.Exn Exn.Interrupt) =>
   364   | SOME (Exn.Exn Exn.Interrupt) =>
   363       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   365       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   364   | SOME res => res);
   366   | SOME res => res);
   365 
   367 
   366 fun join_wait x =
   368 fun passive_wait x =
   367   Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ());
   369   Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ());
   368 
   370 
   369 fun join_next deps = (*requires SYNCHRONIZED*)
   371 fun join_next deps = (*requires SYNCHRONIZED*)
   370   if null deps then NONE
   372   if null deps then NONE
   371   else
   373   else
   390   else if Multithreading.self_critical () then
   392   else if Multithreading.self_critical () then
   391     error "Cannot join future values within critical section"
   393     error "Cannot join future values within critical section"
   392   else
   394   else
   393     (case worker_task () of
   395     (case worker_task () of
   394       SOME task => join_depend task (map task_of xs)
   396       SOME task => join_depend task (map task_of xs)
   395     | NONE => List.app join_wait xs;
   397     | NONE => List.app passive_wait xs;
   396     map get_result xs);
   398     map get_result xs);
   397 
   399 
   398 end;
   400 end;
   399 
   401 
   400 fun join_result x = singleton join_results x;
   402 fun join_result x = singleton join_results x;