(*  Title:      Pure/Concurrent/future.ML
    Author:     Makarius

Value-oriented parallelism via futures and promises.  See also
http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf

Notes:

  * Futures are similar to delayed evaluation, i.e. delay/force is
    generalized to fork/join.  The idea is to model parallel
    value-oriented computations (not communicating processes).

  * Forked futures are evaluated spontaneously by a farm of worker
    threads in the background; join resynchronizes the computation and
    delivers results (values or exceptions).

  * The pool of worker threads is limited, usually in correlation with
    the number of physical cores on the machine.  Note that allocation
    of runtime resources may be distorted either if workers yield CPU
    time (e.g. via system sleep or wait operations), or if non-worker
    threads contend for significant runtime resources independently.
    There is a limited number of replacement worker threads that get
    activated in certain explicit wait conditions.

  * Future tasks are organized in groups, which are block-structured.
    When forking a new task, the default is to open an individual
    subgroup, unless some common group is specified explicitly.
    Failure of one group member causes peer and subgroup members to be
    interrupted eventually.  Interrupted tasks that lack regular
    result information will pick up parallel exceptions from the
    cumulative group context (as Par_Exn).

  * Future task groups may be canceled: present and future group
    members will be interrupted eventually.

  * Promised "passive" futures are fulfilled by external means.  There
    is no associated evaluation task, but other futures can depend on
    them via regular join operations.
*)

signature FUTURE =
sig
  type task = Task_Queue.task
  type group = Task_Queue.group
  val new_group: group option -> group
  val worker_task: unit -> task option
  val worker_group: unit -> group option
  val the_worker_group: unit -> group
  val worker_subgroup: unit -> group
  val worker_context: string -> group -> ('a -> 'b) -> 'a -> 'b
  type 'a future
  val task_of: 'a future -> task
  val peek: 'a future -> 'a Exn.result option
  val is_finished: 'a future -> bool
  val ML_statistics: bool Unsynchronized.ref
  val interruptible_task: ('a -> 'b) -> 'a -> 'b
  val cancel_group: group -> unit
  val cancel: 'a future -> unit
  val error_msg: Position.T -> (serial * string) * string option -> unit
  val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
  type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
  val default_params: params
  val forks: params -> (unit -> 'a) list -> 'a future list
  val fork: (unit -> 'a) -> 'a future
  val join_results: 'a future list -> 'a Exn.result list
  val join_result: 'a future -> 'a Exn.result
  val joins: 'a future list -> 'a list
  val join: 'a future -> 'a
  val join_tasks: task list -> unit
  val value_result: 'a Exn.result -> 'a future
  val value: 'a -> 'a future
  val cond_forks: params -> (unit -> 'a) list -> 'a future list
  val map: ('a -> 'b) -> 'a future -> 'b future
  val promise_group: group -> (unit -> unit) -> 'a future
  val promise: (unit -> unit) -> 'a future
  val fulfill_result: 'a future -> 'a Exn.result -> unit
  val fulfill: 'a future -> 'a -> unit
  val group_snapshot: group -> task list
  val terminate: group -> unit
  val shutdown: unit -> unit
end;

structure Future: FUTURE =
struct

(** future values **)

type task = Task_Queue.task;
type group = Task_Queue.group;
val new_group = Task_Queue.new_group;


(* identifiers *)

(*the task of the current thread is kept in thread-local data under a private tag*)
local
  val tag = Universal.tag () : task option Universal.tag;
in
  fun worker_task () =
    (case Thread.getLocal tag of
      SOME current => current
    | NONE => NONE);
  fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
end;

fun worker_group () = Option.map Task_Queue.group_of_task (worker_task ());

(*like worker_group, but a missing task context is an error*)
fun the_worker_group () =
  (case worker_group () of
    NONE => raise Fail "Missing worker thread context"
  | SOME group => group);

fun worker_subgroup () = new_group (worker_group ());

(*run f x with a fresh task of the given group installed as worker task*)
fun worker_context name group f x =
  setmp_worker_task (Task_Queue.new_task group name NONE) f x;

(*evaluate e, registered as "joining" for the current task (if any)*)
fun worker_joining e =
  (case worker_task () of
    SOME task => Task_Queue.joining task e
  | NONE => e ());

(*evaluate e, registered as "waiting" on deps for the current task (if any)*)
fun worker_waiting deps e =
  (case worker_task () of
    SOME task => Task_Queue.waiting task deps e
  | NONE => e ());


(* datatype future *)

type 'a result = 'a Exn.result Single_Assignment.var;

datatype 'a future = Future of
 {promised: bool,
  task: task,
  result: 'a result};

fun task_of (Future {task, ...}) = task;
fun result_of (Future {result, ...}) = result;

fun peek (Future {result, ...}) = Single_Assignment.peek result;

fun is_finished x =
  (case peek x of
    NONE => false
  | SOME _ => true);



(** scheduling **)

(* synchronization *)

val scheduler_event = ConditionVar.conditionVar ();
val work_available = ConditionVar.conditionVar ();
val work_finished = ConditionVar.conditionVar ();

(*all operations below share one private mutex*)
local
  val lock = Mutex.mutex ();
in

fun SYNCHRONIZED name e = Simple_Thread.synchronized name lock e;

fun wait cond = (*requires SYNCHRONIZED*)
  Multithreading.sync_wait NONE NONE cond lock;

fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  let val deadline = Time.+ (Time.now (), timeout)
  in Multithreading.sync_wait NONE (SOME deadline) cond lock end;

fun signal cond = (*requires SYNCHRONIZED*)
  ConditionVar.signal cond;

fun broadcast cond = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast cond;

end;


(* global state *)

val queue = Unsynchronized.ref Task_Queue.empty;
val next = Unsynchronized.ref 0;  (*counter for worker thread names*)
val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
val canceled = Unsynchronized.ref ([]: group list);  (*groups awaiting deferred cancellation*)
val do_shutdown = Unsynchronized.ref false;
val max_workers = Unsynchronized.ref 0;
val max_active = Unsynchronized.ref 0;
val worker_trend = Unsynchronized.ref 0;

val status_ticks = Unsynchronized.ref 0;
val last_round = Unsynchronized.ref Time.zeroTime;
val next_round = seconds 0.05;

(*Waiting vs. Sleeping: see worker_wait, which picks one according to its "active" flag*)
datatype worker_state = Working | Waiting | Sleeping;
val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);

(*number of registered workers that are currently in the given state*)
fun count_workers state = (*requires SYNCHRONIZED*)
  let
    fun count (_, state_ref) n = if ! state_ref = state then n + 1 else n;
  in fold count (! workers) 0 end;



(* status *)

val ML_statistics = Unsynchronized.ref false;

(*emit task/worker counters as protocol message, provided that ML_statistics is enabled*)
fun report_status () = (*requires SYNCHRONIZED*)
  if not (! ML_statistics) then ()
  else
    let
      val {ready, pending, running, passive} = Task_Queue.status (! queue);
      val total = length (! workers);
      val active = count_workers Working;
      val waiting = count_workers Waiting;
      val local_stats =
       [("now", Markup.print_real (Time.toReal (Time.now ()))),
        ("tasks_ready", Markup.print_int ready),
        ("tasks_pending", Markup.print_int pending),
        ("tasks_running", Markup.print_int running),
        ("tasks_passive", Markup.print_int passive),
        ("workers_total", Markup.print_int total),
        ("workers_active", Markup.print_int active),
        ("workers_waiting", Markup.print_int waiting)];
      val stats = local_stats @ ML_Statistics.get ();
    in Output.try_protocol_message (Markup.ML_statistics :: stats) "" end;


(* cancellation primitives *)

(*cancel group within the queue and interrupt its running threads, except for
  the calling thread itself; the result is the list of running threads*)
fun cancel_now group = (*requires SYNCHRONIZED*)
  let
    fun interrupt_other thread =
      if Simple_Thread.is_self thread then ()
      else Simple_Thread.interrupt_unsynchronized thread;
    val running = Task_Queue.cancel (! queue) group;
    val _ = List.app interrupt_other running;
  in running end;

(*cancel everything within the queue and interrupt all reported threads*)
fun cancel_all () = (*requires SYNCHRONIZED*)
  let
    val (groups, threads) = Task_Queue.cancel_all (! queue);
  in List.app Simple_Thread.interrupt_unsynchronized threads; groups end;

(*register group for deferred cancellation by the scheduler*)
fun cancel_later group = (*requires SYNCHRONIZED*)
  let
    val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group);
  in broadcast scheduler_event end;

(*evaluate f x with interrupts enabled: private interrupts within a worker
  task, public interrupts otherwise; finally test for pending interrupts*)
fun interruptible_task f x =
  let
    val result =
      if Multithreading.available then
        let
          val atts =
            if is_some (worker_task ()) then Multithreading.private_interrupts
            else Multithreading.public_interrupts;
        in Multithreading.with_attributes atts (fn _ => f x) end
      else interruptible f x;
    val _ = Multithreading.interrupted ();
  in result end;


(* worker threads *)

(*run all jobs of the task, then mark it as finished within the queue*)
fun worker_exec (task, jobs) =
  let
    val group = Task_Queue.group_of_task task;
    val valid = not (Task_Queue.is_canceled group);

    (*every job is run, even after an earlier one has failed: no short-circuit here*)
    fun run_job job ok = job valid andalso ok;
    fun run_jobs () = fold run_job jobs true;
    val ok = Task_Queue.running task (fn () => setmp_worker_task task run_jobs ());

    val _ =
      if ! Multithreading.trace >= 2 then
        Output.try_protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) ""
      else ();

    fun finish () =
      let
        val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
        val test = Exn.capture Multithreading.interrupted ();
        (*failure or interrupt: cancel the group, deferred if threads are still running*)
        val _ =
          if ok andalso not (Exn.is_interrupt_exn test) orelse null (cancel_now group) then ()
          else cancel_later group;
        val _ = broadcast work_finished;
      in if maximal then () else signal work_available end;
  in SYNCHRONIZED "finish" finish end;

(*wait on cond, tracking the state of the current thread if it is a registered worker*)
fun worker_wait active cond = (*requires SYNCHRONIZED*)
  (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
    NONE => ignore (wait cond)
  | SOME state =>
      let
        val _ = state := (if active then Waiting else Sleeping);
        val _ = wait cond;
      in state := Working end);

(*next piece of work for the current worker; NONE means that the worker retires*)
fun worker_next () = (*requires SYNCHRONIZED*)
  let
    fun retry () = (worker_wait false work_available; worker_next ());
  in
    if length (! workers) > ! max_workers then
      (*excess worker: unregister and pass the signal on*)
      (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
       signal work_available;
       NONE)
    else if count_workers Working > ! max_active then retry ()
    else
      (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
        NONE => retry ()
      | some => (signal work_available; some))
  end;

fun worker_loop name =
  (case SYNCHRONIZED name worker_next of
    SOME work => (worker_exec work; worker_loop name)
  | NONE => ());

(*fork a new worker thread and register it in state Working*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val thread = Simple_Thread.fork false (fn () => worker_loop name);
    val state = Unsynchronized.ref Working;
  in Unsynchronized.change workers (cons (thread, state)) end;


(* scheduler *)

(*one round of the scheduler; the result tells whether the loop continues*)
fun scheduler_next () = (*requires SYNCHRONIZED*)
  let
    val now = Time.now ();
    val tick = Time.<= (Time.+ (! last_round, next_round), now);


    (* runtime status *)

    val _ =
      if tick then
        let
          val _ = last_round := now;
          val _ = Unsynchronized.change status_ticks (fn i => i + 1);
          val period = if ! Multithreading.trace >= 1 then 2 else 10;
        in if ! status_ticks mod period = 0 then report_status () else () end
      else ();

    fun is_alive (thread, _) = Thread.isActive thread;
    val _ =
      if forall is_alive (! workers) then ()
      else
        let
          val (alive, dead) = List.partition is_alive (! workers);
          val _ = workers := alive;
        in
          Multithreading.tracing 0 (fn () =>
            "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
        end;


    (* worker pool adjustments *)

    val max_active0 = ! max_active;
    val max_workers0 = ! max_workers;

    val limit = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
    val _ = max_active := limit;

    val demand =
      if ! do_shutdown then 0
      else
        Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, limit), 4 * limit);

    (*the trend counts consecutive ticks with demand above (positive) or below
      (negative) the current pool size; a change of direction restarts at 0*)
    val _ =
      if tick then
        (case Int.compare (demand, ! max_workers) of
          GREATER => Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
        | LESS => Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
        | EQUAL => ())
      else ();

    val trend = ! worker_trend;
    val _ =
      if demand = 0 orelse trend > 50 orelse trend < ~50 then
        max_workers := demand
      else if (trend > 5 andalso ! max_workers < 2 * limit) orelse ! max_workers = 0 then
        max_workers := Int.min (demand, 2 * limit)
      else ();

    val missing = ! max_workers - length (! workers);
    fun start_worker () =
      ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)));
    val _ = if missing > 0 then funpow missing start_worker () else ();

    val unchanged = ! max_active = max_active0 andalso ! max_workers = max_workers0;
    val _ = if unchanged then () else signal work_available;


    (* canceled groups *)

    (*groups without running threads are dropped from the list*)
    fun settled group = null (cancel_now group);
    val _ =
      if null (! canceled) then ()
      else
       (Multithreading.tracing 1 (fn () =>
          string_of_int (length (! canceled)) ^ " canceled groups");
        Unsynchronized.change canceled (filter_out settled);
        signal work_available);


    (* delay loop *)

    val _ = Exn.release (wait_timeout next_round scheduler_event);


    (* shutdown *)

    val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
    val continue = not (! do_shutdown andalso null (! workers));
    val _ = if continue then () else (report_status (); scheduler := NONE);

    val _ = broadcast scheduler_event;
  in continue end
  handle exn =>
    if not (Exn.is_interrupt exn) then reraise exn
    else
     (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
      List.app cancel_later (cancel_all ());
      signal work_available; true);

fun scheduler_loop () =
  let
    fun round () =
      Multithreading.with_attributes
        (Multithreading.sync_interrupts Multithreading.public_interrupts)
        (fn _ => SYNCHRONIZED "scheduler" scheduler_next);
  in
    while round () do ();
    last_round := Time.zeroTime
  end;

fun scheduler_active () = (*requires SYNCHRONIZED*)
  (case ! scheduler of
    SOME thread => Thread.isActive thread
  | NONE => false);

(*revoke pending shutdown and make sure that some scheduler thread is running*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
  in
    if scheduler_active () then ()
    else scheduler := SOME (Simple_Thread.fork false scheduler_loop)
  end;



(** futures **)

(* cancel *)

fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
 (if null (cancel_now group) then () else cancel_later group;
  signal work_available;
  scheduler_check ());

fun cancel_group group =
  SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);

fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));


(* results *)

(*print error message in the context of pos, unless both pos and the message
  carry an id and these ids differ*)
fun error_msg pos ((serial, msg), exec_id) =
  Position.setmp_thread_data pos (fn () =>
    let
      fun emit () = Output.error_msg' (serial, msg);
    in
      (case (Position.get_id pos, exec_id) of
        (SOME id, SOME id') => if id = id' then emit () else ()
      | _ => emit ())
    end) ();

(*attach the id of pos (if any) to a failed result; regular results are unchanged*)
fun identify_result pos (Exn.Exn exn) =
      let
        val exec_id =
          (case Position.get_id pos of
            SOME id => [(Markup.exec_idN, id)]
          | NONE => []);
      in Exn.Exn (Par_Exn.identify exec_id exn) end
  | identify_result _ res = res;

(*assign res to the result variable; the outcome is true for a regular result,
  while a failed one cancels the group (with the exception) and yields false*)
fun assign_result group result res =
  let
    val _ = Single_Assignment.assign result res
      handle exn as Fail _ =>
        (case Single_Assignment.peek result of
          SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
        | _ => reraise exn);
  in
    (case the (Single_Assignment.peek result) of
      Exn.Res _ => true
    | Exn.Exn exn =>
        (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false))
  end;


(* future jobs *)

(*result variable together with the job that evaluates e and assigns it;
  the job produces an interrupt result when invoked with ok = false*)
fun future_job group interrupts (e: unit -> 'a) =
  let
    val result = Single_Assignment.var "future" : 'a result;
    val pos = Position.thread_data ();
    val atts =
      if interrupts then Multithreading.private_interrupts
      else Multithreading.no_interrupts;
    fun eval () =
      Multithreading.with_attributes atts (fn _ => Position.setmp_thread_data pos e ());
    fun job ok =
      let
        val res = if ok then Exn.capture eval () else Exn.interrupt_exn;
      in assign_result group result (identify_result pos res) end;
  in (result, job) end;


(* fork *)

type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};

(*enqueue one future per expression; without explicit group, a fresh subgroup
  of the current worker group is used*)
fun forks (_: params) [] = []
  | forks ({name, group, deps, pri, interrupts}: params) es =
      let
        val the_group =
          (case group of
            SOME grp => grp
          | NONE => worker_subgroup ());
        fun enqueue e q =
          let
            val (result, job) = future_job the_group interrupts e;
            val (task, q') = Task_Queue.enqueue name the_group deps pri job q;
          in (Future {promised = false, task = task, result = result}, q') end;
        fun enqueue_all () =
          let
            val (futures, queue') = fold_map enqueue es (! queue);
            val _ = queue := queue';
            (*wake up a worker only if no dependency is known to the queue*)
            val minimal = forall (not o Task_Queue.known_task queue') deps;
            val _ = if minimal then signal work_available else ();
            val _ = scheduler_check ();
          in futures end;
      in SYNCHRONIZED "enqueue" enqueue_all end;

fun fork e =
  singleton (forks {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true}) e;


(* join *)

(*result of a future; an interrupt is replaced by the exceptions recorded for
  the group, if there are any*)
fun get_result x =
  (case peek x of
    SOME res =>
      if not (Exn.is_interrupt_exn res) then res
      else
        (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
          [] => res
        | exns => Exn.Exn (Par_Exn.make exns))
  | NONE => Exn.Exn (Fail "Unfinished future"));

local

fun join_next [] = NONE (*requires SYNCHRONIZED*)
  | join_next deps =
      (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
        (SOME work, deps') => SOME (work, deps')
      | (NONE, []) => NONE
      | (NONE, deps') =>
          (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps'));

fun execute_work (SOME (work, deps')) =
      (worker_joining (fn () => worker_exec work); join_work deps')
  | execute_work NONE = ()
and join_work deps =
  Multithreading.with_attributes Multithreading.no_interrupts
    (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));

in

(*worker threads help to execute pending dependencies, other threads merely block*)
fun join_results xs =
  let
    fun await_all () = List.app (ignore o Single_Assignment.await o result_of) xs;
    val _ =
      if forall is_finished xs then ()
      else if Multithreading.self_critical () then
        raise Fail "Cannot join future values within critical section"
      else
        (case worker_task () of
          SOME _ => join_work (map task_of xs)
        | NONE => await_all ());
  in map get_result xs end;

end;

fun join_result x = singleton join_results x;
fun joins xs = Par_Exn.release_all (join_results xs);
fun join x = Exn.release (join_result x);

(*join on a trivial future that depends on the given tasks*)
fun join_tasks tasks =
  if null tasks then ()
  else
    join
      (singleton
        (forks
          {name = "join_tasks", group = SOME (new_group NONE),
            deps = tasks, pri = 0, interrupts = false}) I);


(* fast-path operations -- bypass task queue if possible *)

fun value_result (res: 'a Exn.result) =
  let
    val task = Task_Queue.dummy_task;
    val group = Task_Queue.group_of_task task;
    val result = Single_Assignment.var "value" : 'a result;
    val pos = Position.thread_data ();
    val _ = assign_result group result (identify_result pos res);
  in Future {promised = false, task = task, result = result} end;

fun value x = value_result (Exn.Res x);

(*proper forks if multithreading is enabled, immediate evaluation otherwise*)
fun cond_forks args es =
  if Multithreading.enabled () then forks args es
  else map (fn e => value_result (Exn.interruptible_capture e ())) es;

(*finished futures are mapped immediately; otherwise the task of x is extended
  by another job, or a dependent future is forked if that is not possible*)
fun map_future f x =
  if is_finished x then value_result (Exn.interruptible_capture (fn y => f (join y)) x)
  else
    let
      val task = task_of x;
      val group = Task_Queue.group_of_task task;
      fun body () = f (join x);
      val (result, job) = future_job group true body;

      val extended = SYNCHRONIZED "extend" (fn () =>
        (case Task_Queue.extend task job (! queue) of
          NONE => false
        | SOME queue' => (queue := queue'; true)));
    in
      if not extended then
        singleton
          (cond_forks
            {name = "map_future", group = SOME group, deps = [task],
              pri = Task_Queue.pri_of_task task, interrupts = true}) body
      else Future {promised = false, task = task, result = result}
    end;


(* promised futures -- fulfilled by external means *)

(*passive future of the given group; its job assigns an interrupt result and
  then runs abort*)
fun promise_group group abort : 'a future =
  let
    val result = Single_Assignment.var "promise" : 'a result;
    fun assign () = assign_result group result Exn.interrupt_exn
      handle Fail _ => true
        | exn =>
            if Exn.is_interrupt exn
            then raise Fail "Concurrent attempt to fulfill promise"
            else reraise exn;
    fun job () =
      Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
        let
          (*abort is run even if the assignment has failed*)
          val res = Exn.capture assign ();
          val _ = abort ();
        in Exn.release res end);
    val task = SYNCHRONIZED "enqueue_passive" (fn () =>
      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
  in Future {promised = true, task = task, result = result} end;

fun promise abort = promise_group (worker_subgroup ()) abort;

fun fulfill_result (Future {promised, task, result}) res =
  if promised then
    let
      val group = Task_Queue.group_of_task task;
      val pos = Position.thread_data ();
      fun job ok =
        assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);
      fun dequeue () =
        Unsynchronized.change_result queue (Task_Queue.dequeue_passive (Thread.self ()) task);
      val _ =
        Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
          (case SYNCHRONIZED "fulfill_result" dequeue of
            SOME true => worker_exec (task, [job])
          | SOME false => ()
          | NONE => ignore (job (not (Task_Queue.is_canceled group)))));
      (*the result might still be pending, so wait for its assignment*)
      val _ =
        if is_some (Single_Assignment.peek result) then ()
        else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
    in () end
  else raise Fail "Not a promised future";

fun fulfill x res = fulfill_result x (Exn.Res res);


(* group snapshot *)

fun group_snapshot group =
  SYNCHRONIZED "group_snapshot" (fn () =>
    Task_Queue.group_tasks (! queue) group);


(* terminate *)

(*cancel the group, then join on the tasks it had at that point*)
fun terminate group =
  let
    val tasks =
      SYNCHRONIZED "terminate" (fn () =>
        (cancel_group_unsynchronized group; Task_Queue.group_tasks (! queue) group));
  in join_tasks tasks end;


(* shutdown *)

(*wait until the scheduler thread has ceased to be active*)
fun shutdown () =
  if Multithreading.available then
    if is_some (worker_task ()) then
      raise Fail "Cannot shutdown while running as worker thread"
    else
      SYNCHRONIZED "shutdown" (fn () =>
        while scheduler_active () do
         (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
          wait scheduler_event))
  else ();


(*final declarations of this structure!*)
val map = map_future;

end;

type 'a future = 'a Future.future;