提交 84759dad 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] added support for batch partitioning to jobgraphbuilder

上级 7e7051b3
...@@ -57,6 +57,8 @@ public class DataStream<T extends Tuple> { ...@@ -57,6 +57,8 @@ public class DataStream<T extends Tuple> {
} }
//TODO: create copy method (or constructor) and copy datastream at every operator
private void initConnections() { private void initConnections() {
connectIDs = new ArrayList<String>(); connectIDs = new ArrayList<String>();
connectIDs.add(getId()); connectIDs.add(getId());
...@@ -82,7 +84,6 @@ public class DataStream<T extends Tuple> { ...@@ -82,7 +84,6 @@ public class DataStream<T extends Tuple> {
for (int i = 0; i < batchSizes.size(); i++) { for (int i = 0; i < batchSizes.size(); i++) {
batchSizes.set(i, batchSize); batchSizes.set(i, batchSize);
} }
context.setBatchSize(this);
return this; return this;
} }
......
...@@ -306,7 +306,8 @@ public class JobGraphBuilder { ...@@ -306,7 +306,8 @@ public class JobGraphBuilder {
public void setBatchSize(String componentName, int batchSize) { public void setBatchSize(String componentName, int batchSize) {
Configuration config = components.get(componentName).getConfiguration(); Configuration config = components.get(componentName).getConfiguration();
config.setInteger("batchSize", batchSize); config.setInteger("batchSize_"
+ (components.get(componentName).getNumberOfForwardConnections() - 1), batchSize);
} }
/** /**
......
...@@ -91,6 +91,8 @@ public class StreamExecutionEnvironment { ...@@ -91,6 +91,8 @@ public class StreamExecutionEnvironment {
} }
} }
this.setBatchSize(inputStream);
} }
public <T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName, public <T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
......
...@@ -18,6 +18,7 @@ package eu.stratosphere.streaming.api.streamcomponent; ...@@ -18,6 +18,7 @@ package eu.stratosphere.streaming.api.streamcomponent;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import java.util.List; import java.util.List;
...@@ -75,6 +76,12 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -75,6 +76,12 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private SerializationDelegate<Tuple> outSerializationDelegate = null; private SerializationDelegate<Tuple> outSerializationDelegate = null;
public StreamCollector<Tuple> collector; public StreamCollector<Tuple> collector;
private List<Integer> batchsizes_s = new ArrayList<Integer>();
private List<Integer> batchsizes_f = new ArrayList<Integer>();
private int keyPosition = 0;
private List<RecordWriter<StreamRecord>> outputs_s = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputs_f = new ArrayList<RecordWriter<StreamRecord>>();
public static int newComponent() { public static int newComponent() {
numComponents++; numComponents++;
...@@ -109,6 +116,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -109,6 +116,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
List<RecordWriter<StreamRecord>> outputs) { List<RecordWriter<StreamRecord>> outputs) {
int batchSize = taskConfiguration.getInteger("batchSize", 1); int batchSize = taskConfiguration.getInteger("batchSize", 1);
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000); long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id, collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
outSerializationDelegate, outputs); outSerializationDelegate, outputs);
...@@ -208,10 +216,11 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -208,10 +216,11 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException { List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException {
int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0); int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 0; i < numberOfOutputs; i++) { for (int i = 0; i < numberOfOutputs; i++) {
setPartitioner(taskConfiguration, i, partitioners); setPartitioner(taskConfiguration, i, partitioners);
} ChannelSelector<StreamRecord> outputPartitioner = partitioners.get(i);
for (ChannelSelector<StreamRecord> outputPartitioner : partitioners) {
if (taskBase instanceof StreamTask) { if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase, outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
StreamRecord.class, outputPartitioner)); StreamRecord.class, outputPartitioner));
...@@ -221,6 +230,11 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -221,6 +230,11 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
} else { } else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs"); throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
} }
if (outputs_f.size() < batchsizes_f.size()) {
outputs_f.add(outputs.get(i));
} else {
outputs_s.add(outputs.get(i));
}
} }
} }
...@@ -321,14 +335,18 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -321,14 +335,18 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass( Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass(
"partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class); "partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class);
Integer batchSize = taskConfiguration.getInteger("batchSize_" + nrOutput, 1);
try { try {
if (partitioner.equals(FieldsPartitioner.class)) { if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration batchsizes_f.add(batchSize);
.getInteger("partitionerIntParam_" + nrOutput, 1); // TODO:force one partitioning field
keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1);
partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition)); partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition));
} else { } else {
batchsizes_s.add(batchSize);
partitioners.add(partitioner.newInstance()); partitioners.add(partitioner.newInstance());
} }
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
......
package eu.stratosphere.streaming.util;
import java.io.IOException;
import java.io.File;
public class TestDataUtil {

    /**
     * Ensures the given test-data file is present locally, downloading it from
     * the test repository if it does not already exist. Unlike a fire-and-forget
     * exec, this method blocks until the download attempt has finished, so the
     * file is ready (or the failure reported) when the method returns.
     *
     * @param fileName
     *            name of the test-data file to fetch from the test repository
     */
    public static void downloadIfNotExists(String fileName) {
        String testDataDir = "";
        File file = new File(testDataDir + fileName);
        // NOTE(review): URL has no scheme; wget assumes http:// for scheme-less
        // URLs — confirm the host is still reachable over plain HTTP.
        String testRepoUrl = "info.ilab.sztaki.hu/~mbalassi/flink-streaming/testdata/";

        if (file.exists()) {
            System.out.println(fileName + " already exists");
        } else {
            System.out.println("downloading " + fileName);
            try {
                // ProcessBuilder with explicit argument list avoids
                // Runtime.exec(String)'s naive whitespace tokenization,
                // which breaks on paths containing spaces.
                ProcessBuilder pb = new ProcessBuilder("wget", "-O",
                        testDataDir + fileName, testRepoUrl + fileName);
                pb.inheritIO();
                Process process = pb.start();
                // Block until wget finishes: returning immediately would let
                // callers read a partially downloaded file.
                int exitCode = process.waitFor();
                if (exitCode != 0) {
                    System.err.println("download of " + fileName
                            + " failed with exit code " + exitCode);
                }
            } catch (IOException e) {
                System.err.println("could not start download of " + fileName
                        + ": " + e);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe it.
                Thread.currentThread().interrupt();
            }
        }
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册