import scala.annotation.tailrec


/** Factory for [[Consumer_Thread]]; the class constructor is private, so this is the entry point. */
object Consumer_Thread
{
  /**
   * Create a consumer thread that applies `consume` to every value passed to `send`.
   *
   * @param name    thread name, handed to `Simple_Thread.fork`
   * @param daemon  daemon flag, handed to `Simple_Thread.fork`
   * @param consume callback invoked by the worker loop for each received value
   * @tparam A      type of the consumed values
   */
  def fork[A](name: String = "", daemon: Boolean = false)(consume: A => Unit): Consumer_Thread[A] =
    new Consumer_Thread[A](name, daemon, consume)
}

/**
 * Worker that takes values from a mailbox and passes them one by one to `consume`.
 *
 * Values are wrapped as `Some(x)`; `None` is the termination marker that ends the loop.
 */
final class Consumer_Thread[A] private(name: String, daemon: Boolean, consume: A => Unit)
{
  // Becomes false once shutdown has been requested; read and written only under `synchronized`.
  private var ready = true
  private val mbox = Mailbox[Option[A]]

  // Worker loop: handle values until the `None` marker arrives.
  // NOTE(review): presumably `mbox.receive` blocks until a message is available -- confirm
  // against Mailbox.
  @tailrec private def loop(): Unit =
    mbox.receive match {
      case Some(x) => consume(x); loop()
      case None =>  // termination marker: leave the loop
    }
  // NOTE(review): an exception thrown by `consume` escapes `loop()`; check how
  // Simple_Thread / `shutdown` behave in that case.
  private val thread = Simple_Thread.fork(name, daemon) { loop() }

  /**
   * Enqueue `x` for consumption.
   *
   * Fails via `error` with "Consumer thread not ready (after shutdown)" once `shutdown`
   * has been requested.
   */
  def send(x: A): Unit = synchronized {
    if (ready) mbox.send(Some(x))
    else error("Consumer thread not ready (after shutdown)")
  }

  /**
   * Request termination and wait for the worker thread.
   *
   * The `None` marker is sent at most once, so repeated calls do not enqueue it again.
   * The waiting calls are made outside the `synchronized` section.
   */
  def shutdown(): Unit =
  {
    synchronized { if (ready) { ready = false; mbox.send(None) } }
    // NOTE(review): presumably waits until the mailbox has been drained -- confirm against Mailbox.
    mbox.await_empty
    thread.join
  }
}