1.1 --- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 13:13:48 2014 +0200
1.2 +++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 13:40:29 2014 +0200
1.3 @@ -11,19 +11,33 @@
1.4 import scala.annotation.tailrec
1.5
1.6
1.7 -class Consumer_Thread[A](name: String = "", daemon: Boolean = false)
1.8 +object Consumer_Thread
1.9 {
1.10 - def consume(x: A) { }
1.11 - def finish() { }
1.12 + def fork[A](name: String = "", daemon: Boolean = false)(consume: A => Unit): Consumer_Thread[A] =
1.13 + new Consumer_Thread[A](name, daemon, consume)
1.14 +}
1.15
1.16 +final class Consumer_Thread[A] private(name: String, daemon: Boolean, consume: A => Unit)
1.17 +{
1.18 + private var ready = true
1.19 private val mbox = Mailbox[Option[A]]
1.20 +
1.21 @tailrec private def loop(): Unit =
1.22 mbox.receive match {
1.23 case Some(x) => consume(x); loop()
1.24 - case None => finish()
1.25 + case None =>
1.26 }
1.27 private val thread = Simple_Thread.fork(name, daemon) { loop() }
1.28
1.29 - final def send(x: A) { mbox.send(Some(x)) }
1.30 - final def shutdown() { mbox.send(None); mbox.await_empty; thread.join }
1.31 + def send(x: A): Unit = synchronized {
1.32 + if (ready) mbox.send(Some(x))
1.33 + else error("Consumer thread not ready (after shutdown)")
1.34 + }
1.35 +
1.36 + def shutdown(): Unit =
1.37 + {
1.38 + synchronized { if (ready) { ready = false; mbox.send(None) } }
1.39 + mbox.await_empty
1.40 + thread.join
1.41 + }
1.42 }