allow more control of main loop;
authorwenzelm
Thu, 24 Apr 2014 14:51:41 +0200
changeset 58040e0655270d3f3
parent 58039 76b38be47feb
child 58041 60ad80f5cb62
allow more control of main loop;
more robust is_active test, although thread could terminate at any time;
src/Pure/Concurrent/consumer_thread.scala
     1.1 --- a/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 13:54:45 2014 +0200
     1.2 +++ b/src/Pure/Concurrent/consumer_thread.scala	Thu Apr 24 14:51:41 2014 +0200
     1.3 @@ -13,31 +13,37 @@
     1.4  
     1.5  object Consumer_Thread
     1.6  {
     1.7 -  def fork[A](name: String = "", daemon: Boolean = false)(consume: A => Unit): Consumer_Thread[A] =
     1.8 -    new Consumer_Thread[A](name, daemon, consume)
     1.9 +  def fork[A](name: String = "", daemon: Boolean = false)(
    1.10 +      consume: A => Boolean,
    1.11 +      finish: () => Unit = () => ()): Consumer_Thread[A] =
    1.12 +    new Consumer_Thread[A](name, daemon, consume, finish)
    1.13  }
    1.14  
    1.15 -final class Consumer_Thread[A] private(name: String, daemon: Boolean, consume: A => Unit)
    1.16 +final class Consumer_Thread[A] private(
    1.17 +  name: String, daemon: Boolean, consume: A => Boolean, finish: () => Unit)
    1.18  {
    1.19 -  private var ready = true
    1.20 +  private var active = true
    1.21    private val mbox = Mailbox[Option[A]]
    1.22  
    1.23    @tailrec private def loop(): Unit =
    1.24      mbox.receive match {
    1.25 -      case Some(x) => consume(x); loop()
    1.26 -      case None =>
    1.27 +      case Some(x) =>
    1.28 +        val continue = consume(x)
    1.29 +        if (continue) loop() else finish()
    1.30 +      case None => finish()
    1.31      }
    1.32    private val thread = Simple_Thread.fork(name, daemon) { loop() }
    1.33  
    1.34 +  private def is_active: Boolean = active && thread.isAlive
    1.35 +
    1.36    def send(x: A): Unit = synchronized {
    1.37 -    if (ready) mbox.send(Some(x))
    1.38 -    else error("Consumer thread not ready (after shutdown)")
    1.39 +    if (is_active) mbox.send(Some(x))
    1.40 +    else error("Consumer thread not active")
    1.41    }
    1.42  
    1.43    def shutdown(): Unit =
    1.44    {
    1.45 -    synchronized { if (ready) { ready = false; mbox.send(None) } }
    1.46 -    mbox.await_empty
    1.47 +    synchronized { if (is_active) { active = false; mbox.send(None) } }
    1.48      thread.join
    1.49    }
    1.50  }