parallel scheduling of jobs;
authorwenzelm
Sun, 22 Jul 2012 23:31:57 +0200
changeset 494400d95980e9aae
parent 49439 e6b0c14f04c8
child 49444 4b7f4482c552
parallel scheduling of jobs;
misc tuning;
lib/Tools/build
src/Pure/System/build.scala
src/Pure/library.scala
     1.1 --- a/lib/Tools/build	Sun Jul 22 21:59:14 2012 +0200
     1.2 +++ b/lib/Tools/build	Sun Jul 22 23:31:57 2012 +0200
     1.3 @@ -18,8 +18,10 @@
     1.4    echo "    -a           all sessions"
     1.5    echo "    -b           build target images"
     1.6    echo "    -d DIR       additional session directory with ROOT file"
     1.7 +  echo "    -j INT       maximum number of jobs (default 1)"
     1.8    echo "    -l           list sessions only"
     1.9    echo "    -o OPTION    override session configuration OPTION (via NAME=VAL or NAME)"
    1.10 +  echo "    -v           verbose"
    1.11    echo
    1.12    echo "  Build and manage Isabelle sessions, depending on implicit"
    1.13    echo "  ISABELLE_BUILD_OPTIONS=\"$ISABELLE_BUILD_OPTIONS\""
    1.14 @@ -38,17 +40,24 @@
    1.15    exit 2
    1.16  }
    1.17  
    1.18 +function check_number()
    1.19 +{
    1.20 +  [ -n "$1" -a -z "$(echo "$1" | tr -d '[0-9]')" ] || fail "Bad number: \"$1\""
    1.21 +}
    1.22 +
    1.23  
    1.24  ## process command line
    1.25  
    1.26  ALL_SESSIONS=false
    1.27  BUILD_IMAGES=false
    1.28 +MAX_JOBS=1
    1.29  LIST_ONLY=false
    1.30 +VERBOSE=false
    1.31  
    1.32  declare -a MORE_DIRS=()
    1.33  eval "declare -a BUILD_OPTIONS=($ISABELLE_BUILD_OPTIONS)"
    1.34  
    1.35 -while getopts "abd:lo:" OPT
    1.36 +while getopts "abd:j:lo:v" OPT
    1.37  do
    1.38    case "$OPT" in
    1.39      a)
    1.40 @@ -60,12 +69,19 @@
    1.41      d)
    1.42        MORE_DIRS["${#MORE_DIRS[@]}"]="$OPTARG"
    1.43        ;;
    1.44 +    j)
    1.45 +      check_number "$OPTARG"
    1.46 +      MAX_JOBS="$OPTARG"
    1.47 +      ;;
    1.48      l)
    1.49        LIST_ONLY="true"
    1.50        ;;
    1.51      o)
    1.52        BUILD_OPTIONS["${#BUILD_OPTIONS[@]}"]="$OPTARG"
    1.53        ;;
    1.54 +    v)
    1.55 +      VERBOSE="true"
    1.56 +      ;;
    1.57      \?)
    1.58        usage
    1.59        ;;
    1.60 @@ -80,5 +96,5 @@
    1.61  [ -e "$ISABELLE_HOME/Admin/build" ] && { "$ISABELLE_HOME/Admin/build" jars || exit $?; }
    1.62  
    1.63  exec "$ISABELLE_TOOL" java isabelle.Build \
    1.64 -  "$ALL_SESSIONS" "$BUILD_IMAGES" "$LIST_ONLY" \
    1.65 +  "$ALL_SESSIONS" "$BUILD_IMAGES" "$MAX_JOBS" "$LIST_ONLY" "$VERBOSE" \
    1.66    "${MORE_DIRS[@]}" $'\n' "${BUILD_OPTIONS[@]}" $'\n' "$@"
     2.1 --- a/src/Pure/System/build.scala	Sun Jul 22 21:59:14 2012 +0200
     2.2 +++ b/src/Pure/System/build.scala	Sun Jul 22 23:31:57 2012 +0200
     2.3 @@ -62,8 +62,10 @@
     2.4        keys: Map[String, Key] = Map.empty,
     2.5        graph: Graph[Key, Info] = Graph.empty(Key.Ordering))
     2.6      {
     2.7 +      def is_empty: Boolean = graph.is_empty
     2.8 +
     2.9 +      def apply(name: String): Info = graph.get_node(keys(name))
    2.10        def defined(name: String): Boolean = keys.isDefinedAt(name)
    2.11 -
    2.12        def is_inner(name: String): Boolean = !graph.is_maximal(keys(name))
    2.13  
    2.14        def + (key: Key, info: Info): Queue =
    2.15 @@ -85,6 +87,8 @@
    2.16          new Queue(keys1, graph1)
    2.17        }
    2.18  
    2.19 +      def - (name: String): Queue = new Queue(keys - name, graph.del_node(keys(name)))
    2.20 +
    2.21        def required(names: List[String]): Queue =
    2.22        {
    2.23          val req = graph.all_preds(names.map(keys(_))).map(_.name).toSet
    2.24 @@ -93,6 +97,14 @@
    2.25          new Queue(keys1, graph1)
    2.26        }
    2.27  
    2.28 +      def dequeue(skip: String => Boolean): Option[(String, Info)] =
    2.29 +      {
    2.30 +        val it = graph.entries.dropWhile(
    2.31 +          { case (key, (_, (deps, _))) => !deps.isEmpty || skip(key.name) })
    2.32 +        if (it.hasNext) { val (key, (info, _)) = it.next; Some((key.name, info)) }
    2.33 +        else None
    2.34 +      }
    2.35 +
    2.36        def topological_order: List[(String, Info)] =
    2.37          graph.topological_order.map(key => (key.name, graph.get_node(key)))
    2.38      }
    2.39 @@ -325,7 +337,7 @@
    2.40  
    2.41      def terminate: Unit = thread.interrupt
    2.42      def is_finished: Boolean = result.is_finished
    2.43 -    def join: (String, String, Int) = { val rc = result.join; args_file.delete; rc }
    2.44 +    def join: (String, String, Int) = { val res = result.join; args_file.delete; res }
    2.45    }
    2.46  
    2.47    private def start_job(save: Boolean, name: String, info: Session.Info): Job =
    2.48 @@ -366,12 +378,13 @@
    2.49    }
    2.50  
    2.51  
    2.52 -  /* Scala entry point */
    2.53 +  /* build */
    2.54  
    2.55    private def echo(msg: String) { java.lang.System.out.println(msg) }
    2.56 -  private def echo_n(msg: String) { java.lang.System.out.print(msg) }
    2.57 +  private def sleep(): Unit = Thread.sleep(500)
    2.58  
    2.59 -  def build(all_sessions: Boolean, build_images: Boolean, list_only: Boolean,
    2.60 +  def build(all_sessions: Boolean, build_images: Boolean, max_jobs: Int,
    2.61 +    list_only: Boolean, verbose: Boolean,
    2.62      more_dirs: List[Path], more_options: List[String], sessions: List[String]): Int =
    2.63    {
    2.64      val options = (Options.init() /: more_options)(_.define_simple(_))
    2.65 @@ -396,37 +409,60 @@
    2.66      val log_dir = Path.explode("$ISABELLE_OUTPUT/log")
    2.67      log_dir.file.mkdirs()
    2.68  
    2.69 +    // scheduler loop
    2.70 +    @tailrec def loop(
    2.71 +      pending: Session.Queue,
    2.72 +      running: Map[String, Job],
    2.73 +      results: Map[String, Int]): Map[String, Int] =
    2.74 +    {
    2.75 +      if (pending.is_empty) results
    2.76 +      else if (running.exists({ case (_, job) => job.is_finished })) {
    2.77 +        val (name, job) = running.find({ case (_, job) => job.is_finished }).get
    2.78  
    2.79 -    val rcs =
    2.80 -      for ((name, info) <- queue.topological_order) yield
    2.81 -      {
    2.82 -        if (list_only) { echo(name + " in " + info.dir); 0 }
    2.83 +        val (out, err, rc) = job.join
    2.84 +        echo(Library.trim_line(err))
    2.85 +
    2.86 +        val log = log_dir + Path.basic(name)
    2.87 +        if (rc == 0) {
    2.88 +          val sources =
    2.89 +            (queue(name).digest :: deps.sources(name).map(_._2)).map(_.toString).sorted
    2.90 +              .mkString("sources: ", " ", "\n")
    2.91 +          File.write_zip(log.ext("gz"), sources + out)
    2.92 +        }
    2.93          else {
    2.94 -          val save = build_images || queue.is_inner(name)
    2.95 -          echo((if (save) "Building " else "Running ") + name + " ...")
    2.96 -
    2.97 -          val (out, err, rc) = start_job(save, name, info).join
    2.98 -          echo_n(err)
    2.99 -
   2.100 -          val log = log_dir + Path.basic(name)
   2.101 -          if (rc == 0) {
   2.102 -            val sources =
   2.103 -              (info.digest :: deps.sources(name).map(_._2)).map(_.toString).sorted
   2.104 -                .mkString("sources: ", " ", "\n")
   2.105 -            File.write_zip(log.ext("gz"), sources + out)
   2.106 -          }
   2.107 -          else {
   2.108 -            File.write(log, out)
   2.109 -            echo(name + " FAILED")
   2.110 -            echo("(see also " + log.file + ")")
   2.111 -            val lines = split_lines(out)
   2.112 -            val tail = lines.drop(lines.length - 20 max 0)
   2.113 -            echo("\n" + cat_lines(tail))
   2.114 -          }
   2.115 -          rc
   2.116 +          File.write(log, out)
   2.117 +          echo(name + " FAILED")
   2.118 +          echo("(see also " + log.file + ")")
   2.119 +          val lines = split_lines(out)
   2.120 +          val tail = lines.drop(lines.length - 20 max 0)
   2.121 +          echo("\n" + cat_lines(tail))
   2.122 +        }
   2.123 +        loop(pending - name, running - name, results + (name -> rc))
   2.124 +      }
   2.125 +      else if (running.size < (max_jobs max 1)) {
   2.126 +        pending.dequeue(running.isDefinedAt(_)) match {
   2.127 +          case Some((name, info)) =>
   2.128 +            if (list_only) {
   2.129 +              echo(name + " in " + info.dir)
   2.130 +              loop(pending - name, running, results + (name -> 0))
   2.131 +            }
   2.132 +            else if (info.parent.map(results(_)).forall(_ == 0)) {
   2.133 +              val save = build_images || queue.is_inner(name)
   2.134 +              echo((if (save) "Building " else "Running ") + name + " ...")
   2.135 +              val job = start_job(save, name, info)
   2.136 +              loop(pending, running + (name -> job), results)
   2.137 +            }
   2.138 +            else {
   2.139 +              echo(name + " CANCELLED")
   2.140 +              loop(pending - name, running, results + (name -> 1))
   2.141 +            }
   2.142 +          case None => sleep(); loop(pending, running, results)
   2.143          }
   2.144        }
   2.145 -    (0 /: rcs)(_ max _)
   2.146 +      else { sleep(); loop(pending, running, results) }
   2.147 +    }
   2.148 +
   2.149 +    (0 /: loop(queue, Map.empty, Map.empty))({ case (rc1, (_, rc2)) => rc1 max rc2 })
   2.150    }
   2.151  
   2.152  
   2.153 @@ -439,9 +475,11 @@
   2.154          case
   2.155            Properties.Value.Boolean(all_sessions) ::
   2.156            Properties.Value.Boolean(build_images) ::
   2.157 +          Properties.Value.Int(max_jobs) ::
   2.158            Properties.Value.Boolean(list_only) ::
   2.159 +          Properties.Value.Boolean(verbose) ::
   2.160            Command_Line.Chunks(more_dirs, options, sessions) =>
   2.161 -            build(all_sessions, build_images, list_only,
   2.162 +            build(all_sessions, build_images, max_jobs, list_only, verbose,
   2.163                more_dirs.map(Path.explode), options, sessions)
   2.164          case _ => error("Bad arguments:\n" + cat_lines(args))
   2.165        }
     3.1 --- a/src/Pure/library.scala	Sun Jul 22 21:59:14 2012 +0200
     3.2 +++ b/src/Pure/library.scala	Sun Jul 22 23:31:57 2012 +0200
     3.3 @@ -62,6 +62,10 @@
     3.4  
     3.5    def split_lines(str: String): List[String] = space_explode('\n', str)
     3.6  
     3.7 +  def trim_line(str: String): String =
     3.8 +    if (str.endsWith("\n")) str.substring(0, str.length - 1)
     3.9 +    else str
    3.10 +
    3.11  
    3.12    /* iterate over chunks (cf. space_explode) */
    3.13