(*  Title:      Pure/Concurrent/future.ML
    Author:     Makarius

Future values, see also
http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf

Notes:

  * Futures are similar to delayed evaluation, i.e. delay/force is
    generalized to fork/join (and variants).  The idea is to model
    parallel value-oriented computations, but *not* communicating
    processes.

  * Futures are grouped; failure of one group member causes the whole
    group to be interrupted eventually.  Groups are block-structured.

  * Forked futures are evaluated spontaneously by a farm of worker
    threads in the background; join resynchronizes the computation and
    delivers results (values or exceptions).

  * The pool of worker threads is limited, usually in correlation with
    the number of physical cores on the machine.  Note that allocation
    of runtime resources is distorted either if workers yield CPU time
    (e.g. via system sleep or wait operations), or if non-worker
    threads contend for significant runtime resources independently.

  * Promised futures are fulfilled by external means.  There is no
    associated evaluation task, but other futures can depend on them
    as usual.
*)

signature FUTURE =
sig
  val worker_task: unit -> Task_Queue.task option
  val worker_group: unit -> Task_Queue.group option
  val worker_subgroup: unit -> Task_Queue.group
  type 'a future
  val task_of: 'a future -> Task_Queue.task
  val peek: 'a future -> 'a Exn.result option
  val is_finished: 'a future -> bool
  val forks:
    {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
      (unit -> 'a) list -> 'a future list
  val fork_pri: int -> (unit -> 'a) -> 'a future
  val fork: (unit -> 'a) -> 'a future
  val join_results: 'a future list -> 'a Exn.result list
  val join_result: 'a future -> 'a Exn.result
  val join: 'a future -> 'a
  val value: 'a -> 'a future
  val map: ('a -> 'b) -> 'a future -> 'b future
  val promise_group: Task_Queue.group -> 'a future
  val promise: unit -> 'a future
  val fulfill_result: 'a future -> 'a Exn.result -> unit
  val fulfill: 'a future -> 'a -> unit
  val interruptible_task: ('a -> 'b) -> 'a -> 'b
  val cancel_group: Task_Queue.group -> unit
  val cancel: 'a future -> unit
  val shutdown: unit -> unit
  val status: (unit -> 'a) -> 'a
end;

structure Future: FUTURE =
struct

(** future values **)

(* identifiers *)

(*The task of the current thread is kept in thread-local data under a
  private tag.  worker_task yields NONE when nothing is stored there.
  setmp_worker_task evaluates f x with the tag set to SOME data via
  Library.setmp_thread_data (presumably restoring the previous value
  afterwards -- confirm against Library).*)
local
  val tag = Universal.tag () : Task_Queue.task option Universal.tag;
in
  fun worker_task () = the_default NONE (Thread.getLocal tag);
  fun setmp_worker_task data f x =
    Library.setmp_thread_data tag (worker_task ()) (SOME data) f x;
end;

(*group of the current worker task, if any*)
val worker_group = Option.map Task_Queue.group_of_task o worker_task;

(*new group created from the current worker group (NONE outside of a task)*)
fun worker_subgroup () = Task_Queue.new_group (worker_group ());

(*Evaluate e; when the current thread has a worker task, the evaluation is
  wrapped in Task_Queue.joining for that task (presumably for the timing
  information reported by execute -- confirm against Task_Queue).*)
fun worker_joining e =
  (case worker_task () of
    NONE => e ()
  | SOME task => Task_Queue.joining task e);

(*Same scheme as worker_joining, but wrapped in Task_Queue.waiting with the
  given dependencies.*)
fun worker_waiting deps e =
  (case worker_task () of
    NONE => e ()
  | SOME task => Task_Queue.waiting task deps e);


(* datatype future *)

(*single-assignment cell holding the eventual outcome (value or exception)*)
type 'a result = 'a Exn.result Single_Assignment.var;

(*promised = true only for futures made by promise_group; task identifies
  the entry in the task queue; result is the outcome cell*)
datatype 'a future = Future of
 {promised: bool,
  task: Task_Queue.task,
  result: 'a result};

fun task_of (Future {task, ...}) = task;
fun result_of (Future {result, ...}) = result;

(*current outcome, NONE while unassigned*)
fun peek x = Single_Assignment.peek (result_of x);
fun is_finished x = is_some (peek x);

(*Store res into the result cell and report success.

  If the assignment raises Fail and the cell already holds an interrupt
  exception, that interrupt is re-raised instead of the Fail; any other
  Fail is re-raised unchanged.

  Afterwards the stored outcome is inspected: an exception cancels the
  group (passing the exception to Task_Queue.cancel_group) and yields
  false, a regular result yields true.*)
fun assign_result group result res =
  let
    val _ = Single_Assignment.assign result res
      handle exn as Fail _ =>
        (case Single_Assignment.peek result of
          SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
        | _ => reraise exn);
    val ok =
      (case the (Single_Assignment.peek result) of
        Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
      | Exn.Result _ => true);
  in ok end;



(** scheduling **)

(* synchronization *)

(*condition variables, all used together with the single lock below*)
val scheduler_event = ConditionVar.conditionVar ();
val work_available = ConditionVar.conditionVar ();
val work_finished = ConditionVar.conditionVar ();

local
  val lock = Mutex.mutex ();
in

(*run a function via Simple_Thread.synchronized on the shared lock;
  name labels the critical region*)
fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;

(*wait on cond with the shared lock, without time limit*)
fun wait cond = (*requires SYNCHRONIZED*)
  Multithreading.sync_wait NONE NONE cond lock;

(*wait on cond with the shared lock; timeout is relative to the current time*)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;

fun signal cond = (*requires SYNCHRONIZED*)
  ConditionVar.signal cond;

fun broadcast cond = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast cond;

(*wake up everybody waiting for new work or for finished work*)
fun broadcast_work () = (*requires SYNCHRONIZED*)
 (ConditionVar.broadcast work_available;
  ConditionVar.broadcast work_finished);

end;


(* global state *)

(*All of these are plain unsynchronized references; the functions touching
  them are marked "requires SYNCHRONIZED" or wrap the access themselves.*)
val queue = Unsynchronized.ref Task_Queue.empty;
val next = Unsynchronized.ref 0;  (*counter used for worker thread names*)
val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
val canceled = Unsynchronized.ref ([]: Task_Queue.group list);  (*groups awaiting cancel_now*)
val do_shutdown = Unsynchronized.ref false;
val max_workers = Unsynchronized.ref 0;  (*bound on length of workers list*)
val max_active = Unsynchronized.ref 0;  (*bound on workers in state Working*)
val worker_trend = Unsynchronized.ref 0;  (*signed tick counter, see scheduler_next*)

(*Working: set after thread start and after each wait;
  Waiting: worker_wait with active = true (used by join_next);
  Sleeping: worker_wait with active = false (used by worker_next)*)
datatype worker_state = Working | Waiting | Sleeping;
val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);

(*number of registered workers that are currently in the given state*)
fun count_workers state = (*requires SYNCHRONIZED*)
  fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;


(* execute future jobs *)

(*Make a fresh result cell together with the job that fills it.

  The job takes a flag: with ok = true it evaluates e, capturing any
  exception as the outcome, under private interrupts and with the
  position thread data that was current when future_job was called;
  with ok = false the outcome is Exn.interrupt_exn without running e.
  The job returns the status of assign_result.*)
fun future_job group (e: unit -> 'a) =
  let
    val result = Single_Assignment.var "future" : 'a result;
    val pos = Position.thread_data ();
    fun job ok =
      let
        val res =
          if ok then
            Exn.capture (fn () =>
              Multithreading.with_attributes Multithreading.private_interrupts
                (fn _ => Position.setmp_thread_data pos e ())) ()
          else Exn.interrupt_exn;
      in assign_result group result res end;
  in (result, job) end;

(*Try to cancel the group against the current queue.  Callers treat a true
  result as "nothing more to do" and otherwise fall back to cancel_later.*)
fun cancel_now group = (*requires SYNCHRONIZED*)
  Task_Queue.cancel (! queue) group;

(*record the group for a later cancel_now attempt by the scheduler and wake it up*)
fun cancel_later group = (*requires SYNCHRONIZED*)
 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
  broadcast scheduler_event);

(*Run all jobs of a task on the current thread and finish the task.

  Every job is run (the fold evaluates the job before combining flags) and
  receives valid = false if the group was already canceled beforehand.
  After emitting a timing trace, the task is removed from the queue under
  the lock; a failed job leads to cancellation of the group, immediately
  if possible and via the scheduler otherwise.  Waiters on work_finished
  are always notified; work_available is signalled unless
  Task_Queue.finish reports the task as maximal.*)
fun execute (task, jobs) =
  let
    val group = Task_Queue.group_of_task task;
    val valid = not (Task_Queue.is_canceled group);
    val ok =
      Task_Queue.running task (fn () =>
        setmp_worker_task task (fn () =>
          fold (fn job => fn ok => job valid andalso ok) jobs true) ());
    val _ = Multithreading.tracing 1 (fn () =>
      let
        val s = Task_Queue.str_of_task task;
        fun micros time = string_of_int (Time.toNanoseconds time div 1000);
        val (run, wait, deps) = Task_Queue.timing_of_task task;
      in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
    val _ = SYNCHRONIZED "finish" (fn () =>
      let
        val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
        val _ =
          if ok then ()
          else if cancel_now group then ()
          else cancel_later group;
        val _ = broadcast work_finished;
        val _ = if maximal then () else signal work_available;
      in () end);
  in () end;


(* worker threads *)

(*Block the current worker on cond.  Its registered state becomes Waiting
  (active = true) or Sleeping (active = false) for the duration of the
  wait and Working again afterwards.  Raises Fail if the current thread is
  not in the workers list.*)
fun worker_wait active cond = (*requires SYNCHRONIZED*)
  let
    val state =
      (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
        SOME state => state
      | NONE => raise Fail "Unregistered worker thread");
    val _ = state := (if active then Waiting else Sleeping);
    val _ = wait cond;
    val _ = state := Working;
  in () end;

(*Determine the next piece of work for the current worker.

  - more workers than max_workers: the worker unregisters itself, passes
    the signal on and gets NONE, which terminates worker_loop;
  - more Working workers than max_active: sleep until work_available and
    retry;
  - otherwise dequeue from the task queue, sleeping and retrying while
    nothing is available; on success another worker is signalled.*)
fun worker_next () = (*requires SYNCHRONIZED*)
  if length (! workers) > ! max_workers then
    (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
     signal work_available;
     NONE)
  else if count_workers Working > ! max_active then
    (worker_wait false work_available; worker_next ())
  else
    (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
      NONE => (worker_wait false work_available; worker_next ())
    | some => (signal work_available; some));

(*main loop of a worker thread: fetch work under the lock, execute it
  outside of the lock, repeat until worker_next yields NONE*)
fun worker_loop name =
  (case SYNCHRONIZED name (fn () => worker_next ()) of
    NONE => ()
  | SOME work => (execute work; worker_loop name));

(*fork a new worker thread and register it with initial state Working*)
fun worker_start name = (*requires SYNCHRONIZED*)
  Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
    Unsynchronized.ref Working));


(* scheduler *)

(*status is traced when this counter wraps to 0, i.e. every 10th tick*)
val status_ticks = Unsynchronized.ref 0;

(*a tick happens when at least next_round has elapsed since last_round*)
val last_round = Unsynchronized.ref Time.zeroTime;
val next_round = seconds 0.05;

(*One round of the scheduler; the result tells scheduler_loop whether to
  continue.  An interrupt raised within the round cancels all groups of
  the queue (deferred via cancel_later), wakes up all waiters and keeps
  the scheduler running; other exceptions are re-raised.*)
fun scheduler_next () = (*requires SYNCHRONIZED*)
  let
    val now = Time.now ();
    val tick = Time.<= (Time.+ (! last_round, next_round), now);
    val _ = if tick then last_round := now else ();


    (* queue and worker status *)

    val _ =
      if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
    val _ =
      if tick andalso ! status_ticks = 0 then
        Multithreading.tracing 1 (fn () =>
          let
            val {ready, pending, running, passive} = Task_Queue.status (! queue);
            val total = length (! workers);
            val active = count_workers Working;
            val waiting = count_workers Waiting;
          in
            "SCHEDULE " ^ Time.toString now ^ ": " ^
              string_of_int ready ^ " ready, " ^
              string_of_int pending ^ " pending, " ^
              string_of_int running ^ " running, " ^
              string_of_int passive ^ " passive; " ^
              string_of_int total ^ " workers, " ^
              string_of_int active ^ " active, " ^
              string_of_int waiting ^ " waiting "
          end)
      else ();

    (*drop worker threads that are no longer active from the registry*)
    val _ =
      if forall (Thread.isActive o #1) (! workers) then ()
      else
        let
          val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
          val _ = workers := alive;
        in
          Multithreading.tracing 0 (fn () =>
            "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
        end;


    (* worker pool adjustments *)

    val max_active0 = ! max_active;
    val max_workers0 = ! max_workers;

    val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
    val _ = max_active := m;

    (*mm is the desired pool size: Working + 2 * Waiting workers, but at
      least m and at most 4 * m; 0 on shutdown.
      NOTE(review): m = 9999 is special-cased to a single worker; the
      meaning of that value is not visible in this file.*)
    val mm =
      if ! do_shutdown then 0
      else if m = 9999 then 1
      else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
    (*worker_trend counts consecutive ticks in which mm was above
      (positive) or below (negative) max_workers; a change of direction
      restarts from 0*)
    val _ =
      if tick andalso mm > ! max_workers then
        Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
      else if tick andalso mm < ! max_workers then
        Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
      else ();
    (*adopt mm at once for shutdown or after a long trend in either
      direction; after a short upward trend grow to at most 2 * m*)
    val _ =
      if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
        max_workers := mm
      else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
        max_workers := Int.min (mm, 2 * m)
      else ();

    val missing = ! max_workers - length (! workers);
    val _ =
      if missing > 0 then
        funpow missing (fn () =>
          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
      else ();

    (*let a worker re-examine the limits whenever they have changed*)
    val _ =
      if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
      else signal work_available;


    (* canceled groups *)

    (*retry cancel_now for deferred groups, keeping those that still fail*)
    val _ =
      if null (! canceled) then ()
      else
       (Multithreading.tracing 1 (fn () =>
          string_of_int (length (! canceled)) ^ " canceled groups");
        Unsynchronized.change canceled (filter_out cancel_now);
        broadcast_work ());


    (* delay loop *)

    val _ = Exn.release (wait_timeout next_round scheduler_event);


    (* shutdown *)

    (*The scheduler stops once shutdown is requested and no workers
      remain; shutdown is requested here when Task_Queue.all_passive
      holds for the queue.*)
    val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
    val continue = not (! do_shutdown andalso null (! workers));
    val _ = if continue then () else scheduler := NONE;

    val _ = broadcast scheduler_event;
  in continue end
  handle exn =>
    if Exn.is_interrupt exn then
     (Multithreading.tracing 1 (fn () => "Interrupt");
      List.app cancel_later (Task_Queue.cancel_all (! queue));
      broadcast_work (); true)
    else reraise exn;

(*body of the scheduler thread: repeat scheduler_next under the lock, with
  the given interrupt attributes, until it returns false*)
fun scheduler_loop () =
  while
    Multithreading.with_attributes
      (Multithreading.sync_interrupts Multithreading.public_interrupts)
      (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
  do ();

fun scheduler_active () = (*requires SYNCHRONIZED*)
  (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);

(*withdraw any pending shutdown request and make sure a scheduler thread
  is running*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
 (do_shutdown := false;
  if scheduler_active () then ()
  else scheduler := SOME (Simple_Thread.fork false scheduler_loop));



(** futures **)

(* fork *)

(*Enqueue one task per element of es, all with the same name, group,
  dependencies and priority, and return the corresponding futures in
  order.  Without an explicit group a new subgroup of the current worker
  group is used.  An empty list yields [] without touching the queue or
  the scheduler.  A worker is signalled if Task_Queue.enqueue reported
  minimal for at least one task.*)
fun forks {name, group, deps, pri} es =
  if null es then []
  else
    let
      val grp =
        (case group of
          NONE => worker_subgroup ()
        | SOME grp => grp);
      fun enqueue e (minimal, queue) =
        let
          val (result, job) = future_job grp e;
          val ((task, minimal'), queue') = Task_Queue.enqueue name grp deps pri job queue;
          val future = Future {promised = false, task = task, result = result};
        in (future, (minimal orelse minimal', queue')) end;
    in
      SYNCHRONIZED "enqueue" (fn () =>
        let
          val (futures, minimal) =
            Unsynchronized.change_result queue (fn q =>
              let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
              in ((futures, minimal), q') end);
          val _ = if minimal then signal work_available else ();
          val _ = scheduler_check ();
        in futures end)
    end;

(*single fork with given priority: empty name, default group, no dependencies*)
fun fork_pri pri e = singleton (forks {name = "", group = NONE, deps = [], pri = pri}) e;
fun fork e = fork_pri 0 e;


(* join *)

local

(*Outcome of a future that is expected to be finished.  An interrupt
  outcome is replaced by the exceptions collected from the status of the
  future's group, if there are any.*)
fun get_result x =
  (case peek x of
    NONE => Exn.Exn (Fail "Unfinished future")
  | SOME res =>
      if Exn.is_interrupt_exn res then
        (case Exn.flatten_list (Task_Queue.group_status (Task_Queue.group_of_task (task_of x))) of
          [] => res
        | exns => Exn.Exn (Exn.EXCEPTIONS exns))
      else res);

(*Find work among the dependencies: NONE when all dependencies are
  finished, otherwise a dequeued piece of work with the updated
  dependencies.  While nothing can be dequeued, the worker waits (state
  Waiting) on work_finished and retries.*)
fun join_next deps = (*requires SYNCHRONIZED*)
  if Task_Queue.finished_deps deps then NONE
  else
    (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
      (NONE, deps') =>
        if Task_Queue.finished_deps deps' then NONE
        else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
    | (SOME work, deps') => SOME (work, deps'));

(*a worker joins by executing outstanding dependencies itself, outside of
  the lock, until join_next yields NONE*)
fun execute_work NONE = ()
  | execute_work (SOME (work, deps')) = (worker_joining (fn () => execute work); join_work deps')
and join_work deps =
  execute_work (SYNCHRONIZED "join" (fn () => join_next deps));

in

(*Wait for all given futures and return their outcomes in order.

  Nothing is awaited if all futures are already finished.  Joining
  unfinished futures within a critical section is an error.  A worker
  thread helps executing the dependencies via join_work; any other thread
  blocks on the result cells.*)
fun join_results xs =
  let
    val _ =
      if forall is_finished xs then ()
      else if Multithreading.self_critical () then
        error "Cannot join future values within critical section"
      else if is_some (worker_task ()) then
        join_work (Task_Queue.init_deps (map task_of xs))
      else List.app (ignore o Single_Assignment.await o result_of) xs;
  in map get_result xs end;

end;

fun join_result x = singleton join_results x;

(*join a single future: its value is returned, its exception is raised*)
fun join x = Exn.release (join_result x);


(* fast-path versions -- bypassing full task management *)

(*finished future holding x, attached to a dummy task that is never enqueued*)
fun value (x: 'a) =
  let
    val task = Task_Queue.dummy_task ();
    val group = Task_Queue.group_of_task task;
    val result = Single_Assignment.var "value" : 'a result;
    val _ = assign_result group result (Exn.Result x);
  in Future {promised = false, task = task, result = result} end;

(*Apply f to the eventual value of x.  First Task_Queue.extend is tried,
  adding the job to the existing task of x; the resulting future then
  shares that task.  If the queue does not accept the extension, a
  separate future is forked that depends on the task of x and has the
  same priority.*)
fun map_future f x =
  let
    val task = task_of x;
    val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
    val (result, job) = future_job group (fn () => f (join x));

    val extended = SYNCHRONIZED "extend" (fn () =>
      (case Task_Queue.extend task job (! queue) of
        SOME queue' => (queue := queue'; true)
      | NONE => false));
  in
    if extended then Future {promised = false, task = task, result = result}
    else
      singleton
        (forks {name = "Future.map", group = SOME group,
          deps = [task], pri = Task_Queue.pri_of_task task})
        (fn () => f (join x))
  end;


(* promised futures -- fulfilled by external means *)

(*New promised future in the given group, registered as passive task.
  The abort function handed to the queue assigns an interrupt outcome:
  Fail from the assignment counts as success (true), whereas an interrupt
  raised by it is reported as Fail "Concurrent attempt to fulfill
  promise".*)
fun promise_group group : 'a future =
  let
    val result = Single_Assignment.var "promise" : 'a result;
    fun abort () = assign_result group result Exn.interrupt_exn
      handle Fail _ => true
        | exn =>
            if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
            else reraise exn;
    val task = SYNCHRONIZED "enqueue_passive" (fn () =>
      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
  in Future {promised = true, task = task, result = result} end;

fun promise () = promise_group (worker_subgroup ());

(*Fulfill a promised future with the given outcome; raises Fail for a
  future that was not made by promise_group.  With interrupts disabled,
  the passive task is dequeued under the lock and, if it was still
  passive, executed with a job that assigns res (or an interrupt outcome
  when the group is already canceled).  Finally the result cell is
  awaited, so the function returns only after some outcome is present.*)
fun fulfill_result (Future {promised, task, result}) res =
  if not promised then raise Fail "Not a promised future"
  else
    let
      val group = Task_Queue.group_of_task task;
      fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
      val _ =
        Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
          let
            val still_passive =
              SYNCHRONIZED "fulfill_result" (fn () =>
                Unsynchronized.change_result queue
                  (Task_Queue.dequeue_passive (Thread.self ()) task));
          in if still_passive then execute (task, [job]) else () end);
      val _ =
        worker_waiting (Task_Queue.init_deps [task])
          (fn () => Single_Assignment.await result);
    in () end;

fun fulfill x res = fulfill_result x (Exn.Result res);


(* cancellation *)

(*Evaluate f x with interrupts enabled: private interrupts on a thread
  with a worker task, public interrupts otherwise.  Without multithreading
  support this falls back to interruptible.*)
fun interruptible_task f x =
  if Multithreading.available then
    Multithreading.with_attributes
      (if is_some (worker_task ())
       then Multithreading.private_interrupts
       else Multithreading.public_interrupts)
      (fn _ => f x)
  else interruptible f x;

(*cancel: present and future group members will be interrupted eventually*)
fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
 (if cancel_now group then () else cancel_later group;
  signal work_available; scheduler_check ()));

fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));


(* shutdown *)

(*Block until the scheduler thread is no longer active, waking up all
  waiters after each scheduler event; no-op without multithreading
  support.*)
fun shutdown () =
  if Multithreading.available then
    SYNCHRONIZED "shutdown" (fn () =>
     while scheduler_active () do
      (wait scheduler_event; broadcast_work ()))
  else ();


(* status markup *)

(*Evaluate e, reporting forked markup before and joined markup after it;
  the markup carries the current worker task as property, if any.*)
fun status e =
  let
    val task_props =
      (case worker_task () of
        NONE => I
      | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
    val _ = Output.status (Markup.markup (task_props Markup.forked) "");
    val x = e ();  (*sic -- report "joined" only for success*)
    val _ = Output.status (Markup.markup (task_props Markup.joined) "");
  in x end;


(*final declarations of this structure!*)
val map = map_future;

end;

type 'a future = 'a Future.future;