1.1 --- a/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 13:54:45 2014 +0200
1.2 +++ b/src/Pure/Concurrent/consumer_thread.scala Thu Apr 24 14:51:41 2014 +0200
1.3 @@ -13,31 +13,37 @@
1.4
1.5 object Consumer_Thread
1.6 {
1.7 - def fork[A](name: String = "", daemon: Boolean = false)(consume: A => Unit): Consumer_Thread[A] =
1.8 - new Consumer_Thread[A](name, daemon, consume)
1.9 + def fork[A](name: String = "", daemon: Boolean = false)(
1.10 + consume: A => Boolean,
1.11 + finish: () => Unit = () => ()): Consumer_Thread[A] =
1.12 + new Consumer_Thread[A](name, daemon, consume, finish)
1.13 }
1.14
1.15 -final class Consumer_Thread[A] private(name: String, daemon: Boolean, consume: A => Unit)
1.16 +final class Consumer_Thread[A] private(
1.17 + name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
1.18 {
1.19 - private var ready = true
1.20 + private var active = true
1.21 private val mbox = Mailbox[Option[A]]
1.22
1.23 @tailrec private def loop(): Unit =
1.24 mbox.receive match {
1.25 - case Some(x) => consume(x); loop()
1.26 - case None =>
1.27 + case Some(x) =>
1.28 + val continue = consume(x)
1.29 + if (continue) loop() else finish()
1.30 + case None => finish()
1.31 }
1.32 private val thread = Simple_Thread.fork(name, daemon) { loop() }
1.33
1.34 + private def is_active: Boolean = active && thread.isAlive
1.35 +
1.36 def send(x: A): Unit = synchronized {
1.37 - if (ready) mbox.send(Some(x))
1.38 - else error("Consumer thread not ready (after shutdown)")
1.39 + if (is_active) mbox.send(Some(x))
1.40 + else error("Consumer thread not active")
1.41 }
1.42
1.43 def shutdown(): Unit =
1.44 {
1.45 - synchronized { if (ready) { ready = false; mbox.send(None) } }
1.46 - mbox.await_empty
1.47 + synchronized { if (is_active) { active = false; mbox.send(None) } }
1.48 thread.join
1.49 }
1.50 }