equal
deleted
inserted
replaced
46 val new_group: group option -> group |
46 val new_group: group option -> group |
47 val worker_task: unit -> task option |
47 val worker_task: unit -> task option |
48 val worker_group: unit -> group option |
48 val worker_group: unit -> group option |
49 val the_worker_group: unit -> group |
49 val the_worker_group: unit -> group |
50 val worker_subgroup: unit -> group |
50 val worker_subgroup: unit -> group |
51 val worker_context: string -> group -> ('a -> 'b) -> 'a -> 'b |
|
52 type 'a future |
51 type 'a future |
53 val task_of: 'a future -> task |
52 val task_of: 'a future -> task |
54 val peek: 'a future -> 'a Exn.result option |
53 val peek: 'a future -> 'a Exn.result option |
55 val is_finished: 'a future -> bool |
54 val is_finished: 'a future -> bool |
56 val ML_statistics: bool Unsynchronized.ref |
55 val ML_statistics: bool Unsynchronized.ref |
66 val join_results: 'a future list -> 'a Exn.result list |
65 val join_results: 'a future list -> 'a Exn.result list |
67 val join_result: 'a future -> 'a Exn.result |
66 val join_result: 'a future -> 'a Exn.result |
68 val joins: 'a future list -> 'a list |
67 val joins: 'a future list -> 'a list |
69 val join: 'a future -> 'a |
68 val join: 'a future -> 'a |
70 val join_tasks: task list -> unit |
69 val join_tasks: task list -> unit |
|
70 val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b |
71 val value_result: 'a Exn.result -> 'a future |
71 val value_result: 'a Exn.result -> 'a future |
72 val value: 'a -> 'a future |
72 val value: 'a -> 'a future |
73 val cond_forks: params -> (unit -> 'a) list -> 'a future list |
73 val cond_forks: params -> (unit -> 'a) list -> 'a future list |
74 val map: ('a -> 'b) -> 'a future -> 'b future |
74 val map: ('a -> 'b) -> 'a future -> 'b future |
75 val promise_group: group -> (unit -> unit) -> 'a future |
75 val promise_group: group -> (unit -> unit) -> 'a future |
106 (case worker_group () of |
106 (case worker_group () of |
107 SOME group => group |
107 SOME group => group |
108 | NONE => raise Fail "Missing worker thread context"); |
108 | NONE => raise Fail "Missing worker thread context"); |
109 |
109 |
110 fun worker_subgroup () = new_group (worker_group ()); |
110 fun worker_subgroup () = new_group (worker_group ()); |
111 |
|
112 fun worker_context name group f x = |
|
113 setmp_worker_task (Task_Queue.new_task group name NONE) f x; |
|
114 |
111 |
115 fun worker_joining e = |
112 fun worker_joining e = |
116 (case worker_task () of |
113 (case worker_task () of |
117 NONE => e () |
114 NONE => e () |
118 | SOME task => Task_Queue.joining task e); |
115 | SOME task => Task_Queue.joining task e); |
469 in ok end; |
466 in ok end; |
470 |
467 |
471 |
468 |
472 (* future jobs *) |
469 (* future jobs *) |
473 |
470 |
474 fun future_job group interrupts (e: unit -> 'a) = |
471 fun future_job group atts (e: unit -> 'a) = |
475 let |
472 let |
476 val result = Single_Assignment.var "future" : 'a result; |
473 val result = Single_Assignment.var "future" : 'a result; |
477 val pos = Position.thread_data (); |
474 val pos = Position.thread_data (); |
478 fun job ok = |
475 fun job ok = |
479 let |
476 let |
480 val res = |
477 val res = |
481 if ok then |
478 if ok then |
482 Exn.capture (fn () => |
479 Exn.capture (fn () => |
483 Multithreading.with_attributes |
480 Multithreading.with_attributes atts (fn _ => Position.setmp_thread_data pos e ())) () |
484 (if interrupts |
|
485 then Multithreading.private_interrupts else Multithreading.no_interrupts) |
|
486 (fn _ => Position.setmp_thread_data pos e ())) () |
|
487 else Exn.interrupt_exn; |
481 else Exn.interrupt_exn; |
488 in assign_result group result (identify_result pos res) end; |
482 in assign_result group result (identify_result pos res) end; |
489 in (result, job) end; |
483 in (result, job) end; |
490 |
484 |
491 |
485 |
502 (case group of |
496 (case group of |
503 NONE => worker_subgroup () |
497 NONE => worker_subgroup () |
504 | SOME grp => grp); |
498 | SOME grp => grp); |
505 fun enqueue e queue = |
499 fun enqueue e queue = |
506 let |
500 let |
507 val (result, job) = future_job grp interrupts e; |
501 val atts = |
|
502 if interrupts |
|
503 then Multithreading.private_interrupts |
|
504 else Multithreading.no_interrupts; |
|
505 val (result, job) = future_job grp atts e; |
508 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue; |
506 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue; |
509 val future = Future {promised = false, task = task, result = result}; |
507 val future = Future {promised = false, task = task, result = result}; |
510 in (future, queue') end; |
508 in (future, queue') end; |
511 in |
509 in |
512 SYNCHRONIZED "enqueue" (fn () => |
510 SYNCHRONIZED "enqueue" (fn () => |
578 {name = "join_tasks", group = SOME (new_group NONE), |
576 {name = "join_tasks", group = SOME (new_group NONE), |
579 deps = tasks, pri = 0, interrupts = false} I |
577 deps = tasks, pri = 0, interrupts = false} I |
580 |> join; |
578 |> join; |
581 |
579 |
582 |
580 |
|
581 (* task context for running thread *) |
|
582 |
|
583 fun task_context name group f x = |
|
584 Multithreading.with_attributes Multithreading.no_interrupts (fn orig_atts => |
|
585 let |
|
586 val (result, job) = future_job group orig_atts (fn () => f x); |
|
587 val task = |
|
588 SYNCHRONIZED "enroll" (fn () => |
|
589 Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group)); |
|
590 val _ = worker_exec (task, [job]); |
|
591 in |
|
592 (case Single_Assignment.peek result of |
|
593 NONE => raise Fail "Missing task context result" |
|
594 | SOME res => Exn.release res) |
|
595 end); |
|
596 |
|
597 |
583 (* fast-path operations -- bypass task queue if possible *) |
598 (* fast-path operations -- bypass task queue if possible *) |
584 |
599 |
585 fun value_result (res: 'a Exn.result) = |
600 fun value_result (res: 'a Exn.result) = |
586 let |
601 let |
587 val task = Task_Queue.dummy_task; |
602 val task = Task_Queue.dummy_task; |
600 if is_finished x then value_result (Exn.interruptible_capture (f o join) x) |
615 if is_finished x then value_result (Exn.interruptible_capture (f o join) x) |
601 else |
616 else |
602 let |
617 let |
603 val task = task_of x; |
618 val task = task_of x; |
604 val group = Task_Queue.group_of_task task; |
619 val group = Task_Queue.group_of_task task; |
605 val (result, job) = future_job group true (fn () => f (join x)); |
620 val (result, job) = |
|
621 future_job group Multithreading.private_interrupts (fn () => f (join x)); |
606 |
622 |
607 val extended = SYNCHRONIZED "extend" (fn () => |
623 val extended = SYNCHRONIZED "extend" (fn () => |
608 (case Task_Queue.extend task job (! queue) of |
624 (case Task_Queue.extend task job (! queue) of |
609 SOME queue' => (queue := queue'; true) |
625 SOME queue' => (queue := queue'; true) |
610 | NONE => false)); |
626 | NONE => false)); |