equal
deleted
inserted
replaced
203 val _ = active := false; |
203 val _ = active := false; |
204 val _ = wait cond; |
204 val _ = wait cond; |
205 val _ = active := true; |
205 val _ = active := true; |
206 in () end; |
206 in () end; |
207 |
207 |
208 fun worker_next has_work = (*requires SYNCHRONIZED*) |
208 fun worker_next have_work = (*requires SYNCHRONIZED*) |
209 if length (! workers) > ! max_workers then |
209 if length (! workers) > ! max_workers then |
210 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); |
210 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ())); |
|
211 if have_work then signal work_available else (); |
211 broadcast scheduler_event; |
212 broadcast scheduler_event; |
212 if has_work then signal work_available else (); |
|
213 NONE) |
213 NONE) |
214 else if count_active () > ! max_active then |
214 else if count_active () > ! max_active then |
215 (if has_work then signal work_available else (); |
215 (if have_work then signal work_available else (); |
216 worker_wait scheduler_event; |
216 worker_wait scheduler_event; |
217 worker_next false) |
217 worker_next false) |
218 else |
218 else |
219 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of |
219 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of |
220 NONE => (worker_wait work_available; worker_next true) |
220 NONE => (worker_wait work_available; worker_next true) |
265 |
265 |
266 (*worker threads*) |
266 (*worker threads*) |
267 val _ = |
267 val _ = |
268 if forall (Thread.isActive o #1) (! workers) then () |
268 if forall (Thread.isActive o #1) (! workers) then () |
269 else |
269 else |
270 (case List.partition (Thread.isActive o #1) (! workers) of |
270 let |
271 (_, []) => () |
271 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers); |
272 | (alive, dead) => |
272 val _ = workers := alive; |
273 (workers := alive; Multithreading.tracing 0 (fn () => |
273 in |
274 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads"))); |
274 Multithreading.tracing 0 (fn () => |
|
275 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads") |
|
276 end; |
275 |
277 |
276 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); |
278 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value (); |
277 val _ = max_active := m; |
279 val _ = max_active := m; |
278 |
280 |
279 val mm = if m = 9999 then 1 else m * 2; |
281 val mm = if m = 9999 then 1 else m * 2; |
361 NONE => Exn.Exn (SYS_ERROR "unfinished future") |
363 NONE => Exn.Exn (SYS_ERROR "unfinished future") |
362 | SOME (Exn.Exn Exn.Interrupt) => |
364 | SOME (Exn.Exn Exn.Interrupt) => |
363 Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x)))) |
365 Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x)))) |
364 | SOME res => res); |
366 | SOME res => res); |
365 |
367 |
366 fun join_wait x = |
368 fun passive_wait x = |
367 Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ()); |
369 Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ()); |
368 |
370 |
369 fun join_next deps = (*requires SYNCHRONIZED*) |
371 fun join_next deps = (*requires SYNCHRONIZED*) |
370 if null deps then NONE |
372 if null deps then NONE |
371 else |
373 else |
390 else if Multithreading.self_critical () then |
392 else if Multithreading.self_critical () then |
391 error "Cannot join future values within critical section" |
393 error "Cannot join future values within critical section" |
392 else |
394 else |
393 (case worker_task () of |
395 (case worker_task () of |
394 SOME task => join_depend task (map task_of xs) |
396 SOME task => join_depend task (map task_of xs) |
395 | NONE => List.app join_wait xs; |
397 | NONE => List.app passive_wait xs; |
396 map get_result xs); |
398 map get_result xs); |
397 |
399 |
398 end; |
400 end; |
399 |
401 |
400 fun join_result x = singleton join_results x; |
402 fun join_result x = singleton join_results x; |