Commit 3d528442 authored by Nikolaas Steenbergen, committed by Stephan Ewen


[FLINK-2161] [scala shell] Modify start script to take additional argument (-a <path/to/class> or --addclasspath <path/to/class>) for external libraries

This closes #805
Parent 9f96b32b
@@ -68,3 +68,13 @@ Scala-Flink> env.execute("MyProgram")
The Flink Shell comes with command history and autocompletion.
## Adding external dependencies
It is possible to add external classpaths to the Scala shell. They are sent to the JobManager automatically together with your shell program when `execute` is called.
Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.
~~~bash
bin/start-scala-shell --addclasspath <path/to/jar.jar>
~~~
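Classes from the added jar can then be used directly inside the shell. A minimal sketch, where `com.example.udfs.LineSplitter` is a hypothetical `FlatMapFunction` shipped in the external jar:
~~~scala
Scala-Flink> import com.example.udfs.LineSplitter
Scala-Flink> val words = env.fromElements("to be or not").flatMap(new LineSplitter)
Scala-Flink> words.print()
~~~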
@@ -19,12 +19,17 @@ package org.apache.flink.api.java;
* limitations under the License.
*/
import org.apache.commons.lang.ArrayUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.scala.FlinkILoop;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
* to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will
@@ -61,8 +66,19 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
// call "traditional" execution methods
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFile);
// get "external jars, and add the shell command jar, pass to executor
List<String> alljars = new ArrayList<String>();
// get external (library) jars
String[] extJars = this.flinkILoop.getExternalJars();
if(!ArrayUtils.isEmpty(extJars)) {
alljars.addAll(Arrays.asList(extJars));
}
// add shell commands
alljars.add(jarFile);
String[] alljarsArr = new String[alljars.size()];
alljarsArr = alljars.toArray(alljarsArr);
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, alljarsArr);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
return executor.executePlan(p);
@@ -29,16 +29,27 @@ import org.apache.flink.util.AbstractID
class FlinkILoop(
val host: String,
val port: Int,
val externalJars: Option[Array[String]],
in0: Option[BufferedReader],
out0: JPrintWriter)
extends ILoop(in0, out0) {
def this(host:String, port:Int, in0: BufferedReader, out: JPrintWriter){
this(host:String, port:Int, Some(in0), out)
def this(host:String,
port:Int,
externalJars : Option[Array[String]],
in0: BufferedReader,
out: JPrintWriter){
this(host:String, port:Int, externalJars, Some(in0), out)
}
def this(host:String, port:Int){
this(host:String,port: Int,None, new JPrintWriter(Console.out, true))
def this(host:String, port:Int, externalJars : Option[Array[String]]){
this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
}
def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
}
// remote environment
private val remoteEnv: ScalaShellRemoteEnvironment = {
@@ -126,6 +137,7 @@ class FlinkILoop(
}
val compiledClasses = new File(tmpDirShell.getAbsolutePath)
val jarFilePath = new File(tmpJarShell.getAbsolutePath)
val jh: JarHelper = new JarHelper
@@ -191,5 +203,8 @@ HINT: You can use print() on a DataSet to print the contents to this shell.
)
}
def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
}
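For illustration, a minimal sketch of how the new constructor overload and `getExternalJars` are meant to be used together (host, port, and the jar path are placeholders; a running JobManager is assumed):
~~~scala
import org.apache.flink.api.scala.FlinkILoop

// three-argument overload: default reader/writer, external jars passed explicitly
val repl = new FlinkILoop("localhost", 6123, Some(Array("/path/to/lib.jar")))

// the remote environment later picks the jars up via:
val jars: Array[String] = repl.getExternalJars()
~~~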
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.scala
@@ -29,8 +30,10 @@ object FlinkShell {
def main(args: Array[String]) {
// scopt, command line arguments
case class Config(port: Int = -1,
host: String = "none")
case class Config(
port: Int = -1,
host: String = "none",
externalJars: Option[Array[String]] = None)
val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
head ("Flink Scala Shell")
@@ -44,6 +47,12 @@
c.copy (host = x)
} text("host specifies host name of running JobManager")
opt[(String)] ('a',"addclasspath") action {
case (x,c) =>
val xArray = x.split(":")
c.copy(externalJars = Option(xArray))
} text("specifies additional jars to be used in Flink")
help("help") text("prints this usage text")
}
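The value passed to `-a`/`--addclasspath` is a single `:`-separated string; the action above splits it into the array stored in `Config`. For example (paths are placeholders):
~~~scala
// "-a /opt/a.jar:/opt/b.jar" arrives as one string and is split on ':'
val x = "/opt/a.jar:/opt/b.jar"
val xArray = x.split(":")   // Array("/opt/a.jar", "/opt/b.jar")
val config = Config(externalJars = Option(xArray))
~~~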
@@ -51,14 +60,18 @@ object FlinkShell {
// parse arguments
parser.parse (args, Config () ) match {
case Some(config) =>
startShell(config.host,config.port)
startShell(config.host,config.port,config.externalJars)
case _ => println("Could not parse program arguments")
}
}
def startShell(userHost : String, userPort : Int): Unit ={
def startShell(
userHost : String,
userPort : Int,
externalJars : Option[Array[String]] = None): Unit ={
println("Starting Flink Shell:")
var cluster: LocalFlinkMiniCluster = null
@@ -73,9 +86,9 @@
println(s"Connecting to remote server (host: $userHost, port: $userPort).")
(userHost, userPort)
}
// custom shell
val repl = new FlinkILoop(host, port) //new MyILoop();
val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
repl.settings = new Settings()
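Taken together, the shell can also be started programmatically with external jars. A minimal sketch, assuming a JobManager is reachable at the given address:
~~~scala
import org.apache.flink.api.scala.FlinkShell

FlinkShell.startShell("localhost", 6123, Some(Array("/path/to/lib.jar")))
~~~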
@@ -19,14 +19,12 @@
package org.apache.flink.api.scala
import java.io._
import java.net.URLClassLoader
import java.util.concurrent.TimeUnit
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.test.util.{TestEnvironment, TestBaseUtils, ForkableFlinkMiniCluster, FlinkTestBase}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite, Matchers}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.FiniteDuration
import scala.tools.nsc.Settings
@@ -122,15 +120,47 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
output should include("WC(hello,1)")
output should include("WC(world,10)")
}
test("Submit external library") {
val input : String =
"""
import org.apache.flink.ml.math._
val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0))
denseVectors.print()
""".stripMargin
// find the jar file that contains the ml code
var externalJar: String = ""
val folder: File = new File("../flink-ml/target/")
val listOfFiles: Array[File] = folder.listFiles()
for (file <- listOfFiles) {
val filename: String = file.getName
if (!filename.contains("test") && !filename.contains("original") && filename.contains(".jar")) {
println("found ml jar: " + file.getAbsolutePath)
externalJar = file.getAbsolutePath
}
}
assert(externalJar != "")
val output: String = processInShell(input, Option(externalJar))
output should not include "failed"
output should not include "error"
output should not include "Exception"
output should include("\nDenseVector(1.0, 2.0, 3.0)")
}
/**
* Run the input using a Scala Shell and return the output of the shell.
* @param input commands to be processed in the shell
* @param externalJars optional path to an external jar to add to the shell's classpath
* @return output of the shell
*/
def processInShell(input : String): String ={
def processInShell(input: String, externalJars: Option[String] = None): String = {
val in = new BufferedReader(new StringReader(input + "\n"))
val out = new StringWriter()
val baos = new ByteArrayOutputStream()
@@ -142,32 +172,31 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
val host = "localhost"
val port = cluster match {
case Some(c) => c.getLeaderRPCPort
case _ => throw new RuntimeException("Test cluster not initialized.")
}
val cl = getClass.getClassLoader
var paths = new ArrayBuffer[String]
cl match {
case urlCl: URLClassLoader =>
for (url <- urlCl.getURLs) {
if (url.getProtocol == "file") {
paths += url.getFile
}
}
case _ =>
var repl: FlinkILoop = null
externalJars match {
case Some(ej) => repl = new FlinkILoop(
host, port,
Option(Array(ej)),
in, new PrintWriter(out))
case None => repl = new FlinkILoop(
host, port,
in, new PrintWriter(out))
}
val classpath = paths.mkString(File.pathSeparator)
val repl = new FlinkILoop(host, port, in, new PrintWriter(out)) //new MyILoop();
repl.settings = new Settings()
// enable this line to use scala in intellij
repl.settings.usejavacp.value = true
repl.addedClasspath = classpath
externalJars match {
case Some(ej) => repl.settings.classpath.value = ej
case None =>
}
repl.process(repl.settings)
@@ -175,11 +204,10 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
System.setOut(oldOut)
baos.flush()
val stdout = baos.toString
// reprint because ScalaTest fails if we don't
print(stdout)
out.toString + stdout
}
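A usage sketch for the helper above, mirroring the existing tests (the input program is arbitrary):
~~~scala
val output = processInShell(
  """
    |val data = env.fromElements(1, 2, 3)
    |data.print()
  """.stripMargin)

output should not include "error"
~~~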
@@ -52,7 +52,35 @@ bin=`cd "$bin"; pwd`
FLINK_CLASSPATH=`constructFlinkClassPath`
java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@
# https://issues.scala-lang.org/browse/SI-6502: since Scala 2.10, external jars
# can't be loaded interactively in the Scala shell, so it has to be done at startup.
# Check the arguments for an additional classpath and add it to the standard classpath.
EXTERNAL_LIB_FOUND=false
for ((i=1;i<=$#;i++))
do
if [[ ${!i} = "-a" || ${!i} = "--addclasspath" ]]
then
EXTERNAL_LIB_FOUND=true
# add the given path to the classpath
k=$((i+1))
j=$((k+1))
echo " "
echo "Additional classpath:${!k}"
echo " "
EXT_CLASSPATH="${!k}"
FLINK_CLASSPATH="$FLINK_CLASSPATH:${!k}"
set -- "${@:1:$((i-1))}" "${@:j}"
fi
done
if ${EXTERNAL_LIB_FOUND}
then
java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell --addclasspath "$EXT_CLASSPATH" $@
else
java -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@
fi
#restore echo
onExit
\ No newline at end of file