提交 6691674b 编写于 作者: G ghermann 提交者: Stephan Ewen

[streaming] Added StreamCollector2

上级 84759dad
......@@ -56,8 +56,6 @@ public class DataStream<T extends Tuple> {
initConnections();
}
//TODO: create copy method (or constructor) and copy datastream at every operator
private void initConnections() {
connectIDs = new ArrayList<String>();
......@@ -84,6 +82,7 @@ public class DataStream<T extends Tuple> {
for (int i = 0; i < batchSizes.size(); i++) {
batchSizes.set(i, batchSize);
}
context.setBatchSize(this);
return this;
}
......
......@@ -306,8 +306,7 @@ public class JobGraphBuilder {
public void setBatchSize(String componentName, int batchSize) {
Configuration config = components.get(componentName).getConfiguration();
config.setInteger("batchSize_"
+ (components.get(componentName).getNumberOfForwardConnections() - 1), batchSize);
config.setInteger("batchSize", batchSize);
}
/**
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.util.ArrayList;
import java.util.List;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
/**
 * Collector that forwards every collected tuple to a set of wrapped
 * {@link StreamCollector}s, one group per output.
 *
 * <p>
 * Two kinds of outputs are distinguished:
 * <ul>
 * <li><b>not partitioned</b> outputs: one {@link StreamCollector} each, which
 * receives every record;</li>
 * <li><b>partitioned</b> outputs: an array of {@link StreamCollector}s each
 * (one per parallel instance of the output), of which exactly one receives a
 * given record, chosen by the hash code of the record's key field.</li>
 * </ul>
 *
 * @param <T>
 *            type of the tuples accepted by this collector
 */
public class StreamCollector2<T extends Tuple> implements Collector<T> {

    /** One collector per not partitioned output; each one gets every record. */
    ArrayList<StreamCollector<Tuple>> notPartitionedCollectors;

    /**
     * One array per partitioned output; the array index is the partition a
     * record is routed to.
     */
    ArrayList<StreamCollector<Tuple>[]> partitionedCollectors;

    /**
     * Position of the tuple field whose hash code selects the partition. The
     * (misspelled) name is kept because the field is package-visible.
     */
    int keyPostition;

    // TODO consider channelID
    /**
     * Creates the wrapped collectors for all outputs.
     *
     * @param batchSizesOfNotPartitioned
     *            batch size of each not partitioned output; one collector is
     *            created per entry
     * @param batchSizesOfPartitioned
     *            batch size of each partitioned output
     * @param parallelismOfOutput
     *            number of collectors to create for each partitioned output;
     *            entry {@code i} belongs to {@code batchSizesOfPartitioned[i]},
     *            so this array must be at least as long as that one
     * @param keyPosition
     *            position of the tuple field used for partitioning
     * @param batchTimeout
     *            passed unchanged to every wrapped collector
     * @param channelID
     *            passed unchanged to every wrapped collector
     * @param serializationDelegate
     *            passed unchanged to every wrapped collector
     * @param outputs
     *            passed unchanged to every wrapped collector
     */
    public StreamCollector2(int[] batchSizesOfNotPartitioned, int[] batchSizesOfPartitioned,
            int[] parallelismOfOutput, int keyPosition, long batchTimeout, int channelID,
            SerializationDelegate<Tuple> serializationDelegate,
            List<RecordWriter<StreamRecord>> outputs) {

        notPartitionedCollectors = new ArrayList<StreamCollector<Tuple>>(
                batchSizesOfNotPartitioned.length);
        partitionedCollectors = new ArrayList<StreamCollector<Tuple>[]>(
                batchSizesOfPartitioned.length);

        this.keyPostition = keyPosition;

        for (int i = 0; i < batchSizesOfNotPartitioned.length; i++) {
            notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned[i],
                    batchTimeout, channelID, serializationDelegate, outputs));
        }

        for (int i = 0; i < batchSizesOfPartitioned.length; i++) {
            // Java cannot create arrays of a parameterized type; the raw array
            // is only ever filled with StreamCollector<Tuple> instances below.
            @SuppressWarnings({ "unchecked", "rawtypes" })
            StreamCollector<Tuple>[] collectors = new StreamCollector[parallelismOfOutput[i]];
            for (int j = 0; j < collectors.length; j++) {
                collectors[j] = new StreamCollector<Tuple>(batchSizesOfPartitioned[i],
                        batchTimeout, channelID, serializationDelegate, outputs);
            }
            partitionedCollectors.add(collectors);
        }
    }

    // TODO copy here instead of copying inside every StreamCollector
    /**
     * Hands the record to every not partitioned collector and, for each
     * partitioned output, to the single collector selected by the hash code of
     * the record's key field.
     *
     * @param record
     *            tuple to emit
     */
    @Override
    public void collect(T record) {
        for (StreamCollector<Tuple> collector : notPartitionedCollectors) {
            collector.collect(record);
        }

        // The key is only needed for partitioned outputs; do not touch the key
        // field at all when there are none.
        if (partitionedCollectors.isEmpty()) {
            return;
        }

        int keyHash = record.getField(keyPostition).hashCode();
        for (StreamCollector<Tuple>[] collectors : partitionedCollectors) {
            // Take the remainder first, then the absolute value:
            // Math.abs(Integer.MIN_VALUE) is still negative, so
            // Math.abs(hash) % length could yield a negative index. The
            // remainder has a magnitude below length, so its absolute value is
            // always a valid index, and for every other hash the selected
            // partition is the same as with Math.abs(hash) % length.
            int partition = Math.abs(keyHash % collectors.length);
            collectors[partition].collect(record);
        }
    }

    /**
     * Currently does nothing; the wrapped collectors are not closed here.
     */
    @Override
    public void close() {
    }
}
......@@ -91,8 +91,6 @@ public class StreamExecutionEnvironment {
}
}
this.setBatchSize(inputStream);
}
public <T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
......
......@@ -18,7 +18,6 @@ package eu.stratosphere.streaming.api.streamcomponent;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
......@@ -76,12 +75,6 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
private SerializationDelegate<Tuple> outSerializationDelegate = null;
public StreamCollector<Tuple> collector;
private List<Integer> batchsizes_s = new ArrayList<Integer>();
private List<Integer> batchsizes_f = new ArrayList<Integer>();
private int keyPosition = 0;
private List<RecordWriter<StreamRecord>> outputs_s = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputs_f = new ArrayList<RecordWriter<StreamRecord>>();
public static int newComponent() {
numComponents++;
......@@ -116,7 +109,6 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
List<RecordWriter<StreamRecord>> outputs) {
int batchSize = taskConfiguration.getInteger("batchSize", 1);
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
outSerializationDelegate, outputs);
......@@ -216,11 +208,10 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException {
int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 0; i < numberOfOutputs; i++) {
setPartitioner(taskConfiguration, i, partitioners);
ChannelSelector<StreamRecord> outputPartitioner = partitioners.get(i);
}
for (ChannelSelector<StreamRecord> outputPartitioner : partitioners) {
if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
StreamRecord.class, outputPartitioner));
......@@ -230,11 +221,6 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
if (outputs_f.size() < batchsizes_f.size()) {
outputs_f.add(outputs.get(i));
} else {
outputs_s.add(outputs.get(i));
}
}
}
......@@ -335,18 +321,14 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration.getClass(
"partitionerClass_" + nrOutput, DefaultPartitioner.class, ChannelSelector.class);
Integer batchSize = taskConfiguration.getInteger("batchSize_" + nrOutput, 1);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
batchsizes_f.add(batchSize);
// TODO:force one partitioning field
keyPosition = taskConfiguration.getInteger("partitionerIntParam_" + nrOutput, 1);
int keyPosition = taskConfiguration
.getInteger("partitionerIntParam_" + nrOutput, 1);
partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition));
} else {
batchsizes_s.add(batchSize);
partitioners.add(partitioner.newInstance());
}
if (log.isTraceEnabled()) {
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
/**
 * Smoke tests for {@link StreamCollector2}. The tests only exercise the calls;
 * they make no assertions about the emitted records.
 */
public class StreamCollector2Test {

    StreamCollector2<Tuple> collector;

    /**
     * Builds a collector with no not partitioned outputs and two partitioned
     * outputs (parallelism 2 and 1, batch size 2 each), then collects the same
     * tuple instance four times with its key field alternating between 0 and 1.
     */
    @Test
    public void testCollect() {
        final int keyPosition = 0;
        final long batchTimeout = 1000;
        final int channelID = 1;

        final int[] notPartitionedBatchSizes = {};
        final int[] partitionedBatchSizes = { 2, 2 };
        final int[] outputParallelism = { 2, 1 };

        collector = new StreamCollector2<Tuple>(notPartitionedBatchSizes, partitionedBatchSizes,
                outputParallelism, keyPosition, batchTimeout, channelID, null, null);

        Tuple1<Integer> tuple = new Tuple1<Integer>();

        // Stand-alone collector that is constructed but never used afterwards.
        new StreamCollector<Tuple>(1, batchTimeout, channelID, null);

        // The single tuple is mutated and re-collected for every key.
        for (int key : new int[] { 0, 1, 0, 1 }) {
            tuple.f0 = key;
            collector.collect(tuple);
        }
    }

    /** Placeholder; nothing is tested yet. */
    @Test
    public void testClose() {
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册