more robust shutdown;
authorwenzelm
Thu, 24 Apr 2014 13:40:29 +0200
changeset 58038ff782c5450bf
parent 58037 963732291084
child 58039 76b38be47feb
more robust shutdown;
less ooddities;
src/Pure/Concurrent/consumer_thread.scala
     1.1 --- a/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 13:13:48 2014 +0200
     1.2 +++ b/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 13:40:29 2014 +0200
     1.3 @@ -11,19 +11,33 @@
     1.4  import scala.annotation.tailrec
     1.5  
     1.6  
     1.7 -class Consumer_Thread[A](name: String = "", daemon: Boolean = false)
     1.8 +object Consumer_Thread
     1.9  {
    1.10 -  def consume(x: A) { }
    1.11 -  def finish() { }
    1.12 +  def fork[A](name: String = "", daemon: Boolean = false)(consume: A => Unit): Consumer_Thread[A] =
    1.13 +    new Consumer_Thread[A](name, daemon, consume)
    1.14 +}
    1.15  
    1.16 +final class Consumer_Thread[A] private(name: String, daemon: Boolean, consume: A => Unit)
    1.17 +{
    1.18 +  private var ready = true
    1.19    private val mbox = Mailbox[Option[A]]
    1.20 +
    1.21    @tailrec private def loop(): Unit =
    1.22      mbox.receive match {
    1.23        case Some(x) => consume(x); loop()
    1.24 -      case None => finish()
    1.25 +      case None =>
    1.26      }
    1.27    private val thread = Simple_Thread.fork(name, daemon) { loop() }
    1.28  
    1.29 -  final def send(x: A) { mbox.send(Some(x)) }
    1.30 -  final def shutdown() { mbox.send(None); mbox.await_empty; thread.join }
    1.31 +  def send(x: A): Unit = synchronized {
    1.32 +    if (ready) mbox.send(Some(x))
    1.33 +    else error("Consumer thread not ready (after shutdown)")
    1.34 +  }
    1.35 +
    1.36 +  def shutdown(): Unit =
    1.37 +  {
    1.38 +    synchronized { if (ready) { ready = false; mbox.send(None) } }
    1.39 +    mbox.await_empty
    1.40 +    thread.join
    1.41 +  }
    1.42  }