1.1 --- a/src/Pure/Concurrent/future.ML Fri Sep 10 12:39:20 2010 +0200
1.2 +++ b/src/Pure/Concurrent/future.ML Fri Sep 10 14:54:08 2010 +0200
1.3 @@ -489,7 +489,11 @@
1.4 fun promise_group group : 'a future =
1.5 let
1.6 val result = Single_Assignment.var "promise" : 'a result;
1.7 - fun abort () = assign_result group result Exn.interrupt_exn handle Fail _ => true;
1.8 + fun abort () = assign_result group result Exn.interrupt_exn
1.9 + handle Fail _ => true
1.10 + | exn =>
1.11 + if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
1.12 + else reraise exn;
1.13 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
1.14 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
1.15 in Future {promised = true, task = task, group = group, result = result} end;
1.16 @@ -497,11 +501,20 @@
1.17 fun promise () = promise_group (worker_subgroup ());
1.18
1.19 fun fulfill_result (Future {promised, task, group, result}) res =
1.20 - let
1.21 - val _ = promised orelse raise Fail "Not a promised future";
1.22 - fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
1.23 - val _ = execute (task, group, [job]);
1.24 - in () end;
1.25 + if not promised then raise Fail "Not a promised future"
1.26 + else
1.27 + let
1.28 + fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
1.29 + val _ =
1.30 + Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
1.31 + let
1.32 + val still_passive =
1.33 + SYNCHRONIZED "fulfill_result" (fn () =>
1.34 + Unsynchronized.change_result queue
1.35 + (Task_Queue.dequeue_passive (Thread.self ()) task));
1.36 + in if still_passive then execute (task, group, [job]) else () end);
1.37 + val _ = Single_Assignment.await result;
1.38 + in () end;
1.39
1.40 fun fulfill x res = fulfill_result x (Exn.Result res);
1.41