提交 6be31d5c 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] Rat plugin skeleton added

上级 9840f820
......@@ -144,7 +144,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
......@@ -171,7 +170,44 @@
<forkMode>once</forkMode>
<argLine>-Xmx1024m</argLine>
</configuration>
</plugin>
</plugin>
<!--TODO fix RAT plugin && subs tabs for spaces-->
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.10</version>
<executions>
<execution>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<excludeSubProjects>false</excludeSubProjects>
<numUnapprovedLicenses>0</numUnapprovedLicenses>
<excludes>
<!-- Additional files like .gitignore etc.-->
<exclude>**/.*</exclude>
<exclude>**/*.prefs</exclude>
<!-- Resource files which have values. -->
<exclude>**/resources/**</exclude>
<!-- Configuration Files. -->
<exclude>**/stratosphere-bin/conf/slaves</exclude>
<!-- Administrative files in the main trunk. -->
<exclude>README.md</exclude>
<exclude>CHANGELOG</exclude>
<exclude>**/*.creole</exclude>
<exclude>CONTRIBUTORS</exclude>
<!-- Build fiels -->
<exclude>**/pom.xml</exclude>
<exclude>**/*.iml</exclude>
<!-- Generated content -->
<exclude>**/target/**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
......@@ -24,6 +24,8 @@ import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
/**
* TaskEvent for sending record acknowledgements to the input's fault tolerance
* buffer
*
*
*/
public class AckEvent extends AbstractTaskEvent {
......
......@@ -49,13 +49,15 @@ public class AckEventListener implements EventListener {
/**
* When an AckEvent occurs checks if it was directed at this task, if so,
* acknowledges the record given in the AckEvent
*
*/
public void eventOccurred(AbstractTaskEvent event) {
AckEvent ackEvent = (AckEvent) event;
String recordId = ackEvent.getRecordId();
String ackChannelId = recordId.split("-", 2)[0];
String ackCID = recordId.split("-", 2)[0];
if (ackChannelId.equals(taskInstanceID)) {
if (ackCID.equals(taskInstanceID)) {
Long nt = System.nanoTime();
recordBuffer.ackRecord(ackEvent.getRecordId());
......
......@@ -39,19 +39,18 @@ public class FaultToleranceBuffer {
private static final Log log = LogFactory
.getLog(FaultToleranceBuffer.class);
private long timeout = 10000;
private long TIMEOUT = 10000;
private Long timeOfLastUpdate;
private Map<String, StreamRecord> recordBuffer;
private Map<String, Integer> ackCounter;
private Map<String, int[]> ackMap;
private SortedMap<Long, Set<String>> recordsByTime;
private Map<String, Long> recordTimestamps;
private int numberofOutputs;
private List<RecordWriter<StreamRecord>> outputs;
private final String channelID;
private int numberOfChannels;
private int numberOfOutputs;
private int[] numberOfOutputChannels;
/**
......@@ -65,6 +64,7 @@ public class FaultToleranceBuffer {
* @param numberOfChannels
* Number of output channels for the output components
*/
public FaultToleranceBuffer(List<RecordWriter<StreamRecord>> outputs,
String channelID, int[] numberOfChannels) {
this.timeOfLastUpdate = System.currentTimeMillis();
......@@ -72,14 +72,14 @@ public class FaultToleranceBuffer {
this.recordBuffer = new HashMap<String, StreamRecord>();
this.ackCounter = new HashMap<String, Integer>();
this.ackMap = new HashMap<String, int[]>();
this.numberOfOutputChannels = numberOfChannels;
this.numberOfOutputChannels=numberOfChannels;
int totalChannels = 0;
for (int i : numberOfChannels)
totalChannels += i;
this.numberofOutputs = numberOfOutputChannels.length;
this.numberOfChannels = totalChannels;
this.numberOfOutputs = totalChannels;
this.channelID = channelID;
this.recordsByTime = new TreeMap<Long, Set<String>>();
this.recordTimestamps = new HashMap<String, Long>();
......@@ -88,21 +88,15 @@ public class FaultToleranceBuffer {
/**
* Adds the record to the fault tolerance buffer. This record will be
* monitored for acknowledgements and timeout.
*
*/
public void addRecord(StreamRecord streamRecord) {
String id = streamRecord.getId();
recordBuffer.put(id, streamRecord.copy());
ackCounter.put(id, numberOfChannels);
//TODO: remove comments for exactly once processing
// int[] ackCounts = new int[numberOfChannels + 1];
//
// for (int i = 0; i < numberOfOutputChannels.length; i++) {
// ackCounts[i + 1] = numberOfOutputChannels[i];
// }
//
// ackMap.put(id, ackCounts);
ackCounter.put(id, numberOfOutputs);
ackMap.put(id,numberOfOutputChannels.clone());
addTimestamp(id);
log.trace("Record added to buffer: " + id);
}
......@@ -117,11 +111,11 @@ public class FaultToleranceBuffer {
* @return Returns the list of the records that have timed out.
*/
List<String> timeoutRecords(Long currentTime) {
if (timeOfLastUpdate + timeout < currentTime) {
if (timeOfLastUpdate + TIMEOUT < currentTime) {
log.trace("Updating record buffer");
List<String> timedOutRecords = new LinkedList<String>();
Map<Long, Set<String>> timedOut = recordsByTime.subMap(0L,
currentTime - timeout);
currentTime - TIMEOUT);
for (Set<String> recordSet : timedOut.values()) {
if (!recordSet.isEmpty()) {
......@@ -174,11 +168,15 @@ public class FaultToleranceBuffer {
*
* @param recordID
* The ID of the record that will be removed
*
*/
public StreamRecord removeRecord(String recordID) {
ackCounter.remove(recordID);
recordsByTime.get(recordTimestamps.remove(recordID)).remove(recordID);
log.trace("Record removed from buffer: " + recordID);
return recordBuffer.remove(recordID);
}
......@@ -192,7 +190,7 @@ public class FaultToleranceBuffer {
// TODO: find a place to call timeoutRecords
public void ackRecord(String recordID) {
if (ackCounter.containsKey(recordID)) {
Integer ackCount = ackCounter.get(recordID) - 1;
Integer ackCount = ackCounter.get(recordID)-1;
if (ackCount == 0) {
removeRecord(recordID);
} else {
......@@ -208,25 +206,35 @@ public class FaultToleranceBuffer {
* @param recordID
* ID of the record that has been acknowledged
*
* @param output
* @param outputChannel
* Number of the output channel that sent the ack
*/
public void ackRecord(String recordID, int output) {
public void ackRecord(String recordID, int outputChannel) {
if (ackMap.containsKey(recordID)) {
if (decreaseAckCounter(recordID, output)) {
int[] acks = ackMap.get(recordID);
acks[outputChannel]--;
if (allZero(acks)) {
removeRecord(recordID);
}
}
}
private boolean decreaseAckCounter(String recordID, int output) {
int[] acks = ackMap.get(recordID);
acks[output + 1]--;
if (acks[output + 1] == 0) {
acks[0]++;
/**
* Checks whether an int array contains only zeros.
* @param values
* The array to check
* @return
* true only if the array contains only zeros
*/
private static boolean allZero(int[] values) {
for (int value : values) {
if (value!=0)
return false;
}
return (acks[0] == numberofOutputs);
return true;
}
/**
......@@ -294,12 +302,12 @@ public class FaultToleranceBuffer {
}
public long getTimeout() {
return this.timeout;
public long getTIMEOUT() {
return this.TIMEOUT;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
public void setTIMEOUT(long TIMEOUT) {
this.TIMEOUT = TIMEOUT;
}
public Map<String, StreamRecord> getRecordBuffer() {
......@@ -331,11 +339,11 @@ public class FaultToleranceBuffer {
}
public int getNumberOfOutputs() {
return this.numberOfChannels;
return this.numberOfOutputs;
}
void setNumberOfOutputs(int numberOfOutputs) {
this.numberOfChannels = numberOfOutputs;
this.numberOfOutputs = numberOfOutputs;
}
}
......@@ -48,6 +48,7 @@ import eu.stratosphere.types.Key;
/**
* Object for building Stratosphere stream processing job graphs
*
*/
public class JobGraphBuilder {
......@@ -57,6 +58,7 @@ public class JobGraphBuilder {
private Map<String, Integer> numberOfInstances;
private Map<String, List<Integer>> numberOfOutputChannels;
/**
* Creates a new JobGraph with the given name
*
......@@ -67,8 +69,9 @@ public class JobGraphBuilder {
jobGraph = new JobGraph(jobGraphName);
components = new HashMap<String, AbstractJobVertex>();
numberOfInstances = new HashMap<String, Integer>();
numberOfOutputChannels = new HashMap<String, List<Integer>>();
numberOfOutputChannels=new HashMap<String, List<Integer>>();
log.debug("JobGraph created");
}
/**
......@@ -79,10 +82,13 @@ public class JobGraphBuilder {
* @param InvokableClass
* User defined class describing the source
*/
public void setSource(String sourceName, final Class<? extends UserSourceInvokable> InvokableClass) {
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
Configuration config = new TaskConfig(source.getConfiguration()).getConfiguration();
Configuration config = new TaskConfig(source.getConfiguration())
.getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", sourceName);
components.put(sourceName, source);
......@@ -90,7 +96,6 @@ public class JobGraphBuilder {
log.debug("SOURCE: " + sourceName);
}
// TODO: eliminate code repetition
/**
* Adds a source component to the JobGraph
*
......@@ -101,11 +106,15 @@ public class JobGraphBuilder {
* @param parallelism
* Number of task instances of this type to run in parallel
*/
public void setSource(String sourceName, final Class<? extends UserSourceInvokable> InvokableClass, int parallelism) {
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass,
int parallelism) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
source.setNumberOfSubtasks(parallelism);
Configuration config = new TaskConfig(source.getConfiguration()).getConfiguration();
Configuration config = new TaskConfig(source.getConfiguration())
.getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", sourceName);
components.put(sourceName, source);
......@@ -121,10 +130,13 @@ public class JobGraphBuilder {
* @param InvokableClass
* User defined class describing the task
*/
public void setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass) {
public void setTask(String taskName,
final Class<? extends UserTaskInvokable> InvokableClass) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
Configuration config = new TaskConfig(task.getConfiguration()).getConfiguration();
Configuration config = new TaskConfig(task.getConfiguration())
.getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", taskName);
components.put(taskName, task);
......@@ -142,11 +154,15 @@ public class JobGraphBuilder {
* @param parallelism
* Number of task instances of this type to run in parallel
*/
public void setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass, int parallelism) {
public void setTask(String taskName,
final Class<? extends UserTaskInvokable> InvokableClass,
int parallelism) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
task.setNumberOfSubtasks(parallelism);
Configuration config = new TaskConfig(task.getConfiguration()).getConfiguration();
Configuration config = new TaskConfig(task.getConfiguration())
.getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", taskName);
components.put(taskName, task);
......@@ -162,10 +178,13 @@ public class JobGraphBuilder {
* @param InvokableClass
* User defined class describing the sink
*/
public void setSink(String sinkName, final Class<? extends UserSinkInvokable> InvokableClass) {
public void setSink(String sinkName,
final Class<? extends UserSinkInvokable> InvokableClass) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
Configuration config = new TaskConfig(sink.getConfiguration()).getConfiguration();
Configuration config = new TaskConfig(sink.getConfiguration())
.getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", sinkName);
components.put(sinkName, sink);
......@@ -183,12 +202,15 @@ public class JobGraphBuilder {
* @param parallelism
* Number of task instances of this type to run in parallel
*/
public void setSink(String sinkName, final Class<? extends UserSinkInvokable> InvokableClass, int parallelism) {
public void setSink(String sinkName,
final Class<? extends UserSinkInvokable> InvokableClass,
int parallelism) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
sink.setNumberOfSubtasks(parallelism);
Configuration config = new TaskConfig(sink.getConfiguration()).getConfiguration();
Configuration config = new TaskConfig(sink.getConfiguration())
.getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", sinkName);
components.put(sinkName, sink);
......@@ -210,22 +232,33 @@ public class JobGraphBuilder {
* @param channelType
* Channel Type
*/
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass, ChannelType channelType) {
private void connect(String upStreamComponentName,
String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass,
ChannelType channelType) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
AbstractJobVertex upStreamComponent = components
.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components
.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, channelType);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration()).getConfiguration();
config.setClass("partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
Configuration config = new TaskConfig(
upStreamComponent.getConfiguration()).getConfiguration();
config.setClass(
"partitionerClass_"
+ (upStreamComponent.getNumberOfForwardConnections()-1),
PartitionerClass);
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - " + upStreamComponentName + " -> "
+ downStreamComponentName);
log.debug("CONNECTED: "
+ PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
} catch (JobGraphDefinitionException e) {
log.error("Cannot connect components with " + PartitionerClass.getSimpleName() + " : "
+ upStreamComponentName + " -> " + downStreamComponentName, e);
log.error(
"Cannot connect components with "
+ PartitionerClass.getSimpleName() + " : "
+ upStreamComponentName + " -> "
+ downStreamComponentName, e);
}
}
......@@ -241,9 +274,16 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
public void broadcastConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, BroadcastPartitioner.class, ChannelType.INMEMORY);
addOutputChannels(upStreamComponentName, numberOfInstances.get(downStreamComponentName));
public void broadcastConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
BroadcastPartitioner.class, ChannelType.INMEMORY);
addOutputChannels(upStreamComponentName,numberOfInstances.get(downStreamComponentName));
// log.debug("Components connected with broadcast: " +
// upStreamComponentName + " to " + downStreamComponentName);
}
/**
......@@ -263,32 +303,44 @@ public class JobGraphBuilder {
* @param keyClass
* Class of the key Value stored in the record
*/
public void fieldsConnect(String upStreamComponentName, String downStreamComponentName, int keyPosition,
public void fieldsConnect(String upStreamComponentName,
String downStreamComponentName, int keyPosition,
Class<? extends Key> keyClass) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
AbstractJobVertex upStreamComponent = components
.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components
.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.INMEMORY);
upStreamComponent.connectTo(downStreamComponent,
ChannelType.INMEMORY);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration()).getConfiguration();
Configuration config = new TaskConfig(
upStreamComponent.getConfiguration()).getConfiguration();
config.setClass("partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
config.setClass(
"partitionerClass_"
+ (upStreamComponent.getNumberOfForwardConnections()-1),
FieldsPartitioner.class);
config.setClass("partitionerClassParam_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
config.setClass(
"partitionerClassParam_"
+ (upStreamComponent.getNumberOfForwardConnections()-1),
keyClass);
config.setInteger("partitionerIntParam_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
config.setInteger(
"partitionerIntParam_"
+ (upStreamComponent.getNumberOfForwardConnections()-1),
keyPosition);
addOutputChannels(upStreamComponentName, 1);
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> " + downStreamComponentName
+ ", KEY: " + keyPosition);
addOutputChannels(upStreamComponentName,1);
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName
+ " -> " + downStreamComponentName + ", KEY: "
+ keyPosition);
} catch (JobGraphDefinitionException e) {
log.error(
"Cannot connect components by field: " + upStreamComponentName + " to " + downStreamComponentName,
log.error("Cannot connect components by field: "
+ upStreamComponentName + " to " + downStreamComponentName,
e);
}
}
......@@ -305,9 +357,14 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class, ChannelType.INMEMORY);
addOutputChannels(upStreamComponentName, 1);
public void globalConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
GlobalPartitioner.class, ChannelType.INMEMORY);
addOutputChannels(upStreamComponentName,1);
}
/**
......@@ -322,9 +379,13 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class, ChannelType.INMEMORY);
addOutputChannels(upStreamComponentName, 1);
public void shuffleConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
ShufflePartitioner.class, ChannelType.INMEMORY);
addOutputChannels(upStreamComponentName,1);
}
private void addOutputChannels(String upStreamComponentName, int numOfInstances) {
......@@ -338,25 +399,31 @@ public class JobGraphBuilder {
private void setNumberOfJobInputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfInputs", component.getNumberOfBackwardConnections());
component.getConfiguration().setInteger("numberOfInputs",
component.getNumberOfBackwardConnections());
}
}
private void setNumberOfJobOutputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfOutputs", component.getNumberOfForwardConnections());
component.getConfiguration().setInteger("numberOfOutputs",
component.getNumberOfForwardConnections());
}
for (String component : numberOfOutputChannels.keySet()) {
Configuration config = components.get(component).getConfiguration();
List<Integer> channelNumList = numberOfOutputChannels.get(component);
for (int i = 0; i < channelNumList.size(); i++) {
config.setInteger("channels_" + i, channelNumList.get(i));
List<Integer> channelNumList=numberOfOutputChannels.get(component);
for(int i=0;i<channelNumList.size();i++){
config.setInteger("channels_"+i,channelNumList.get(i));
}
}
}
/**
*
* @return The JobGraph object
*/
public JobGraph getJobGraph() {
......@@ -364,4 +431,5 @@ public class JobGraphBuilder {
setNumberOfJobOutputs();
return jobGraph;
}
}
......@@ -19,6 +19,7 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
public class DefaultSinkInvokable extends UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
StringValue value = (StringValue) record.getField(0, 0);
......
......@@ -18,6 +18,7 @@ package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class DefaultTaskInvokable extends UserTaskInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
emit(record);
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.invokable;
public interface Invokable {
public void invoke() throws Exception;
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public interface RecordInvokable {
public void invoke(StreamRecord record) throws Exception;
}
......@@ -15,10 +15,14 @@
package eu.stratosphere.streaming.api.invokable;
public abstract class UserSinkInvokable implements RecordInvokable {
//TODO: consider moving this to an interface
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public abstract class UserSinkInvokable {
public abstract void invoke(StreamRecord record) throws Exception;
public String getResult() {
return "Override getResult() to pass your own results";
}
}
\ No newline at end of file
......@@ -17,5 +17,8 @@ package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
public abstract class UserSourceInvokable extends StreamInvokableComponent implements Invokable {
public abstract class UserSourceInvokable extends StreamInvokableComponent {
public void invoke() throws Exception {
}
}
......@@ -16,6 +16,10 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokableComponent;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public abstract class UserTaskInvokable extends StreamInvokableComponent implements RecordInvokable {
}
\ No newline at end of file
public abstract class UserTaskInvokable extends StreamInvokableComponent {
public void invoke(StreamRecord record) throws Exception {
}
}
/***********************************************************************************************************************
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
......
......@@ -146,7 +146,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return userFunction;
}
// TODO find a better solution for this
// TODO: use TCP-like waiting
public void threadSafePublish(AbstractTaskEvent event,
RecordReader<StreamRecord> input) throws InterruptedException,
IOException {
......
......@@ -44,6 +44,7 @@ public abstract class StreamInvokableComponent {
}
public final void emit(StreamRecord record) {
record.setId(channelID);
emittedRecords.addRecord(record);
try {
......@@ -57,13 +58,19 @@ public abstract class StreamInvokableComponent {
}
}
//TODO: Add fault tolerance
//TODO:Add fault tolreance
public final void emit(StreamRecord record, int outputChannel) {
record.setId(channelID);
try {
outputs.get(outputChannel).emit(record);
} catch (Exception e) {
log.warn("EMIT ERROR: " + e.getMessage() + " -- " + name);
}
}
}
......@@ -58,7 +58,6 @@ public class StreamSink extends AbstractOutputTask {
userFunction = streamSinkHelper.getUserFunction(taskConfiguration);
}
//TODO: eliminate code repetition (StreamTask)
@Override
public void invoke() throws Exception {
log.debug("SINK " + name + " invoked");
......
......@@ -84,7 +84,6 @@ public class StreamTask extends AbstractTask {
streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs);
}
//TODO: eliminate code repetition (StreamSink)
@Override
public void invoke() throws Exception {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
......@@ -97,6 +96,7 @@ public class StreamTask extends AbstractTask {
hasInput = true;
StreamRecord streamRecord = input.next();
String id = streamRecord.getId();
// TODO create method for concurrent publishing
try {
userFunction.invoke(streamRecord);
streamTaskHelper.threadSafePublish(new AckEvent(id), input);
......@@ -108,7 +108,7 @@ public class StreamTask extends AbstractTask {
}
}
}
log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
log.debug("TASK " + name + "invoke finished with instance id " + taskInstanceID);
}
}
......@@ -16,6 +16,7 @@
package eu.stratosphere.streaming.api.streamrecord;
public class NoSuchFieldException extends StreamRecordException {
private static final long serialVersionUID = 3604681465275112784L;
}
......@@ -105,7 +105,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return numOfRecords;
}
// TODO: use UUID
/**
* Set the ID of the StreamRecord object
*
......@@ -346,19 +345,18 @@ public class StreamRecord implements IOReadableWritable, Serializable {
// TODO: fix this method to work properly for non StringValue types
public String toString() {
StringBuilder outputString = new StringBuilder("(");
StringBuilder outputString = new StringBuilder();
StringValue output;
for (int k = 0; k < numOfRecords; ++k) {
for (int i = 0; i < numOfFields; i++) {
try {
output = (StringValue) recordBatch.get(k)[i];
outputString.append(output.getValue() + ",");
outputString.append(output.getValue() + "*");
} catch (ClassCastException e) {
outputString.append("NON-STRING,");
outputString.append("NON-STRING*");
}
}
}
outputString.append(")");
return outputString.toString();
}
......
......@@ -22,10 +22,12 @@ public class BroadcastPartitioner implements ChannelSelector<StreamRecord> {
@Override
public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
int[] returnChannels = new int[numberOfOutputChannels];
for (int i = 0; i < numberOfOutputChannels; i++) {
returnChannels[i] = i;
}
return returnChannels;
}
}
\ No newline at end of file
......@@ -22,7 +22,10 @@ public class DefaultPartitioner implements ChannelSelector<StreamRecord> {
@Override
public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
return new ShufflePartitioner().selectChannels(record,
numberOfOutputChannels);
}
}
......@@ -47,12 +47,19 @@ public class CellInfo {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", InfoSourceInvokable.class);
// graphBuilder.setSource("infoSource2", InfoSourceInvokable.class);
graphBuilder.setSource("querySource", QuerySourceInvokable.class);
// graphBuilder.setSource("querySource2", QuerySourceInvokable.class);
graphBuilder.setTask("cellTask", CellTaskInvokable.class, 3);
graphBuilder.setSink("sink", CellSinkInvokable.class);
graphBuilder.fieldsConnect("infoSource", "cellTask", 0, IntValue.class);
graphBuilder.fieldsConnect("querySource", "cellTask", 0, IntValue.class);
graphBuilder
.fieldsConnect("querySource", "cellTask", 0, IntValue.class);
// graphBuilder.fieldsConnect("infoSource2", "cellTask", 0,
// IntValue.class);
// graphBuilder.fieldsConnect("querySource2", "cellTask",0,
// IntValue.class);
graphBuilder.shuffleConnect("cellTask", "sink");
return graphBuilder.getJobGraph();
......@@ -63,7 +70,8 @@ public class CellInfo {
NepheleMiniCluster exec = new NepheleMiniCluster();
try {
File file = new File("target/stratosphere-streaming-0.5-SNAPSHOT.jar");
File file = new File(
"target/stratosphere-streaming-0.5-SNAPSHOT.jar");
JobWithJars.checkJarFile(file);
JobGraph jG = getJobGraph();
......@@ -76,7 +84,8 @@ public class CellInfo {
// 6498),
// configuration);
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), configuration);
Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration);
exec.start();
client.run(jG, true);
exec.stop();
......
......@@ -20,16 +20,16 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class CellSinkInvokable extends UserSinkInvokable {
int counter = 0;
int c=0;
@Override
public void invoke(StreamRecord record) throws Exception {
@Override
public void invoke(StreamRecord record) throws Exception {
counter++;
}
@Override
public String getResult() {
return String.valueOf(counter);
}
c++;
}
@Override
public String getResult(){
return String.valueOf(c);
}
}
......@@ -40,8 +40,6 @@ public class CellTaskInvokable extends UserTaskInvokable {
timeStamp = (LongValue) record.getField(1);
numOfFields=record.getNumOfFields();
//TODO: consider adding source to StreamRecord as a workaround
// INFO
if (numOfFields == 2) {
engine.put(cellID.getValue(), timeStamp.getValue());
......
......@@ -17,5 +17,6 @@ package eu.stratosphere.streaming.test.cellinfo;
public interface IWorkerEngine {
public int get(long timeStamp, long lastMillis, int cellId);
public void put(int cellId, long timeStamp);
}
......@@ -23,9 +23,9 @@ import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
public class InfoSourceInvokable extends UserSourceInvokable {
Random rand = new Random();
int cellNumber = 10;
Random _rand= new Random();
int _cellNumber=10;
private IntValue cellId = new IntValue(5);
private LongValue timeStamp = new LongValue(500);
......@@ -34,11 +34,11 @@ public class InfoSourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 500000; i++) {
cellId.setValue(rand.nextInt(cellNumber));
cellId.setValue(_rand.nextInt(_cellNumber));
timeStamp.setValue(System.currentTimeMillis());
record.setRecord(cellId, timeStamp);
record.setRecord(cellId,timeStamp);
emit(record);
}
}
......
......@@ -24,13 +24,13 @@ import eu.stratosphere.types.LongValue;
public class QuerySourceInvokable extends UserSourceInvokable {
Random _rand = new Random();
int _cellNumber = 10;
Random _rand= new Random();
int _cellNumber=10;
private IntValue cellId = new IntValue(5);
private LongValue timeStamp = new LongValue(500);
private IntValue lastMillis = new IntValue(100);
private StreamRecord record = new StreamRecord(cellId, timeStamp, lastMillis);
private StreamRecord record = new StreamRecord(cellId, timeStamp,lastMillis);
@Override
public void invoke() throws Exception {
......@@ -38,11 +38,12 @@ public class QuerySourceInvokable extends UserSourceInvokable {
Thread.sleep(1);
cellId.setValue(_rand.nextInt(_cellNumber));
timeStamp.setValue(System.currentTimeMillis());
record.setRecord(cellId, timeStamp, lastMillis);
record.setRecord(cellId,timeStamp,lastMillis);
emit(record);
}
}
}
......@@ -16,11 +16,11 @@
package eu.stratosphere.streaming.test.cellinfo;
public class Util {
public static int mod(int x, int y) {
int result = x % y;
if (result < 0) {
result += y;
}
return result;
}
public static int mod(int x, int y) {
int result = x % y;
if (result < 0) {
result += y;
}
return result;
}
}
......@@ -25,7 +25,8 @@ public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
private int[][] counters_;
private int pointer_;
public WorkerEngineBin(long unitLength, int numOfCells, int bufferInterval, long currentTime) {
public WorkerEngineBin(long unitLength, int numOfCells, int bufferInterval,
long currentTime) {
lastTimeUpdated_ = currentTime / unitLength * unitLength;
unitLength_ = unitLength;
counters_ = new int[(int) (bufferInterval / unitLength) + 1][numOfCells];
......@@ -63,11 +64,15 @@ public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
int shift = refresh(timeStamp);
int numOfLastIntervals = (int) (lastMillis / unitLength_);
if (shift >= counters_.length || numOfLastIntervals >= counters_.length) {
// System.out.println(counters_.length);
// System.out.println(shift);
// System.out.println(numOfLastIntervals);
return -1;
}
int sum = 0;
for (int i = shift + 1; i < shift + numOfLastIntervals + 1; ++i) {
sum += getCell(i, cellId);
// System.out.println(i + " " + getCell(i, cellId));
}
return sum;
}
......@@ -78,10 +83,12 @@ public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
int retVal;
if (shiftBy > 0) {
lastTimeUpdated_ = timeStamp / unitLength_ * unitLength_;
// System.out.println(lastTimeUpdated_);
retVal = 0;
} else {
retVal = -shiftBy;
}
// System.out.println("Shiftby " + shiftBy + " at " + timeStamp);
return retVal;
}
......@@ -90,5 +97,7 @@ public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
if (shift >= counters_.length)
return;
incrCell(shift, cellId);
// System.out.println("Pointer:" + pointer_);
// System.out.println(getCell(shift, cellId));
}
}
......@@ -26,7 +26,8 @@ public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
private TreeMap<Long, Integer>[] counters_;
@SuppressWarnings("unchecked")
public WorkerEngineExact(int numOfCells, int bufferInterval, long currentTime) {
public WorkerEngineExact(int numOfCells, int bufferInterval,
long currentTime) {
lastTimeUpdated_ = currentTime;
bufferInterval_ = bufferInterval;
counters_ = new TreeMap[numOfCells];
......@@ -37,7 +38,8 @@ public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
public int get(long timeStamp, long lastMillis, int cellId) {
refresh(timeStamp);
Map<Long, Integer> subMap = counters_[cellId].subMap(timeStamp - lastMillis, true, timeStamp, false);
Map<Long, Integer> subMap = counters_[cellId].subMap(timeStamp
- lastMillis, true, timeStamp, false);
int retVal = 0;
for (Map.Entry<Long, Integer> entry : subMap.entrySet()) {
retVal += entry.getValue();
......@@ -48,6 +50,7 @@ public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
public void put(int cellId, long timeStamp) {
refresh(timeStamp);
TreeMap<Long, Integer> map = counters_[cellId];
// System.out.println(map.size());
if (map.containsKey(timeStamp)) {
map.put(timeStamp, map.get(timeStamp) + 1);
} else {
......@@ -57,11 +60,16 @@ public class WorkerEngineExact implements java.io.Serializable, IWorkerEngine {
public void refresh(long timeStamp) {
if (timeStamp - lastTimeUpdated_ > bufferInterval_) {
// System.out.println("Refresh at " + timeStamp);
for (int i = 0; i < counters_.length; ++i) {
for (Iterator<Map.Entry<Long, Integer>> it = counters_[i].entrySet().iterator(); it.hasNext();) {
for (Iterator<Map.Entry<Long, Integer>> it = counters_[i]
.entrySet().iterator(); it.hasNext();) {
Map.Entry<Long, Integer> entry = it.next();
long time = entry.getKey();
if (timeStamp - time > bufferInterval_) {
// System.out.println("Remove: " + i + "_" + time +
// " at " +
// timeStamp);
it.remove();
} else {
break;
......
......@@ -29,7 +29,6 @@ public class WordCountCounter extends UserTaskInvokable {
private StringValue wordValue = new StringValue();
private IntValue countValue = new IntValue();
private String word = new String();
private StreamRecord streamRecord = new StreamRecord();
private int count = 1;
private int i = 0;
private long time;
......@@ -55,8 +54,6 @@ public class WordCountCounter extends UserTaskInvokable {
countValue.setValue(1);
}
// TODO: object reuse
// streamRecord.addRecord(wordValue, countValue);
// emit(streamRecord.copy());
emit(new StreamRecord(wordValue, countValue));
}
}
......@@ -25,10 +25,12 @@ public class WordCountDummySource extends UserSourceInvokable {
StreamRecord record = new StreamRecord(lineValue);
public WordCountDummySource() {
}
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1000; i++) {
if (i % 2 == 0) {
lineValue.setValue("Gyula Marci");
......@@ -39,4 +41,4 @@ public class WordCountDummySource extends UserSourceInvokable {
emit(record);
}
}
}
\ No newline at end of file
}
......@@ -25,8 +25,9 @@ public class WordCountDummySource2 extends UserSourceInvokable {
StreamRecord record = new StreamRecord(lineValue);
private long time;
private long prevTime = System.currentTimeMillis();
public WordCountDummySource2() {
}
@Override
......@@ -34,11 +35,11 @@ public class WordCountDummySource2 extends UserSourceInvokable {
for (int i = 0; i < 1000000; i++) {
if (i % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Source:\t\t" + i + "\t\tTime: " + (time - prevTime));
prevTime = time;
time= System.currentTimeMillis();
System.out.println("Source:\t" + i + "\t----Time: "+(time-prevTime));
prevTime=time;
}
if (i % 2 == 0) {
lineValue.setValue("Gyula Marci");
} else {
......
......@@ -37,17 +37,18 @@ public class WordCountLocal {
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class);
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
StringValue.class);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
//TODO: arguments check
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
......@@ -61,14 +62,18 @@ public class WordCountLocal {
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
Client client = new Client(new InetSocketAddress("localhost",
6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), configuration);
Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration);
client.run(jG, true);
}
......
......@@ -39,7 +39,8 @@ public class WordCountRemote {
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class);
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
StringValue.class);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
......@@ -49,7 +50,9 @@ public class WordCountRemote {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
File file = new File("target/stratosphere-streaming-0.5-SNAPSHOT.jar");
File file = new File(
"target/stratosphere-streaming-0.5-SNAPSHOT.jar");
JobWithJars.checkJarFile(file);
JobGraph jG = getJobGraph();
......@@ -57,8 +60,12 @@ public class WordCountRemote {
jG.addJar(new Path(file.getAbsolutePath()));
Configuration configuration = jG.getJobConfiguration();
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), configuration);
Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration);
client.run(jG, true);
} catch (Exception e) {
System.out.println(e);
}
......
......@@ -20,23 +20,22 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSink extends UserSinkInvokable {
int nrOfRecords = 0;
int nrOfRecords=0;
private long time;
private long prevTime = System.currentTimeMillis();
@Override
public void invoke(StreamRecord record) throws Exception {
nrOfRecords++;
if (nrOfRecords % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Sink:\t" + nrOfRecords + "\t----Time: " + (time - prevTime));
prevTime = time;
time= System.currentTimeMillis();
System.out.println("Sink:\t" + nrOfRecords + "\t----Time: "+(time-prevTime));
prevTime=time;
}
}
@Override
public String getResult() {
public String getResult(){
return String.valueOf(nrOfRecords);
}
}
......@@ -33,7 +33,8 @@ public class WordCountSource extends UserSourceInvokable {
public WordCountSource() {
try {
br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
......
......@@ -34,8 +34,9 @@ public class WordCountSplitter extends UserTaskInvokable {
i++;
if (i % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Splitter:\t" + i + "\t----Time: " + (time - prevTime));
prevTime = time;
System.out.println("Splitter:\t" + i + "\t----Time: "
+ (time - prevTime));
prevTime=time;
}
sentence = (StringValue) record.getRecord(0)[0];
words = sentence.getValue().split(" ");
......
......@@ -230,7 +230,7 @@ public class FaultToleranceBufferTest {
// TODO: create more tests for this method
@Test
public void testTimeOutRecords() {
faultTolerancyBuffer.setTimeout(1000);
faultTolerancyBuffer.setTIMEOUT(1000);
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.addRecord(new StringValue("V1"));
......
......@@ -12,7 +12,6 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
import static org.junit.Assert.assertEquals;
......
......@@ -32,17 +32,20 @@ public class StreamRecordTest {
@Test
public void singleRecordSetGetTest() {
StreamRecord record = new StreamRecord(new StringValue("Stratosphere"), new IntValue(1));
StreamRecord record = new StreamRecord(new StringValue("Stratosphere"),
new IntValue(1));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
assertEquals("Stratosphere", ((StringValue) record.getField(0)).getValue());
assertEquals("Stratosphere",
((StringValue) record.getField(0)).getValue());
assertEquals(1, ((IntValue) record.getField(1)).getValue());
record.setField(1, new StringValue("Big Data"));
assertEquals("Big Data", ((StringValue) record.getField(1)).getValue());
record.setRecord(new IntValue(2), new StringValue("Big Data looks tiny from here."));
record.setRecord(new IntValue(2), new StringValue(
"Big Data looks tiny from here."));
assertEquals(2, record.getNumOfFields());
assertEquals(1, record.getNumOfRecords());
assertEquals(2, ((IntValue) record.getField(0)).getValue());
......@@ -79,9 +82,11 @@ public class StreamRecordTest {
public void copyTest() {
StreamRecord a = new StreamRecord(new StringValue("Big"));
StreamRecord b = a.copy();
assertTrue(((StringValue) a.getField(0)).getValue().equals(((StringValue) b.getField(0)).getValue()));
assertTrue(((StringValue) a.getField(0)).getValue().equals(
((StringValue) b.getField(0)).getValue()));
b.setRecord(new StringValue("Data"));
assertFalse(((StringValue) a.getField(0)).getValue().equals(((StringValue) b.getField(0)).getValue()));
assertFalse(((StringValue) a.getField(0)).getValue().equals(
((StringValue) b.getField(0)).getValue()));
}
@Test
......@@ -94,7 +99,8 @@ public class StreamRecordTest {
}
try {
a.setRecord(new StringValue("Data"), new StringValue("Stratosphere"));
a.setRecord(new StringValue("Data"),
new StringValue("Stratosphere"));
fail();
} catch (RecordSizeMismatchException e) {
}
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.partitioner;
import static org.junit.Assert.assertArrayEquals;
......@@ -38,7 +23,7 @@ public class BroadcastPartitionerTest {
int[] first = new int[] { 0 };
int[] second = new int[] { 0, 1 };
int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
assertArrayEquals(first, broadcastPartitioner.selectChannels(streamRecord, 1));
assertArrayEquals(second, broadcastPartitioner.selectChannels(streamRecord, 2));
assertArrayEquals(sixth, broadcastPartitioner.selectChannels(streamRecord, 6));
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.partitioner;
import static org.junit.Assert.assertEquals;
......@@ -37,9 +22,12 @@ public class DefaultPartitionerTest {
@Test
public void testSelectChannelsLength() {
assertEquals(1, defaultPartitioner.selectChannels(streamRecord, 1).length);
assertEquals(1, defaultPartitioner.selectChannels(streamRecord, 2).length);
assertEquals(1, defaultPartitioner.selectChannels(streamRecord, 1024).length);
assertEquals(1,
defaultPartitioner.selectChannels(streamRecord, 1).length);
assertEquals(1,
defaultPartitioner.selectChannels(streamRecord, 2).length);
assertEquals(1,
defaultPartitioner.selectChannels(streamRecord, 1024).length);
}
@Test
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.partitioner;
import static org.junit.Assert.assertArrayEquals;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.partitioner;
import static org.junit.Assert.assertArrayEquals;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.partitioner;
import static org.junit.Assert.assertEquals;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册