[TOREE-463] break out scala interpreter initialization to speed up startup #151

Open. Wants to merge 1 commit into base: master.
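At a high level, the change kicks off the slow Scala interpreter startup in a Future as early as possible, lets the rest of the bootstrap proceed, and blocks only right before the interpreters' postInit. A minimal, self-contained sketch of that pattern (the names here are illustrative, not the actual Toree APIs):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object EagerInitSketch {
  // Stand-in for the slow, kernel-independent part of interpreter startup.
  def slowStartup(): String = { Thread.sleep(2000); "interpreter ready" }

  def main(args: Array[String]): Unit = {
    val eventualInterp = Future(slowStartup()) // start as early as possible
    // ... cheaper bootstrap steps run here while the interpreter warms up ...
    Await.ready(eventualInterp, Duration.Inf)  // block only where the result is needed
    println(eventualInterp.value)              // Some(Success("interpreter ready"))
  }
}
```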
18 changes: 16 additions & 2 deletions kernel/src/main/scala/org/apache/toree/boot/KernelBootstrap.scala
@@ -65,6 +65,8 @@ class KernelBootstrap(config: Config) extends LogLike {
// customPrintStream as their initial Console.out value
//

val startNanos: Double = System.nanoTime()

// ENSURE THAT WE SET THE RIGHT SPARK PROPERTIES
val execUri = System.getenv("SPARK_EXECUTOR_URI")
System.setProperty("spark.repl.class.outputDir", outputDir.getAbsolutePath)
@@ -78,6 +80,9 @@ class KernelBootstrap(config: Config) extends LogLike {
// is ready
initializeShutdownHook()

// Handle the Scala interpreter separately because it is slow to start up
val (interpreterManager, maybeEventualPartialScalaInterpreter) = initializeSlowComponents(config)

// Initialize the bare minimum to report a starting message
val (actorSystem, actorLoader, kernelMessageRelayActor, statusDispatch) =
initializeBare(
@@ -96,10 +101,12 @@
// Initialize components needed elsewhere
val (commStorage, commRegistrar, commManager, interpreter,
kernel, dependencyDownloader,
magicManager, pluginManager, responseMap) =
magicManager, pluginManager, responseMap, maybeEventualReadyScalaInterpreter) =
initializeComponents(
config = config,
actorLoader = actorLoader
actorLoader = actorLoader,
interpreterManager = interpreterManager,
maybeEventualScalaInterp = maybeEventualPartialScalaInterpreter
)
this.interpreters ++= Seq(interpreter)

@@ -126,12 +133,19 @@
logger.debug("Initializing security manager")
System.setSecurityManager(new KernelSecurityManager)

// Wait for the Scala interpreter to finish initializing, if it is present
maybeEventualReadyScalaInterpreter.map { eventualScalaInterpreter =>
Await.ready(eventualScalaInterpreter, Duration.Inf)
}

logger.debug("Running postInit for interpreters")
interpreters foreach {_.postInit()}

logger.info("Marking relay as ready for receiving messages")
kernelMessageRelayActor ! true

val endNanos: Double = System.nanoTime()
logger.trace(s"Kernel bootstrap took ${(endNanos - startNanos) / 1000000000}s")
this
}

@@ -28,6 +28,7 @@ import org.apache.toree.comm.{CommManager, CommRegistrar, CommStorage, KernelCom
import org.apache.toree.dependencies.{CoursierDependencyDownloader, Credentials, DependencyDownloader}
import org.apache.toree.interpreter._
import org.apache.toree.kernel.api.Kernel
import org.apache.toree.kernel.interpreter.scala.ScalaInterpreter
@sanjay-saxena (Contributor) commented on Feb 7, 2018:
Based on my limited understanding of the code, picking up a compile-time dependency on an implementation such as ScalaInterpreter from the scala-interpreter project in the generic kernel project may be an abstraction violation. It may also lead to circular dependencies. Maybe you should look at reusing the existing APIs on the generic trait Interpreter, and if the existing APIs are not sufficient, then add new APIs to the generic trait Interpreter.
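One way to act on that suggestion (a hypothetical sketch only, not part of this PR) would be to expose the two-phase startup through the generic Interpreter trait with no-op defaults, so the boot layer never needs a compile-time reference to ScalaInterpreter:

```scala
import org.apache.toree.kernel.api.KernelLike

// Hypothetical sketch: generic two-phase init hooks with safe defaults.
// Only slow interpreters (such as the Scala one) would override them.
trait Interpreter {
  def init(kernel: KernelLike): Interpreter // existing API, as used elsewhere in this diff

  // Proposed additions:
  def startInit(): Interpreter = this                            // begin kernel-independent setup
  def finishInit(kernel: KernelLike): Interpreter = init(kernel) // complete setup once the kernel exists
}
```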

import org.apache.toree.kernel.protocol.v5.KMBuilder
import org.apache.toree.kernel.protocol.v5.kernel.ActorLoader
import org.apache.toree.magic.MagicManager
@@ -36,22 +37,39 @@ import org.apache.toree.utils.{LogLike, FileUtils}
import scala.collection.JavaConverters._
import org.apache.toree.plugins.AllInterpretersReady

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

/**
* Represents the component initialization. All component-related pieces of the
* kernel (non-actors) should be created here. Limited items should be exposed.
*/
trait ComponentInitialization {

/**
* Start initializing slow components, specifically the Scala interpreter. These are broken out so that
* they can be started as early as possible.
*
* @param config the config used for initialization
*/
def initializeSlowComponents(config: Config): (InterpreterManager, Option[Future[ScalaInterpreter]])

/**
* Initializes and registers all components (not needed by bare init).
*
* @param config The config used for initialization
* @param actorLoader The actor loader to use for some initialization
* @param interpreterManager the interpreter manager created by initializeSlowComponents
* @param maybeEventualScalaInterp an optional future that completes with the Scala interpreter once it is ready
*/
def initializeComponents(
config: Config, actorLoader: ActorLoader
config: Config,
actorLoader: ActorLoader,
interpreterManager: InterpreterManager,
maybeEventualScalaInterp: Option[Future[ScalaInterpreter]]
): (CommStorage, CommRegistrar, CommManager, Interpreter,
Kernel, DependencyDownloader, MagicManager, PluginManager,
collection.mutable.Map[String, ActorRef])
collection.mutable.Map[String, ActorRef], Option[Future[ScalaInterpreter]])
}

/**
@@ -60,37 +78,62 @@ trait ComponentInitialization {
trait StandardComponentInitialization extends ComponentInitialization {
this: LogLike =>

/**
* Start initializing slow components, specifically the Scala interpreter. These are broken out so that
* they can be started as early as possible.
*
* @param config the config used for initialization
*/
def initializeSlowComponents(config: Config): (InterpreterManager, Option[Future[ScalaInterpreter]]) = {
val interpreterManager = InterpreterManager(config)
val optionScalaInterp = interpreterManager.scalaInterpreter
val futureScalaInterp = optionScalaInterp.map { scalaInterp =>
Future(scalaInterp.startInit())
}

(interpreterManager, futureScalaInterp)
}


/**
* Initializes and registers all components (not needed by bare init).
*
* @param config The config used for initialization
* @param actorLoader The actor loader to use for some initialization
* @param interpreterManager the interpreter manager created by initializeSlowComponents
* @param maybeEventualScalaInterp an optional future that completes with the Scala interpreter once it is ready
*/
def initializeComponents(
config: Config, actorLoader: ActorLoader
config: Config,
actorLoader: ActorLoader,
interpreterManager: InterpreterManager,
maybeEventualScalaInterp: Option[Future[ScalaInterpreter]]
) = {
val (commStorage, commRegistrar, commManager) =
initializeCommObjects(actorLoader)

val interpreterManager = InterpreterManager(config)
interpreterManager.interpreters foreach(println)

val dependencyDownloader = initializeDependencyDownloader(config)
val pluginManager = createPluginManager(config, interpreterManager, dependencyDownloader)

val kernel = initializeKernel(config, actorLoader, interpreterManager, commManager, pluginManager)

val updatedScalaInterp: Option[Future[ScalaInterpreter]] = maybeEventualScalaInterp.map { futureScalaInterp =>
futureScalaInterp.map { scalaInterp =>
scalaInterp.finishInit(kernel)
}
}

initializePlugins(config, pluginManager)

interpreterManager.initializeInterpreters(kernel)
interpreterManager.initializeRegularInterpreters(kernel)

pluginManager.fireEvent(AllInterpretersReady)

val responseMap = initializeResponseMap()

(commStorage, commRegistrar, commManager,
interpreterManager.defaultInterpreter.get, kernel,
dependencyDownloader, kernel.magics, pluginManager, responseMap)
dependencyDownloader, kernel.magics, pluginManager, responseMap, updatedScalaInterp)

}

@@ -20,15 +20,20 @@ package org.apache.toree.boot.layer
import org.apache.toree.kernel.api.KernelLike
import com.typesafe.config.Config
import org.apache.toree.interpreter._
import scala.collection.JavaConverters._
import org.apache.toree.kernel.interpreter.scala.ScalaInterpreter
Contributor commented:
Same comment about abstraction violation applies here as well.


import scala.collection.JavaConverters._
import org.slf4j.LoggerFactory

case class InterpreterManager(
default: String = "Scala",
interpreters: Map[String, Interpreter] = Map[String, Interpreter]()
) {

// The Scala interpreter is handled separately
def initializeRegularInterpreters(kernel: KernelLike): Unit = interpreters
.filterNot { case (name, interp) => name == "Scala" && interp.isInstanceOf[ScalaInterpreter] }
.foreach { case (_, interpreter) => interpreter.init(kernel) }

def initializeInterpreters(kernel: KernelLike): Unit = {
interpreters.values.foreach(interpreter =>
@@ -46,8 +51,15 @@ case class InterpreterManager(
def defaultInterpreter: Option[Interpreter] = {
interpreters.get(default)
}

/**
* Returns the built-in Toree Scala interpreter, if present.
* @return an Option containing the Scala interpreter if present
*/
def scalaInterpreter: Option[ScalaInterpreter] = interpreters.get("Scala").collect { case s: ScalaInterpreter => s}
}


object InterpreterManager {

protected val logger = LoggerFactory.getLogger(this.getClass.getName)
@@ -258,14 +258,13 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr
* Starts the interpreter, initializing any internal state.
* @return A reference to the interpreter
*/
override def start(): Interpreter = {
override def start(): ScalaInterpreter = {
require(iMain == null && taskManager == null)

taskManager = newTaskManager()

logger.debug("Initializing task manager")
taskManager.start()

iMain = newIMain(settings, new JPrintWriter(lastResultOut, true))

//logger.debug("Initializing interpreter")
@@ -18,7 +18,8 @@
package org.apache.toree.kernel.interpreter.scala

import java.io.ByteArrayOutputStream
import java.util.concurrent.{ExecutionException, TimeoutException, TimeUnit}
import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}

import com.typesafe.config.{Config, ConfigFactory}
import jupyter.Displayers
import org.apache.spark.SparkContext
@@ -30,6 +31,7 @@ import org.apache.toree.utils.TaskManager
import org.slf4j.LoggerFactory
import org.apache.toree.kernel.BuildInfo
import org.apache.toree.kernel.protocol.v5.MIMEType

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.concurrent.{Await, Future}
@@ -66,9 +68,7 @@ class ScalaInterpreter(private val config:Config = ConfigFactory.load) extends I
settings
}

protected var settings: Settings = newSettings(List())
settings = appendClassPath(settings)

protected var settings: Settings = _

private val maxInterpreterThreads: Int = {
if(config.hasPath("max_interpreter_threads"))
@@ -99,6 +99,33 @@ class ScalaInterpreter(private val config:Config = ConfigFactory.load) extends I
this
}

/**
* Start initialization using only the given config.
*
* @return this partially initialized ScalaInterpreter
*/
def startInit(): ScalaInterpreter = {
import scala.collection.JavaConverters._
val args = config.getStringList("interpreter_args").asScala.toList
settings = newSettings(args)
settings = appendClassPath(settings)

start()
}


/**
* Finish initializing this interpreter with the kernel.
*
* @param kernel the kernel
* @return this fully initialized ScalaInterpreter
*/
def finishInit(kernel: KernelLike): ScalaInterpreter = {
this._kernel = kernel
bindVariables()
this
}

protected def bindVariables(): Unit = {
bindKernelVariable(kernel)
bindSparkSession()