提交 9a18e579 编写于 作者: T Till Rohrmann

[yarn] [python] Adds -Xms start option for yarn, fixes python test case to...

[yarn] [python] Adds -Xms start option for yarn, fixes python test case to overwrite files, fixes python docs

Fixes failing yarn test cases
上级 844d5e2c
...@@ -125,7 +125,7 @@ of these methods on DataSet: ...@@ -125,7 +125,7 @@ of these methods on DataSet:
{% highlight python %} {% highlight python %}
data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE) data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
write_csv("<file-path>", WriteMode=Constants.NO_OVERWRITE, line_delimiter='\n', field_delimiter=',') write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
output() output()
{% endhighlight %} {% endhighlight %}
......
...@@ -17,13 +17,14 @@ ...@@ -17,13 +17,14 @@
################################################################################ ################################################################################
from flink.plan.Environment import get_environment from flink.plan.Environment import get_environment
from flink.plan.Constants import INT, STRING from flink.plan.Constants import INT, STRING
from flink.plan.Constants import WriteMode
if __name__ == "__main__": if __name__ == "__main__":
env = get_environment() env = get_environment()
d1 = env.read_csv("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv", (INT, INT, STRING)) d1 = env.read_csv("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv", (INT, INT, STRING))
d1.write_csv("/tmp/flink/result") d1.write_csv("/tmp/flink/result", line_delimiter="\n", field_delimiter="|", write_mode=WriteMode.OVERWRITE)
env.set_degree_of_parallelism(1) env.set_degree_of_parallelism(1)
......
...@@ -514,8 +514,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase { ...@@ -514,8 +514,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog); Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog); content = FileUtils.readFileToString(jobmanagerLog);
// expecting 512 mb, because TM was started with 1024, we cut off 50% (NOT THE DEFAULT VALUE). // expecting 512 mb, because TM was started with 1024, we cut off 50% (NOT THE DEFAULT VALUE).
Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xmx512m' not found in JobManager log: '"+jobmanagerLog+"'", Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms512m -Xmx512m' not found in JobManager log: '"+jobmanagerLog+"'",
content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xmx512m")); content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms512m -Xmx512m"));
Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log." + Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log." +
"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'", "This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
content.contains(" (2/2) (attempt #0) to ")); content.contains(" (2/2) (attempt #0) to "));
......
...@@ -506,7 +506,8 @@ trait ApplicationMasterActor extends ActorLogMessages { ...@@ -506,7 +506,8 @@ trait ApplicationMasterActor extends ActorLogMessages {
val ctx = Records.newRecord(classOf[ContainerLaunchContext]) val ctx = Records.newRecord(classOf[ContainerLaunchContext])
val javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "") val javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xmx${heapLimit}m $javaOpts") val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m " +
s"-Xmx${heapLimit}m $javaOpts")
if (hasLogback || hasLog4j) { if (hasLogback || hasLog4j) {
tmCommand ++= tmCommand ++=
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册