提交 839d46ce 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] adapt to new APIs

上级 d3ba0b3c
......@@ -87,7 +87,12 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
......
......@@ -36,6 +36,7 @@ public class DataStream<T extends Tuple> {
List<String> connectIDs;
List<ConnectionType> ctypes;
List<Integer> cparams;
List<Integer> batchSizes;
protected DataStream() {
// TODO implement
......@@ -63,16 +64,33 @@ public class DataStream<T extends Tuple> {
ctypes.add(ConnectionType.SHUFFLE);
cparams = new ArrayList<Integer>();
cparams.add(0);
batchSizes = new ArrayList<Integer>();
batchSizes.add(1);
}
/**
 * Returns the value of this stream's {@code id} field.
 *
 * @return the identifier stored in {@code id}
 */
public String getId() {
return id;
}
/**
 * Overwrites every entry of {@code batchSizes} with the given value and then passes this
 * stream to {@code context.setBatchSize} so the new sizes are applied.
 *
 * @param batchSize
 *            batch size to record for every entry; must be at least 1
 * @return this stream, so calls can be chained
 * @throws IllegalArgumentException
 *             if {@code batchSize} is smaller than 1
 */
public DataStream<T> batch(int batchSize) {
    if (batchSize <= 0) {
        throw new IllegalArgumentException("Batch size must be positive.");
    }

    // Same effect as setting each index in turn: every slot receives the new size.
    java.util.Collections.fill(batchSizes, batchSize);

    context.setBatchSize(this);
    return this;
}
/**
 * Appends all entries of the other stream's {@code connectIDs}, {@code ctypes},
 * {@code cparams} and {@code batchSizes} lists to the corresponding lists of this stream.
 * The four lists are extended together, so entry i of each list keeps describing the same
 * connection.
 *
 * @param stream
 *            stream whose connection entries are appended to this one
 * @return this stream, so calls can be chained
 */
public DataStream<T> connectWith(DataStream<T> stream) {
connectIDs.addAll(stream.connectIDs);
ctypes.addAll(stream.ctypes);
cparams.addAll(stream.cparams);
batchSizes.addAll(stream.batchSizes);
return this;
}
......
......@@ -65,6 +65,7 @@ public class JobGraphBuilder {
protected int maxParallelism;
protected FaultToleranceType faultToleranceType;
private int batchSize;
private long batchTimeout;
/**
* Creates a new JobGraph with the given name
......@@ -85,7 +86,6 @@ public class JobGraphBuilder {
log.debug("JobGraph created");
}
this.faultToleranceType = faultToleranceType;
batchSize = 1;
}
/**
......@@ -99,9 +99,11 @@ public class JobGraphBuilder {
this(jobGraphName, FaultToleranceType.NONE);
}
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, int batchSize) {
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType,
int defaultBatchSize, long defaultBatchTimeoutMillis) {
this(jobGraphName, faultToleranceType);
this.batchSize = batchSize;
this.batchSize = defaultBatchSize;
this.batchTimeout = defaultBatchTimeoutMillis;
}
/**
......@@ -247,7 +249,6 @@ public class JobGraphBuilder {
* @param component
* AbstractJobVertex associated with the component
*/
private Configuration setComponent(String componentName,
final Class<? extends StreamComponent> InvokableClass, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
......@@ -263,6 +264,7 @@ public class JobGraphBuilder {
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
config.setInteger("batchSize", batchSize);
config.setLong("batchTimeout", batchTimeout);
// config.setBytes("operator", getSerializedFunction());
config.setInteger("faultToleranceType", faultToleranceType.id);
......@@ -302,6 +304,11 @@ public class JobGraphBuilder {
return config;
}
/**
 * Writes a new value for the "batchSize" key into the configuration of one component.
 *
 * @param componentName
 *            key used to look the component up in {@code components}
 * @param batchSize
 *            value stored under the "batchSize" configuration key
 */
public void setBatchSize(String componentName, int batchSize) {
    components.get(componentName).getConfiguration().setInteger("batchSize", batchSize);
}
/**
* Adds serialized invokable object to the JobVertex configuration
*
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.io.Serializable;
import eu.stratosphere.api.java.tuple.Tuple;
/**
 * Serializable abstract base class for sink logic. Subclasses implement
 * {@link #invoke(Tuple)}, which receives one tuple of type {@code IN}.
 *
 * @param <IN>
 *            tuple type handed to {@code invoke}
 */
public abstract class SinkFunction<IN extends Tuple> implements Serializable {
private static final long serialVersionUID = 1L;
/**
 * Callback that receives a single tuple.
 *
 * @param tuple
 *            the tuple to process
 */
public abstract void invoke(IN tuple);
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.io.Serializable;
import eu.stratosphere.api.java.tuple.Tuple;
/**
 * Serializable abstract base class for sink logic. Subclasses implement
 * {@link #invoke(Tuple)}, which receives one tuple of type {@code IN}.
 *
 * @param <IN>
 *            tuple type handed to {@code invoke}
 */
public abstract class SinkFunction<IN extends Tuple> implements Serializable {
private static final long serialVersionUID = 1L;
/**
 * Callback that receives a single tuple.
 *
 * @param tuple
 *            the tuple to process
 */
public abstract void invoke(IN tuple);
}
......@@ -28,37 +28,54 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
protected StreamRecord streamRecord;
protected int batchSize;
protected long batchTimeout;
protected int counter = 0;
protected int channelID;
private long timeOfLastRecordEmitted = System.currentTimeMillis();;
private List<RecordWriter<StreamRecord>> outputs;
public StreamCollector(int batchSize, int channelID,
public StreamCollector(int batchSize, long batchTimeout, int channelID,
SerializationDelegate<Tuple> serializationDelegate,
List<RecordWriter<StreamRecord>> outputs) {
this.batchSize = batchSize;
this.batchTimeout = batchTimeout;
this.streamRecord = new ArrayStreamRecord(batchSize);
this.streamRecord.setSeralizationDelegate(serializationDelegate);
this.channelID = channelID;
this.outputs = outputs;
}
public StreamCollector(int batchSize, int channelID,
public StreamCollector(int batchSize, long batchTimeout, int channelID,
SerializationDelegate<Tuple> serializationDelegate) {
this(batchSize, channelID, serializationDelegate, null);
this(batchSize, batchTimeout, channelID, serializationDelegate, null);
}
// TODO reconsider emitting mechanism at timeout (find a place to timeout)
@Override
public void collect(T tuple) {
streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple));
counter++;
if (counter >= batchSize) {
counter = 0;
streamRecord.setId(channelID);
emit(streamRecord);
// timeOfLastRecordEmitted = System.currentTimeMillis();
} else {
// timeout();
}
}
/**
 * Flushes the partially filled batch when more than {@code batchTimeout} milliseconds have
 * passed since {@code timeOfLastRecordEmitted}.
 * <p>
 * Only the first {@code counter} tuples of {@code streamRecord} are copied into the emitted
 * record. If nothing is buffered the call is a no-op, so no zero-length batch is emitted.
 */
public void timeout() {
    if (counter == 0) {
        // Nothing collected since the last emit: there is no partial batch to flush.
        return;
    }

    long now = System.currentTimeMillis();
    // Compare the elapsed time rather than an absolute deadline so that a very large
    // batchTimeout cannot overflow the addition.
    if (now - timeOfLastRecordEmitted > batchTimeout) {
        StreamRecord truncatedRecord = new ArrayStreamRecord(streamRecord, counter);
        emit(truncatedRecord);
        // Stamp after emitting, so the next interval starts once the flush has completed.
        timeOfLastRecordEmitted = System.currentTimeMillis();
    }
}
private void emit(StreamRecord streamRecord) {
counter = 0;
streamRecord.setId(channelID);
if (outputs == null) {
System.out.println(streamRecord);
} else {
......
......@@ -32,16 +32,20 @@ import eu.stratosphere.util.Collector;
//TODO: figure out generic dummysink
public class StreamExecutionEnvironment {
JobGraphBuilder jobGraphBuilder;
public StreamExecutionEnvironment(int batchSize) {
if (batchSize < 1) {
public StreamExecutionEnvironment(int defaultBatchSize, long defaultBatchTimeoutMillis) {
if (defaultBatchSize < 1) {
throw new IllegalArgumentException("Batch size must be positive.");
}
jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE, batchSize);
if (defaultBatchTimeoutMillis < 1) {
throw new IllegalArgumentException("Batch timeout must be positive.");
}
jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE,
defaultBatchSize, defaultBatchTimeoutMillis);
}
public StreamExecutionEnvironment() {
this(1);
this(1, 1000);
}
private static class DummySource extends UserSourceInvokable<Tuple1<String>> {
......@@ -54,11 +58,19 @@ public class StreamExecutionEnvironment {
}
}
}
/**
 * Connection kinds that a DataStream records per connection in its {@code ctypes} list.
 */
public static enum ConnectionType {
SHUFFLE, BROADCAST, FIELD
}
/**
 * Forwards the batch sizes stored on a stream to the job graph builder. Entry i of
 * {@code inputStream.batchSizes} is applied to the component named by entry i of
 * {@code inputStream.connectIDs}.
 *
 * @param inputStream
 *            stream whose per-connection batch sizes are applied
 */
public <T extends Tuple> void setBatchSize(DataStream<T> inputStream) {
    int position = 0;
    for (String componentId : inputStream.connectIDs) {
        jobGraphBuilder.setBatchSize(componentId, inputStream.batchSizes.get(position));
        position++;
    }
}
private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID) {
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
......@@ -80,8 +92,10 @@ public class StreamExecutionEnvironment {
}
}
public <T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName, DataStream<T> inputStream, final AbstractFunction function, UserTaskInvokable<T, R> functionInvokable) {
public <T extends Tuple, R extends Tuple> DataStream<R> addFunction(String functionName,
DataStream<T> inputStream, final AbstractFunction function,
UserTaskInvokable<T, R> functionInvokable) {
DataStream<R> returnStream = new DataStream<R>(this);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
......@@ -93,14 +107,13 @@ public class StreamExecutionEnvironment {
e.printStackTrace();
}
jobGraphBuilder.setTask(returnStream.getId(), functionInvokable,
functionName, baos.toByteArray());
jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName,
baos.toByteArray());
connectGraph(inputStream, returnStream.getId());
return returnStream;
}
public <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
SinkFunction<T> sinkFunction) {
......@@ -164,8 +177,8 @@ public class StreamExecutionEnvironment {
/**
 * Adds a source built from a {@code FileSourceFunction} for the given path.
 *
 * @param path
 *            path handed to the {@code FileSourceFunction} constructor
 * @return the stream returned by {@code addSource}
 */
public DataStream<Tuple1<String>> readTextFile(String path) {
return addSource(new FileSourceFunction(path));
}
}
public DataStream<Tuple1<String>> addDummySource() {
DataStream<Tuple1<String>> returnStream = new DataStream<Tuple1<String>>(this);
......
......@@ -107,8 +107,11 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
public StreamCollector<Tuple> setCollector(Configuration taskConfiguration, int id,
List<RecordWriter<StreamRecord>> outputs) {
int batchSize = taskConfiguration.getInteger("batchSize", -1);
collector = new StreamCollector<Tuple>(batchSize, id, outSerializationDelegate, outputs);
int batchSize = taskConfiguration.getInteger("batchSize", 1);
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
collector = new StreamCollector<Tuple>(batchSize, batchTimeout, id,
outSerializationDelegate, outputs);
return collector;
}
......@@ -121,7 +124,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
try {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
Object function = in.readObject();
if (operatorName.equals("flatMap")) {
setSerializer(function, FlatMapFunction.class);
} else if (operatorName.equals("map")) {
......@@ -151,21 +154,21 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
private void setSerializer(Object function, Class<? extends AbstractFunction> clazz) {
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
function.getClass(), 0, null, null);
inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
0, null, null);
inTupleSerializer = inTupleTypeInfo.createSerializer();
inDeserializationDelegate = new DeserializationDelegate<Tuple>(inTupleSerializer);
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
function.getClass(), 1, null, null);
outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
1, null, null);
outTupleSerializer = outTupleTypeInfo.createSerializer();
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
}
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
......
......@@ -15,20 +15,21 @@
package eu.stratosphere.streaming.api.streamcomponent;
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import java.util.ArrayList;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.SlidingWindowState;
import eu.stratosphere.util.Collector;
public class StreamWindowTask extends UserTaskInvokable {
public class StreamWindowTask extends FlatMapFunction<Tuple, Tuple> {
private static final long serialVersionUID = 1L;
private int computeGranularity;
private int windowFieldId = 1;
private StreamRecord tempRecord;
private ArrayList tempArrayList;
private SlidingWindowState<Integer> window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
......@@ -44,43 +45,42 @@ public class StreamWindowTask extends UserTaskInvokable {
sum.put("sum", 0);
}
private void incrementCompute(StreamRecord record){}
private void decrementCompute(StreamRecord record){}
private void produceRecord(long progress){}
private void incrementCompute(ArrayList tupleArray) {}
private void decrementCompute(ArrayList tupleArray) {}
private void produceOutput(long progress, Collector out) {}
@Override
public void invoke(StreamRecord record, StreamCollector collector) throws Exception {
int numTuple = record.getBatchSize();
int tupleIndex = 0;
for (int i = 0; i < numTuple; ++i) {
long progress = record.getTuple(i).getField(windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new ArrayStreamRecord(record.getBatchSize());
} else {
if (progress > nextTimestamp) {
public void flatMap(Tuple value, Collector<Tuple> out) throws Exception {
// TODO Auto-generated method stub
long progress = value.getField(windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempArrayList = new ArrayList();
} else {
if (progress > nextTimestamp) {
if (window.isFull()) {
ArrayList expiredArrayList = window.popFront();
incrementCompute(tempArrayList);
decrementCompute(expiredArrayList);
window.pushBack(tempArrayList);
if (window.isEmittable()) {
produceOutput(progress, out);
}
} else {
incrementCompute(tempArrayList);
window.pushBack(tempArrayList);
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
produceOutput(progress, out);
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new ArrayStreamRecord(record.getBatchSize());
}
tempRecord.setTuple(tupleIndex++, record.getTuple(i));
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempArrayList = new ArrayList();
}
}
tempArrayList.add(value);
}
}
}
......@@ -45,14 +45,19 @@ public class ArrayStreamRecord extends StreamRecord {
}
public ArrayStreamRecord(StreamRecord record) {
tupleBatch = new Tuple[record.getBatchSize()];
this(record, record.getBatchSize());
}
public ArrayStreamRecord(StreamRecord record, int truncatedSize) {
tupleBatch = new Tuple[truncatedSize];
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getBatchSize(); ++i) {
for (int i = 0; i < truncatedSize; ++i) {
this.tupleBatch[i] = copyTuple(record.getTuple(i));
}
this.batchSize = tupleBatch.length;
}
/**
* Creates a new batch of records containing the given Tuple array as
* elements
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
/**
 * Example driver: builds the iterative sample job graph and submits it either through an
 * in-process {@code NepheleMiniCluster} ("local", the default) or to a fixed remote address
 * ("cluster").
 */
public class IterativeLocal {

    /**
     * Declares one source, two tasks and one sink and connects them in a chain:
     * IterativeSource, IterativeParallel, IterativeStateHolder, IterativeSink.
     *
     * @return the job graph produced by the builder
     */
    public static JobGraph getJobGraph() {
        JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
        graphBuilder.setSource("IterativeSource", IterativeSource.class);
        graphBuilder.setTask("IterativeParallel", IterativeParallel.class, 1, 1);
        graphBuilder.setTask("IterativeStateHolder", IterativeStateHolder.class);
        graphBuilder.setSink("IterativeSink", IterativeSink.class);
        graphBuilder.fieldsConnect("IterativeSource", "IterativeParallel", 1);
        graphBuilder.fieldsConnect("IterativeParallel", "IterativeStateHolder", 1);
        graphBuilder.globalConnect("IterativeStateHolder", "IterativeSink");
        return graphBuilder.getJobGraph();
    }

    /**
     * Runs the example.
     *
     * @param args
     *            optional; {@code args[0]} selects "local" (default when absent) or "cluster"
     */
    public static void main(String[] args) {
        LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);

        String mode = args.length == 0 ? "local" : args[0];

        try {
            JobGraph jG = getJobGraph();
            Configuration configuration = jG.getJobConfiguration();

            if (mode.equals("local")) {
                System.out.println("Running in Local mode");
                NepheleMiniCluster exec = new NepheleMiniCluster();
                exec.start();
                try {
                    Client client = new Client(new InetSocketAddress("localhost", 6498),
                            configuration);
                    client.run(jG, true);
                } finally {
                    // Stop the mini cluster even when the job submission fails.
                    exec.stop();
                }
            } else if (mode.equals("cluster")) {
                System.out.println("Running in Cluster2 mode");
                Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
                        configuration);
                client.run(jG, true);
            } else {
                // Previously an unrecognised mode fell through without any feedback.
                System.err.println("Unknown mode '" + mode
                        + "'; expected \"local\" or \"cluster\".");
            }
        } catch (Exception e) {
            System.out.println(e);
            // Keep the stack trace so the failing call can be located.
            e.printStackTrace();
        }
    }
}
......@@ -15,11 +15,32 @@
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import org.apache.log4j.Level;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class CollaborativeFilteringLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("Source", new CollaborativeFilteringSource());
graphBuilder.setTask("Task", new CollaborativeFilteringTask(), 1, 1);
graphBuilder.setSink("Sink", new CollaborativeFilteringSink());
public static void main(String[] args) {
// TODO Auto-generated method stub
graphBuilder.fieldsConnect("Source", "Task", 0);
graphBuilder.shuffleConnect("Task", "Sink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
ClusterUtil.runOnMiniCluster(getJobGraph());
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Sink that prints every tuple of a received record to standard output. For each tuple the
 * fields at positions 0, 1 and 2 are printed on one line, framed by separator lines.
 */
public class CollaborativeFilteringSink extends UserSinkInvokable {
    private static final long serialVersionUID = 1L;

    /**
     * Prints the first three fields of each tuple contained in the record.
     *
     * @param record
     *            the record whose tuples are printed
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        final String separator = "============================================";

        System.out.println("received record...");
        final int tupleCount = record.getNumOfTuples();

        System.out.println(separator);
        int index = 0;
        while (index < tupleCount) {
            String row = "name=" + record.getField(index, 0) + ", grade="
                    + record.getField(index, 1) + ", salary=" + record.getField(index, 2);
            System.out.println(row);
            index++;
        }
        System.out.println(separator);
    }
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Source that reads {@code src/test/resources/testdata/MovieLens100k.data} line by line and
 * emits one record per non-blank line. Each line is split on tab characters and its first
 * three columns are parsed as integers into fields 0, 1 and 2 of the outgoing record.
 */
public class CollaborativeFilteringSource extends UserSourceInvokable {
    private static final long serialVersionUID = 1L;

    private StreamRecord outRecord = new StreamRecord(new Tuple3<Integer, Integer, Integer>());

    /**
     * Reads the whole data file and emits every non-blank line as a record.
     *
     * @throws Exception
     *             if the file cannot be read or a column is not a valid integer
     */
    @Override
    public void invoke() throws Exception {
        BufferedReader reader = new BufferedReader(new FileReader(
                "src/test/resources/testdata/MovieLens100k.data"));
        try {
            String line;
            // Read exactly once per iteration; a second read at the end of the loop body
            // would silently drop every other line of the file.
            while ((line = reader.readLine()) != null) {
                // Compare by content, not by reference, so blank lines are really skipped.
                if (line.trim().isEmpty()) {
                    continue;
                }
                String[] items = line.split("\t");
                outRecord.setInteger(0, Integer.valueOf(items[0]));
                outRecord.setInteger(1, Integer.valueOf(items[1]));
                outRecord.setInteger(2, Integer.valueOf(items[2]));
                emit(outRecord);
                performanceCounter.count();
            }
        } finally {
            // Release the file handle even when parsing or emitting fails.
            reader.close();
        }
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
import java.util.HashMap;
import org.jblas.DoubleMatrix;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Task that stores incoming (user, item, rating) triples in a dense matrix. User ids and
 * item ids are mapped to row and column positions in order of first appearance. Nothing is
 * emitted by this task.
 */
public class CollaborativeFilteringTask extends UserTaskInvokable {
    private static final long serialVersionUID = 1L;

    private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());

    HashMap<Integer, Integer> rowIndex = new HashMap<Integer, Integer>();
    HashMap<Integer, Integer> columnIndex = new HashMap<Integer, Integer>();
    DoubleMatrix userItem = new DoubleMatrix(1000, 2000);
    DoubleMatrix coOccurence = new DoubleMatrix(2000, 2000);

    /**
     * Reads fields 0, 1 and 2 of the first tuple as user id, item id and rating and writes
     * the rating into {@code userItem} at the position assigned to that user and item.
     *
     * @param record
     *            incoming record; only its first tuple is read
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        final int userId = record.getInteger(0, 0);
        final int itemId = record.getInteger(0, 1);
        final int rating = record.getInteger(0, 2);

        final int row = positionOf(rowIndex, userId);
        final int column = positionOf(columnIndex, itemId);

        userItem.put(row, column, rating);
    }

    /**
     * Returns the position registered for the key, first registering the next free position
     * (the current map size) if the key has not been seen before.
     */
    private static int positionOf(HashMap<Integer, Integer> positions, int key) {
        if (!positions.containsKey(key)) {
            positions.put(key, positions.size());
        }
        return positions.get(key);
    }
}
......@@ -13,22 +13,20 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative;
package eu.stratosphere.streaming.examples.iterative.kmeans;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class IterativeStateHolder extends UserTaskInvokable {
private static final long serialVersionUID = -3042489460184024483L;
public IterativeStateHolder() {
}
public class KMeansSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
//int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
System.out.println("record=" + record.getString(0, 0));
System.out.println("============================================");
}
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.kmeans;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Endless source of synthetic points. A fixed-seed random generator first draws
 * {@code numCenter} centers uniformly from [-range/2, range/2) in every dimension. Points
 * are then generated round-robin around those centers with Gaussian noise and emitted as
 * space-separated coordinate strings.
 */
public class KMeansSource extends UserSourceInvokable {
    private static final long serialVersionUID = 1L;
    private static final long DEFAULT_SEED = 4650285087650871364L;

    private Random random = new Random(DEFAULT_SEED);
    private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
    private int numCenter;
    private int dimension;
    private double absoluteStdDev;
    private double range;
    private StringBuilder buffer = new StringBuilder();

    /**
     * @param numCenter
     *            number of centers to generate points around
     * @param dimension
     *            number of coordinates per point
     * @param stddev
     *            standard deviation relative to {@code range}
     * @param range
     *            width of the interval the centers are drawn from
     */
    public KMeansSource(int numCenter, int dimension, double stddev, double range) {
        this.numCenter = numCenter;
        this.dimension = dimension;
        this.range = range;
        this.absoluteStdDev = stddev * range;
    }

    /**
     * Generates and emits points forever, visiting the centers in round-robin order.
     */
    @Override
    public void invoke() throws Exception {
        final double[][] centers = uniformRandomCenters(random, numCenter, dimension, range);
        final double[] sample = new double[dimension];

        for (int current = 0;; current = (current + 1) % numCenter) {
            final double[] center = centers[current];
            // Perturb each coordinate of the current center with scaled Gaussian noise.
            for (int d = 0; d < dimension; d++) {
                sample[d] = center[d] + (random.nextGaussian() * absoluteStdDev);
            }
            outRecord.setString(0, generatePointString(sample));
            emit(outRecord);
        }
    }

    /**
     * Draws {@code num} points whose coordinates are uniform in [-range/2, range/2).
     */
    private double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
        final double offset = range / 2;
        final double[][] centers = new double[num][dimensionality];
        for (double[] center : centers) {
            for (int dim = 0; dim < center.length; dim++) {
                center[dim] = (rnd.nextDouble() * range) - offset;
            }
        }
        return centers;
    }

    /**
     * Joins the first {@code dimension} coordinates with single spaces, reusing the shared
     * buffer.
     */
    private String generatePointString(double[] point) {
        buffer.setLength(0);
        for (int j = 0; j < dimension; j++) {
            if (j > 0) {
                buffer.append(" ");
            }
            buffer.append(point[j]);
        }
        return buffer.toString();
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.kmeans;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Task that parses the space-separated coordinates found in field 0 of the first tuple into
 * an internal buffer and then forwards the original string unchanged.
 */
public class KMeansTask extends UserTaskInvokable {
    private static final long serialVersionUID = 1L;

    private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
    private double[] point = null;

    /**
     * @param dimension
     *            length of the coordinate buffer
     */
    public KMeansTask(int dimension) {
        point = new double[dimension];
    }

    /**
     * Parses the incoming point into {@code point} and re-emits its string form.
     *
     * @param record
     *            record whose first tuple holds the point string in field 0
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        int slot = 0;
        for (String coordinate : record.getString(0, 0).split(" ")) {
            point[slot] = Double.parseDouble(coordinate);
            slot++;
        }

        outRecord.setString(0, record.getString(0, 0));
        emit(outRecord);
    }
}
......@@ -15,11 +15,33 @@
package eu.stratosphere.streaming.examples.iterative.pagerank;
public class PagerankLocal {
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public static void main(String[] args) {
// TODO Auto-generated method stub
/**
 * Graph over integer node ids stored as adjacency sets: {@code _vertices} maps a node to
 * the set of nodes it has an edge to. A node gets an entry only once it is used as the
 * origin of an edge.
 */
public class Graph {
    public Map<Integer, Set<Integer>> _vertices = null;

    /** Creates an empty graph. */
    public Graph() {
        _vertices = new HashMap<Integer, Set<Integer>>();
    }

    /**
     * Adds the edge {@code sourceNode -> targetNode}. Only the source node receives an
     * adjacency entry.
     */
    public void insertDirectedEdge(int sourceNode, int targetNode) {
        adjacencyOf(sourceNode).add(targetNode);
    }

    /**
     * Adds edges in both directions between the two nodes, creating adjacency entries for
     * both of them where missing.
     */
    public void insertUndirectedEdge(int sourceNode, int targetNode) {
        Set<Integer> sourceNeighbors = adjacencyOf(sourceNode);
        Set<Integer> targetNeighbors = adjacencyOf(targetNode);
        sourceNeighbors.add(targetNode);
        targetNeighbors.add(sourceNode);
    }

    /** Returns the adjacency set of the node, registering an empty one if it has none yet. */
    private Set<Integer> adjacencyOf(int node) {
        if (!_vertices.containsKey(node)) {
            _vertices.put(node, new HashSet<Integer>());
        }
        return _vertices.get(node);
    }
}
......@@ -13,13 +13,34 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.kmeans;
package eu.stratosphere.streaming.examples.iterative.pagerank;
public class KMeansLocal {
import org.apache.log4j.Level;
public static void main(String[] args) {
// TODO Auto-generated method stub
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
/**
 * Local launcher of the streaming PageRank example: assembles the job graph
 * and runs it on a mini cluster.
 */
public class PageRankLocal {

    /**
     * Assembles the example topology: one source, one task and one sink, with
     * "Source" connected to "Task" through a fields connection and "Task"
     * connected to "Sink" through a shuffle connection.
     *
     * @return the job graph produced by the builder
     */
    public static JobGraph getJobGraph() {
        final JobGraphBuilder builder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);

        // Register the three components.
        builder.setSource("Source", new PageRankSource());
        builder.setTask("Task", new PageRankTask(), 1, 1);
        builder.setSink("Sink", new PageRankSink());

        // Wire them together.
        builder.fieldsConnect("Source", "Task", 0);
        builder.shuffleConnect("Task", "Sink");

        return builder.getJobGraph();
    }

    /**
     * Configures console logging and executes the job on a mini cluster.
     *
     * @param args
     *            ignored
     */
    public static void main(String[] args) {
        LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
        final JobGraph jobGraph = getJobGraph();
        ClusterUtil.runOnMiniCluster(jobGraph);
    }
}
......@@ -13,17 +13,17 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative;
package eu.stratosphere.streaming.examples.iterative.pagerank;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class IterativeSink extends UserSinkInvokable {
private static final long serialVersionUID = -1989637817643875304L;
public class PageRankSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
System.out.println("received record...");
int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
......@@ -32,6 +32,6 @@ public class IterativeSink extends UserSinkInvokable {
+ record.getField(i, 1) + ", salary="
+ record.getField(i, 2));
}
System.out.println("============================================");
System.out.println("============================================");
}
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.pagerank;
import java.io.BufferedReader;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Source of the streaming PageRank example.
 *
 * Reads {@code src/test/resources/testdata/ASTopology.data} line by line.
 * Every non-empty line is split on {@code ':'}; the first two parts are parsed
 * as integers and emitted as field 0 and field 1 of the output record.
 */
public class PageRankSource extends UserSourceInvokable {
    private static final long serialVersionUID = 1L;

    private BufferedReader br = null;

    /** Reusable output record with two integer fields. */
    private StreamRecord outRecord = new StreamRecord(new Tuple2<Integer, Integer>());

    /**
     * Emits one record per non-empty line of the input file.
     *
     * @throws Exception
     *             if the file cannot be read, or a line does not contain two
     *             integers separated by {@code ':'}
     */
    @Override
    public void invoke() throws Exception {
        br = new BufferedReader(new FileReader(
                "src/test/resources/testdata/ASTopology.data"));
        try {
            String line;
            // Read exactly once per iteration so that no line is skipped.
            while ((line = br.readLine()) != null) {
                // Compare by content; a reference comparison against "" never matches.
                if (line.isEmpty()) {
                    continue;
                }
                String[] link = line.split(":");
                outRecord.setInteger(0, Integer.valueOf(link[0]));
                // The second value belongs in field 1; writing it to field 0
                // would overwrite the first value.
                outRecord.setInteger(1, Integer.valueOf(link[1]));
                emit(outRecord);
                performanceCounter.count();
            }
        } finally {
            // Release the file handle even when parsing or emitting fails.
            br.close();
        }
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.pagerank;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Task of the streaming PageRank example. It accumulates the incoming edges
 * into an in-memory link graph and currently emits nothing.
 */
public class PageRankTask extends UserTaskInvokable {
    private static final long serialVersionUID = 1L;

    /** Output record; not written to by {@link #invoke(StreamRecord)} at present. */
    private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());

    /** Link structure built up from the received edges. */
    private Graph linkGraph = new Graph();

    /**
     * Adds the edge described by the record to the link graph.
     *
     * @param record
     *            incoming record; the integers at (0, 0) and (0, 1) are used
     *            as the source and target vertex of a directed edge
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        Integer from = record.getInteger(0, 0);
        Integer to = record.getInteger(0, 1);

        linkGraph.insertDirectedEdge(from, to);
    }
}
......@@ -13,34 +13,35 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
package eu.stratosphere.streaming.examples.iterative.sssp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.index.IndexPair;
public class Graph {
public Map<Integer, Set<Integer>> _vertices = null;
public class LogTableStateIterator<K, V> implements TableStateIterator<K ,V>{
private Iterator<Entry<K, IndexPair>> iterator;
private HashMap<Integer, ArrayList<V>> blockList;
public LogTableStateIterator(Iterator<Entry<K, IndexPair>> iter, HashMap<Integer, ArrayList<V>> blocks){
iterator=iter;
blockList=blocks;
}
@Override
public boolean hasNext() {
// TODO Auto-generated method stub
return false;
public Graph() {
_vertices = new HashMap<Integer, Set<Integer>>();
}
@Override
public Tuple2<K, V> next() {
// TODO Auto-generated method stub
return null;
/**
 * Adds the edge {@code sourceNode -> targetNode}. Only the source vertex gets
 * an adjacency entry; the target is not registered as a key.
 *
 * @param sourceNode
 *            id of the vertex the edge starts at
 * @param targetNode
 *            id of the vertex the edge points to
 */
public void insertDirectedEdge(int sourceNode, int targetNode) {
    Set<Integer> successors;
    if (_vertices.containsKey(sourceNode)) {
        successors = _vertices.get(sourceNode);
    } else {
        // First edge leaving this vertex: create its adjacency set.
        successors = new HashSet<Integer>();
        _vertices.put(sourceNode, successors);
    }
    successors.add(targetNode);
}
/**
 * Adds edges in both directions between the two vertices, creating the
 * adjacency set of either endpoint if it does not exist yet.
 *
 * @param sourceNode
 *            id of the first endpoint
 * @param targetNode
 *            id of the second endpoint
 */
public void insertUndirectedEdge(int sourceNode, int targetNode) {
    // Make sure both endpoints are registered before linking them.
    for (int endpoint : new int[] { sourceNode, targetNode }) {
        if (_vertices.containsKey(endpoint)) {
            continue;
        }
        _vertices.put(endpoint, new HashSet<Integer>());
    }

    Set<Integer> ofSource = _vertices.get(sourceNode);
    ofSource.add(targetNode);
    Set<Integer> ofTarget = _vertices.get(targetNode);
    ofTarget.add(sourceNode);
}
}
......@@ -15,11 +15,32 @@
package eu.stratosphere.streaming.examples.iterative.sssp;
import org.apache.log4j.Level;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class SSSPLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("Source", new SSSPSource());
graphBuilder.setTask("Task", new SSSPTask(), 1, 1);
graphBuilder.setSink("Sink", new SSSPSink());
public static void main(String[] args) {
// TODO Auto-generated method stub
graphBuilder.fieldsConnect("Source", "Task", 0);
graphBuilder.shuffleConnect("Task", "Sink");
return graphBuilder.getJobGraph();
}
/**
 * Configures console logging, builds the job graph and executes it on a mini
 * cluster.
 *
 * @param args
 *            ignored
 */
public static void main(String[] args) {
    LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);

    final JobGraph jobGraph = getJobGraph();
    ClusterUtil.runOnMiniCluster(jobGraph);
}
}
......@@ -13,16 +13,17 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.join;
package eu.stratosphere.streaming.examples.iterative.sssp;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowJoinSink extends UserSinkInvokable {
public class SSSPSink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
System.out.println("received record...");
int tupleNum = record.getNumOfTuples();
System.out.println("============================================");
......@@ -31,6 +32,6 @@ public class WindowJoinSink extends UserSinkInvokable {
+ record.getField(i, 1) + ", salary="
+ record.getField(i, 2));
}
System.out.println("============================================");
System.out.println("============================================");
}
}
}
\ No newline at end of file
......@@ -13,18 +13,40 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative;
package eu.stratosphere.streaming.examples.iterative.sssp;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
public class IterativeSource extends UserSourceInvokable {
import java.io.BufferedReader;
import java.io.FileReader;
private static final long serialVersionUID = 8983174839600079890L;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Source of the streaming single-source-shortest-path example.
 *
 * Reads {@code src/test/resources/testdata/ASTopology.data} line by line.
 * Every non-empty line is split on {@code ':'}; the first two parts are parsed
 * as integers and emitted as field 0 and field 1 of the output record.
 */
public class SSSPSource extends UserSourceInvokable {
    private static final long serialVersionUID = 1L;

    private BufferedReader br = null;

    /** Reusable output record with two integer fields. */
    private StreamRecord outRecord = new StreamRecord(new Tuple2<Integer, Integer>());

    /**
     * Emits one record per non-empty line of the input file.
     *
     * @throws Exception
     *             if the file cannot be read, or a line does not contain two
     *             integers separated by {@code ':'}
     */
    @Override
    public void invoke() throws Exception {
        br = new BufferedReader(new FileReader(
                "src/test/resources/testdata/ASTopology.data"));
        try {
            String line;
            // Read exactly once per iteration so that no line is skipped.
            while ((line = br.readLine()) != null) {
                // Compare by content; a reference comparison against "" never matches.
                if (line.isEmpty()) {
                    continue;
                }
                String[] link = line.split(":");
                outRecord.setInteger(0, Integer.valueOf(link[0]));
                // The second value belongs in field 1; writing it to field 0
                // would overwrite the first value.
                outRecord.setInteger(1, Integer.valueOf(link[1]));
                emit(outRecord);
                performanceCounter.count();
            }
        } finally {
            // Release the file handle even when parsing or emitting fails.
            br.close();
        }
    }
}
......@@ -13,22 +13,27 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative;
package eu.stratosphere.streaming.examples.iterative.sssp;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class IterativeParallel extends UserTaskInvokable {
private static final long serialVersionUID = -3042489460184024483L;
public IterativeParallel() {
}
/**
 * Task of the streaming shortest-path example. It accumulates the incoming
 * edges into an in-memory link graph and currently emits nothing.
 */
public class SSSPTask extends UserTaskInvokable {
    private static final long serialVersionUID = 1L;

    /** Output record; not written to by {@link #invoke(StreamRecord)} at present. */
    private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());

    /** Link structure built up from the received edges. */
    private Graph linkGraph = new Graph();

    /**
     * Adds the edge described by the record to the link graph.
     *
     * @param record
     *            incoming record; the integers at (0, 0) and (0, 1) are used
     *            as the source and target vertex of a directed edge
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        Integer from = record.getInteger(0, 0);
        Integer to = record.getInteger(0, 1);

        linkGraph.insertDirectedEdge(from, to);
    }
}
......@@ -14,134 +14,188 @@
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.ml;
import eu.stratosphere.api.java.functions.MapFunction;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalLearningSkeleton {
// Source for feeding new data for prediction
public static class NewDataSource extends SourceFunction<Tuple1<Integer>> {
public static class NewDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(new Tuple1<Integer>(1));
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
public void invoke() throws Exception {
while (true) {
collector.collect(getNewData());
record.setTuple(getNewData());
emit(record);
}
}
// Method for pulling new data for prediction
private Tuple1<Integer> getNewData() throws InterruptedException {
private Tuple getNewData() throws InterruptedException {
return new Tuple1<Integer>(1);
}
}
// Source for feeding new training data for partial model building
public static class TrainingDataSource extends SourceFunction<Tuple1<Integer>> {
public static class TrainingDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
// Number of tuples grouped for building partial model
// TODO: batch training data
private final int BATCH_SIZE = 1000;
StreamRecord record = new StreamRecord(1, BATCH_SIZE);
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
public void invoke() throws Exception {
record.initRecords();
while (true) {
// Group the predefined number of records in a streamrecord then
// emit for model building
collector.collect(getTrainingData());;
for (int i = 0; i < BATCH_SIZE; i++) {
record.setTuple(i, getTrainingData());
}
emit(record);
}
}
// Method for pulling new training data
private Tuple1<Integer> getTrainingData() throws InterruptedException {
private Tuple getTrainingData() throws InterruptedException {
return new Tuple1<Integer>(1);
}
}
// Task for building up-to-date partial models on new training data
public static class PartialModelBuilder extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
public static class PartialModelBuilder extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
public Tuple1<Integer> map(Tuple1<Integer> inTuple) throws Exception {
return buildPartialModel(inTuple);
public void invoke(StreamRecord record) throws Exception {
emit(buildPartialModel(record));
}
// Method for building partial model on the grouped training data
protected Tuple1<Integer> buildPartialModel(Tuple1<Integer> inTuple) {
return new Tuple1<Integer>(1);
protected StreamRecord buildPartialModel(StreamRecord record) {
return new StreamRecord(new Tuple1<Integer>(1));
}
}
// Task for performing prediction using the model produced in
// batch-processing and the up-to-date partial model
public static class Predictor extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
public static class Predictor extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
Tuple1<Integer> batchModel = null;
Tuple1<Integer> partialModel = null;
StreamRecord batchModel = null;
StreamRecord partialModel = null;
@Override
public Tuple1<Integer> map(Tuple1<Integer> inTuple) throws Exception {
if (isModel(inTuple)) {
partialModel = inTuple;
public void invoke(StreamRecord record) throws Exception {
if (isModel(record)) {
partialModel = record;
batchModel = getBatchModel();
return null; //TODO: fix
} else {
return predict(inTuple);
emit(predict(record));
}
}
// Pulls model built with batch-job on the old training data
protected Tuple1<Integer> getBatchModel() {
return new Tuple1<Integer>(1);
protected StreamRecord getBatchModel() {
return new StreamRecord(new Tuple1<Integer>(1));
}
// Checks whether the record is a model or new data
protected boolean isModel(Tuple1<Integer> inTuple) {
protected boolean isModel(StreamRecord record) {
return true;
}
// Performs prediction using the two models
protected Tuple1<Integer> predict(Tuple1<Integer> inTuple) {
return new Tuple1<Integer>(0);
protected StreamRecord predict(StreamRecord record) {
return new StreamRecord(new Tuple1<Integer>(0));
}
}
public static class IMLSink extends SinkFunction<Tuple1<Integer>> {
public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<Integer> inTuple) {
public void invoke(StreamRecord record) throws Exception {
// do nothing
}
}
/**
 * Assembles the incremental-learning topology.
 *
 * "TrainingData" feeds "PartialModelBuilder" and "NewData" feeds "Predictor"
 * through shuffle connections; "PartialModelBuilder" is connected to
 * "Predictor" through a broadcast connection, and "Predictor" feeds "Sink"
 * through a shuffle connection.
 *
 * @return the job graph produced by the builder
 */
private static JobGraph getJobGraph() throws Exception {
    final JobGraphBuilder builder = new JobGraphBuilder("IncrementalLearning");

    // Components: two sources, two tasks, one sink.
    builder.setSource("NewData", NewDataSource.class, 1, 1);
    builder.setSource("TrainingData", TrainingDataSource.class, 1, 1);
    builder.setTask("PartialModelBuilder", PartialModelBuilder.class, 1, 1);
    builder.setTask("Predictor", Predictor.class, 1, 1);
    builder.setSink("Sink", Sink.class, 1, 1);

    // Connections between the components.
    builder.shuffleConnect("TrainingData", "PartialModelBuilder");
    builder.shuffleConnect("NewData", "Predictor");
    builder.broadcastConnect("PartialModelBuilder", "Predictor");
    builder.shuffleConnect("Predictor", "Sink");

    return builder.getJobGraph();
}
public static void main(String[] args) {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// set logging parameters for local run
LogUtils.initializeDefaultConsoleLogger(Level.INFO, Level.INFO);
DataStream<Tuple1<Integer>> model =
env.addSource(new TrainingDataSource())
.map(new PartialModelBuilder())
.broadcast();
DataStream<Tuple1<Integer>> prediction =
env.addSource(new NewDataSource())
.connectWith(model)
.map(new Predictor())
.addSink(new IMLSink());
try {
// generate JobGraph
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0 || args[0].equals("local")) {
System.out.println("Running in Local mode");
// start local cluster and submit JobGraph
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
// submit JobGraph to the running cluster
Client client = new Client(new InetSocketAddress("dell150", 6123), configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
\ No newline at end of file
......@@ -18,57 +18,73 @@ import java.util.Random;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
import org.apache.log4j.Level;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalOLS {
public static class NewDataSource extends SourceFunction<Tuple2<Boolean, Double[]>> {
public static class NewDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
StreamRecord record = new StreamRecord(2, 1);
Random rnd = new Random();
@Override
public void invoke(Collector<Tuple2<Boolean, Double[]>> collector) throws Exception {
public void invoke() throws Exception {
record.initRecords();
while (true) {
// pull new record from data source
collector.collect(getNewData());
record.setTuple(getNewData());
emit(record);
}
}
private Tuple2<Boolean, Double[]> getNewData() throws InterruptedException {
private Tuple getNewData() throws InterruptedException {
return new Tuple2<Boolean, Double[]>(false, new Double[] { rnd.nextDouble() * 3,
rnd.nextDouble() * 5 });
}
}
public static class TrainingDataSource extends SourceFunction<Tuple2<Double, Double[]>> {
public static class TrainingDataSource extends UserSourceInvokable {
private static final long serialVersionUID = 1L;
// TODO: batch training data
private final int BATCH_SIZE = 1000;
StreamRecord record = new StreamRecord(2, BATCH_SIZE);
Random rnd = new Random();
@Override
public void invoke(Collector<Tuple2<Double, Double[]>> collector) throws Exception {
public void invoke() throws Exception {
record.initRecords();
while (true) {
collector.collect(getTrainingData());
for (int i = 0; i < BATCH_SIZE; i++) {
record.setTuple(i, getTrainingData());
}
emit(record);
}
}
private Tuple2<Double, Double[]> getTrainingData() throws InterruptedException {
private Tuple getTrainingData() throws InterruptedException {
return new Tuple2<Double, Double[]>(rnd.nextDouble() * 10, new Double[] {
rnd.nextDouble() * 3, rnd.nextDouble() * 5 });
......@@ -76,29 +92,25 @@ public class IncrementalOLS {
}
}
public static class PartialModelBuilder extends
MapFunction<Tuple2<Double, Double[]>, Tuple2<Boolean, Double[]>> {
public static class PartialModelBuilder extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Boolean, Double[]> map(Tuple2<Double, Double[]> inTuple) throws Exception {
return buildPartialModel(inTuple);
public void invoke(StreamRecord record) throws Exception {
emit(buildPartialModel(record));
}
// TODO: deal with batchsize
protected Tuple2<Boolean, Double[]> buildPartialModel(Tuple2<Double, Double[]> inTuple) {
protected StreamRecord buildPartialModel(StreamRecord record) {
// Integer numOfTuples = record.getNumOfTuples();
Integer numOfTuples = 1;
Integer numOfFeatures = ((Double[]) inTuple.getField(1)).length;
Integer numOfTuples = record.getNumOfTuples();
Integer numOfFeatures = ((Double[]) record.getField(1)).length;
double[][] x = new double[numOfTuples][numOfFeatures];
double[] y = new double[numOfTuples];
for (int i = 0; i < numOfTuples; i++) {
// Tuple t = record.getTuple(i);
Tuple t = inTuple;
Tuple t = record.getTuple(i);
Double[] x_i = (Double[]) t.getField(1);
y[i] = (Double) t.getField(0);
for (int j = 0; j < numOfFeatures; j++) {
......@@ -109,69 +121,90 @@ public class IncrementalOLS {
OLSMultipleLinearRegression ols = new OLSMultipleLinearRegression();
ols.newSampleData(y, x);
return new Tuple2<Boolean, Double[]>(true, (Double[]) ArrayUtils.toObject(ols
.estimateRegressionParameters()));
return new StreamRecord(new Tuple2<Boolean, Double[]>(true,
(Double[]) ArrayUtils.toObject(ols.estimateRegressionParameters())));
}
}
// TODO: How do I know the x for which I have predicted y?
public static class Predictor extends MapFunction<Tuple2<Boolean, Double[]>, Tuple1<Double>> {
public static class Predictor extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
// StreamRecord batchModel = null;
Double[] partialModel = new Double[] { 0.0, 0.0 };
@Override
public Tuple1<Double> map(Tuple2<Boolean, Double[]> inTuple) throws Exception {
if (isModel(inTuple)) {
partialModel = inTuple.f1;
public void invoke(StreamRecord record) throws Exception {
if (isModel(record)) {
partialModel = (Double[]) record.getField(1);
// batchModel = getBatchModel();
return null; //TODO: fix
} else {
return predict(inTuple);
emit(predict(record));
}
}
protected boolean isModel(Tuple2<Boolean, Double[]> inTuple) {
return inTuple.f0;
// protected StreamRecord getBatchModel() {
// return new StreamRecord(new Tuple1<Integer>(1));
// }
protected boolean isModel(StreamRecord record) {
return record.getBoolean(0);
}
protected Tuple1<Double> predict(Tuple2<Boolean, Double[]> inTuple) {
Double[] x = inTuple.f1;
protected StreamRecord predict(StreamRecord record) {
Double[] x = (Double[]) record.getField(1);
Double prediction = 0.0;
for (int i = 0; i < x.length; i++) {
prediction = prediction + x[i] * partialModel[i];
}
return new Tuple1<Double>(prediction);
return new StreamRecord(new Tuple1<Double>(prediction));
}
}
public static class IncOLSSink extends SinkFunction<Tuple1<Double>> {
public static class Sink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<Double> inTuple) {
System.out.println(inTuple);
public void invoke(StreamRecord record) throws Exception {
}
}
public static void main(String[] args) {
private static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalOLS",
FaultToleranceType.NONE);
graphBuilder.setSource("NewData", new NewDataSource(), 1, 1);
graphBuilder.setSource("TrainingData",new TrainingDataSource(), 1, 1);
graphBuilder.setTask("PartialModelBuilder",new PartialModelBuilder(), 1, 1);
graphBuilder.setTask("Predictor",new Predictor(), 1, 1);
graphBuilder.setSink("Sink",new Sink(), 1, 1);
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
graphBuilder.shuffleConnect("TrainingData", "PartialModelBuilder");
graphBuilder.shuffleConnect("NewData", "Predictor");
graphBuilder.broadcastConnect("PartialModelBuilder", "Predictor");
graphBuilder.shuffleConnect("Predictor", "Sink");
DataStream<Tuple2<Boolean, Double[]>> model =
env.addSource(new TrainingDataSource())
.map(new PartialModelBuilder())
.broadcast();
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
// set logging parameters for local run
DataStream<Tuple1<Double>> prediction =
env.addSource(new NewDataSource())
.connectWith(model)
.map(new Predictor())
.addSink(new IncOLSSink());
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
ClusterUtil.runOnMiniCluster(getJobGraph());
} else if (args[0].equals("cluster")) {
ClusterUtil.runOnLocalCluster(getJobGraph(), "hadoop02.ilab.sztaki.hu", 6123);
}
}
}
......@@ -15,72 +15,32 @@
package eu.stratosphere.streaming.examples.window.join;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.streaming.examples.join.JoinSink;
import eu.stratosphere.streaming.util.LogUtils;
public class WindowJoinLocal {
/**
 * Assembles the window-join topology: both sources are connected to
 * "WindowJoinTask" through fields connections, and the task is connected to
 * "WindowJoinSink" through a shuffle connection.
 *
 * @return the job graph produced by the builder
 */
public static JobGraph getJobGraph() {
    final JobGraphBuilder builder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);

    // Components: two sources, the join task and the sink.
    builder.setSource("WindowJoinSourceOne", WindowJoinSourceOne.class);
    builder.setSource("WindowJoinSourceTwo", WindowJoinSourceTwo.class);
    builder.setTask("WindowJoinTask", WindowJoinTask.class, 1, 1);
    builder.setSink("WindowJoinSink", WindowJoinSink.class);

    // Connections between the components.
    builder.fieldsConnect("WindowJoinSourceOne", "WindowJoinTask", 1);
    builder.fieldsConnect("WindowJoinSourceTwo", "WindowJoinTask", 1);
    builder.shuffleConnect("WindowJoinTask", "WindowJoinSink");

    return builder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
client.run(jG, true);
DataStream<Tuple4<String, String, Integer, Long>> source1 = context
.addSource(new WindowJoinSourceOne());
}
@SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Integer>> source2 = context
.addSource(new WindowJoinSourceTwo()).connectWith(source1).partitionBy(1)
.flatMap(new WindowJoinTask()).addSink(new JoinSink());
} catch (Exception e) {
System.out.println(e);
}
context.execute();
}
}
......@@ -18,30 +18,28 @@ package eu.stratosphere.streaming.examples.window.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class WindowJoinSourceOne extends UserSourceInvokable {
public class WindowJoinSourceOne extends SourceFunction<Tuple4<String, String, Integer, Long>> {
private static final long serialVersionUID = 6670933703432267728L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace",
"sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge",
"black", "peter" };
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private StreamRecord outRecord = new StreamRecord(
new Tuple4<String, String, Integer, Long>());
private long progress = 0L;
private Tuple4<String, String, Integer, Long> outRecord = new Tuple4<String, String, Integer, Long>();
private Long progress = 0L;
@Override
public void invoke() throws Exception {
public void invoke(Collector<Tuple4<String, String, Integer, Long>> collector) throws Exception {
while (true) {
outRecord.setString(0, "salary");
outRecord.setString(1, names[rand.nextInt(names.length)]);
outRecord.setInteger(2, rand.nextInt(10000));
outRecord.setLong(3, progress);
emit(outRecord);
progress+=1;
outRecord.f0 = "salary";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(10000);
outRecord.f3 = progress;
collector.collect(outRecord);
progress += 1;
}
}
}
......@@ -18,30 +18,28 @@ package eu.stratosphere.streaming.examples.window.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class WindowJoinSourceTwo extends UserSourceInvokable {
public class WindowJoinSourceTwo extends SourceFunction<Tuple4<String, String, Integer, Long>> {
private static final long serialVersionUID = -5897483980082089771L;
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace",
"sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge",
"black", "peter" };
private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
private Random rand = new Random();
private StreamRecord outRecord = new StreamRecord(
new Tuple4<String, String, String, Long>());
private long progress = 0L;
private Tuple4<String, String, Integer, Long> outRecord = new Tuple4<String, String, Integer, Long>();
private Long progress = 0L;
@Override
public void invoke() throws Exception {
public void invoke(Collector<Tuple4<String, String, Integer, Long>> collector) throws Exception {
while (true) {
outRecord.setString(0, "grade");
outRecord.setString(1, names[rand.nextInt(names.length)]);
outRecord.setString(2, String.valueOf((char)(rand.nextInt(26)+'A')));
outRecord.setLong(3, progress);
emit(outRecord);
progress+=1;
outRecord.f0 = "grade";
outRecord.f1 = names[rand.nextInt(names.length)];
outRecord.f2 = rand.nextInt(5) + 1;
outRecord.f3 = progress;
collector.collect(outRecord);
progress += 1;
}
}
}
......@@ -19,37 +19,38 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.util.Collector;
public class WindowJoinTask extends UserTaskInvokable {
public class WindowJoinTask extends
FlatMapFunction<Tuple4<String, String, Integer, Long>, Tuple3<String, Integer, Integer>> {
class SalaryProgress {
public SalaryProgress(int salary, long progress) {
public SalaryProgress(Integer salary, Long progress) {
this.salary = salary;
this.progress = progress;
}
int salary;
long progress;
Integer salary;
Long progress;
}
class GradeProgress {
public GradeProgress(String grade, long progress) {
public GradeProgress(Integer grade, Long progress) {
this.grade = grade;
this.progress = progress;
}
String grade;
long progress;
Integer grade;
Long progress;
}
private static final long serialVersionUID = 749913336259789039L;
private int windowSize = 100;
private HashMap<String, LinkedList<GradeProgress>> gradeHashmap;
private HashMap<String, LinkedList<SalaryProgress>> salaryHashmap;
private StreamRecord outRecord = new StreamRecord(3);
public WindowJoinTask() {
gradeHashmap = new HashMap<String, LinkedList<GradeProgress>>();
......@@ -57,59 +58,49 @@ public class WindowJoinTask extends UserTaskInvokable {
}
@Override
public void invoke(StreamRecord record) throws Exception {
// TODO Auto-generated method stub
String streamId = record.getString(0);
String name = record.getString(1);
long progress = record.getLong(3);
public void flatMap(Tuple4<String, String, Integer, Long> value,
Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
String streamId = value.f0;
String name = value.f1;
Long progress = value.f3;
if (streamId.equals("grade")) {
if (salaryHashmap.containsKey(name)) {
Iterator<SalaryProgress> iterator = salaryHashmap.get(name)
.iterator();
Iterator<SalaryProgress> iterator = salaryHashmap.get(name).iterator();
while (iterator.hasNext()) {
SalaryProgress entry = iterator.next();
if (progress - entry.progress > windowSize) {
iterator.remove();
} else {
Tuple3<String, String, Integer> outputTuple = new Tuple3<String, String, Integer>(
name, record.getString(2), entry.salary);
outRecord.addTuple(outputTuple);
Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
name, value.f2, entry.salary);
out.collect(outputTuple);
}
}
if (outRecord.getNumOfTuples() != 0) {
emit(outRecord);
if (!gradeHashmap.containsKey(name)) {
gradeHashmap.put(name, new LinkedList<GradeProgress>());
}
outRecord.Clear();
}
if (!gradeHashmap.containsKey(name)) {
gradeHashmap.put(name, new LinkedList<GradeProgress>());
}
gradeHashmap.get(name).add(
new GradeProgress(record.getString(2), progress));
} else {
if (gradeHashmap.containsKey(name)) {
Iterator<GradeProgress> iterator = gradeHashmap.get(name)
.iterator();
while (iterator.hasNext()) {
GradeProgress entry = iterator.next();
if (progress - entry.progress > windowSize) {
iterator.remove();
} else {
Tuple3<String, String, Integer> outputTuple = new Tuple3<String, String, Integer>(
name, entry.grade, record.getInteger(2));
outRecord.addTuple(outputTuple);
gradeHashmap.get(name).add(new GradeProgress(value.f2, progress));
} else {
if (gradeHashmap.containsKey(name)) {
Iterator<GradeProgress> iterator = gradeHashmap.get(name).iterator();
while (iterator.hasNext()) {
GradeProgress entry = iterator.next();
if (progress - entry.progress > windowSize) {
iterator.remove();
} else {
Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
name, entry.grade, value.f2);
out.collect(outputTuple);
}
}
}
if (outRecord.getNumOfTuples() != 0) {
emit(outRecord);
if (!salaryHashmap.containsKey(name)) {
salaryHashmap.put(name, new LinkedList<SalaryProgress>());
}
outRecord.Clear();
salaryHashmap.get(name).add(new SalaryProgress(value.f2, progress));
}
if (!salaryHashmap.containsKey(name)) {
salaryHashmap.put(name, new LinkedList<SalaryProgress>());
}
salaryHashmap.get(name).add(
new SalaryProgress(record.getInteger(2), progress));
}
}
}
......@@ -15,90 +15,87 @@
package eu.stratosphere.streaming.examples.window.sum;
import java.util.ArrayList;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.SlidingWindowState;
import eu.stratosphere.util.Collector;
public class WindowSumAggregate extends UserTaskInvokable {
public class WindowSumAggregate extends
FlatMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
private int windowSize = 100;
private int slidingStep = 20;
private int computeGranularity = 10;
private int windowFieldId = 1;
private StreamRecord tempRecord;
private SlidingWindowState<Integer> window;
private ArrayList<Tuple2<Integer, Long>> tempTupleArray = null;
private Tuple2<Integer, Long> outTuple = new Tuple2<Integer, Long>();
private SlidingWindowState window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
private StreamRecord outRecord = new StreamRecord(
new Tuple2<Integer, Long>());
public WindowSumAggregate() {
window = new SlidingWindowState<Integer>(windowSize, slidingStep,
window = new SlidingWindowState(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
}
private void incrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
int number = record.getInteger(i, 0);
private void incrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
int number = tupleArray.get(i).f0;
sum.put("sum", sum.get("sum") + number);
}
}
private void decrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
int number = record.getInteger(i, 0);
private void decrementCompute(ArrayList<Tuple2<Integer, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
int number = tupleArray.get(i).f0;
sum.put("sum", sum.get("sum") - number);
}
}
private void produceRecord(long progress){
outRecord.setInteger(0, sum.get("sum"));
outRecord.setLong(1, progress);
emit(outRecord);
}
/**
 * Emits the current value stored under the "sum" key together with the
 * given progress value. The shared outTuple instance is overwritten and
 * handed to the collector on every call.
 *
 * @param progress value written to the second field of the emitted tuple
 * @param out collector that receives the tuple
 */
private void produceOutput(long progress, Collector<Tuple2<Integer, Long>> out) {
    final Integer currentSum = sum.get("sum");
    outTuple.f1 = progress;
    outTuple.f0 = currentSum;
    out.collect(outTuple);
}
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
long progress = record.getLong(i, windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
} else {
if (progress >= nextTimestamp) {
public void flatMap(Tuple2<Integer, Long> value,
Collector<Tuple2<Integer, Long>> out) throws Exception {
// TODO Auto-generated method stub
long progress = value.f1;
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<Integer, Long>>();
} else {
if (progress >= nextTimestamp) {
if (window.isFull()) {
ArrayList<Tuple2<Integer, Long>> expiredTupleArray = window.popFront();
incrementCompute(tempTupleArray);
decrementCompute(expiredTupleArray);
window.pushBack(tempTupleArray);
if (window.isEmittable()) {
produceOutput(progress, out);
}
} else {
incrementCompute(tempTupleArray);
window.pushBack(tempTupleArray);
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
produceOutput(progress, out);
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<Integer, Long>>();
}
tempRecord.addTuple(record.getTuple(i));
}
tempTupleArray.add(value);
}
}
......@@ -15,71 +15,21 @@
package eu.stratosphere.streaming.examples.window.sum;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class WindowSumLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowSumSource", WindowSumSource.class);
graphBuilder.setTask("WindowSumMultiple", WindowSumMultiple.class, 1, 1);
graphBuilder.setTask("WindowSumAggregate", WindowSumAggregate.class, 1, 1);
graphBuilder.setSink("WindowSumSink", WindowSumSink.class);
graphBuilder.shuffleConnect("WindowSumSource", "WindowSumMultiple");
graphBuilder.shuffleConnect("WindowSumMultiple", "WindowSumAggregate");
graphBuilder.shuffleConnect("WindowSumAggregate", "WindowSumSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple2<Integer, Long>> dataStream = context
.addSource(new WindowSumSource())
.map(new WindowSumMultiple())
.flatMap(new WindowSumAggregate())
.addSink(new WindowSumSink());
context.execute();
}
}
......@@ -15,21 +15,19 @@
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumMultiple extends UserTaskInvokable {
public class WindowSumMultiple extends MapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
private StreamRecord outputRecord = new StreamRecord(new Tuple2<Integer, Long>());
private Tuple2<Integer, Long> outTuple = new Tuple2<Integer, Long>();
@Override
public void invoke(StreamRecord record) throws Exception {
Integer number = record.getInteger(0);
Long timestamp = record.getLong(1);
outputRecord.setInteger(0, number+1);
outputRecord.setLong(1, timestamp);
emit(outputRecord);
/**
 * Doubles the first field of the incoming tuple and carries the second
 * field over unchanged. The result is written into the shared outTuple
 * instance, which is returned on every call.
 *
 * @param inTuple incoming tuple; its fields are read but not modified
 * @return the shared outTuple holding (inTuple.f0 * 2, inTuple.f1)
 */
public Tuple2<Integer, Long> map(Tuple2<Integer, Long> inTuple) throws Exception {
    final Integer doubled = inTuple.f0 * 2;
    final Long carried = inTuple.f1;
    outTuple.f0 = doubled;
    outTuple.f1 = carried;
    return outTuple;
}
}
\ No newline at end of file
......@@ -15,21 +15,15 @@
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.SinkFunction;
public class WindowSumSink extends UserSinkInvokable {
public class WindowSumSink extends SinkFunction<Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
private Integer sum = 0;
private long timestamp = 0;
@Override
public void invoke(StreamRecord record) throws Exception {
sum = record.getInteger(0);
timestamp = record.getLong(1);
System.out.println("============================================");
System.out.println(sum + " " + timestamp);
System.out.println("============================================");
/**
 * Prints the string form of the received tuple to standard output.
 *
 * @param inTuple tuple delivered to this sink
 */
public void invoke(Tuple2<Integer, Long> inTuple) {
    final String rendered = String.valueOf(inTuple);
    System.out.println(rendered);
}
}
......@@ -16,23 +16,23 @@
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class WindowSumSource extends UserSourceInvokable {
public class WindowSumSource extends SourceFunction<Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;
private StreamRecord outRecord = new StreamRecord(
new Tuple2<Integer, Long>());
private Tuple2<Integer, Long> outRecord = new Tuple2<Integer, Long>();
private Long timestamp = 0L;
@Override
public void invoke() throws Exception {
public void invoke(Collector<Tuple2<Integer, Long>> collector) throws Exception {
// TODO Auto-generated method stub
for (int i = 0; i < 1000; ++i) {
outRecord.setInteger(0, i);
outRecord.setLong(1, timestamp);
outRecord.f0 = i;
outRecord.f1 = timestamp;
collector.collect(outRecord);
timestamp++;
emit(outRecord);
}
}
}
}
......@@ -15,40 +15,41 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import java.util.ArrayList;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;
import eu.stratosphere.streaming.state.SlidingWindowState;
import eu.stratosphere.util.Collector;
public class WindowWordCountCounter extends UserTaskInvokable {
public class WindowWordCountCounter extends
FlatMapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>> {
private static final long serialVersionUID = 1L;
private int windowSize=10;
private int slidingStep=2;
private int computeGranularity=1;
private int windowFieldId=2;
private StreamRecord tempRecord;
private SlidingWindowState<Integer> window;
private int windowSize = 10;
private int slidingStep = 2;
private int computeGranularity = 1;
private ArrayList<Tuple2<String, Long>> tempTupleArray = null;
private Tuple3<String, Integer, Long> outTuple = new Tuple3<String, Integer, Long>();
private SlidingWindowState window;
private MutableTableState<String, Integer> wordCounts;
private long initTimestamp=-1;
private long nextTimestamp=-1;
private long initTimestamp = -1;
private long nextTimestamp = -1;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(3);
public WindowWordCountCounter() {
window = new SlidingWindowState<Integer>(windowSize, slidingStep,
window = new SlidingWindowState(windowSize, slidingStep,
computeGranularity);
wordCounts = new MutableTableState<String, Integer>();
}
private void incrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
String word = record.getString(i, 0);
private void incrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
String word = tupleArray.get(i).f0;
if (wordCounts.containsKey(word)) {
int count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
......@@ -58,10 +59,9 @@ public class WindowWordCountCounter extends UserTaskInvokable {
}
}
private void decrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
String word = record.getString(i, 0);
private void decrementCompute(ArrayList<Tuple2<String, Long>> tupleArray) {
for (int i = 0; i < tupleArray.size(); ++i) {
String word = tupleArray.get(i).f0;
int count = wordCounts.get(word) - 1;
if (count == 0) {
wordCounts.delete(word);
......@@ -71,51 +71,48 @@ public class WindowWordCountCounter extends UserTaskInvokable {
}
}
private void produceRecord(long progress){
outRecord.Clear();
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
private void produceOutput(long progress, Collector<Tuple3<String, Integer, Long>> out) {
MutableTableStateIterator<String, Integer> iterator = wordCounts.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
outTuple.f0 = tuple.f0;
outTuple.f1 = tuple.f1;
outTuple.f2 = timestamp;
out.collect(outTuple);
}
emit(outRecord);
}
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
long progress = record.getLong(i, windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
} else {
if (progress >= nextTimestamp) {
public void flatMap(Tuple2<String, Long> value,
Collector<Tuple3<String, Integer, Long>> out) throws Exception {
// TODO Auto-generated method stub
timestamp = value.f1;
if (initTimestamp == -1) {
initTimestamp = timestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<String, Long>>();
} else {
if (timestamp >= nextTimestamp) {
if (window.isFull()) {
ArrayList<Tuple2<String, Long>> expiredTupleArray = window.popFront();
incrementCompute(tempTupleArray);
decrementCompute(expiredTupleArray);
window.pushBack(tempTupleArray);
if (window.isEmittable()) {
produceOutput(timestamp, out);
}
} else {
incrementCompute(tempTupleArray);
window.pushBack(tempTupleArray);
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
produceOutput(timestamp, out);
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempTupleArray = new ArrayList<Tuple2<String, Long>>();
}
tempRecord.addTuple(record.getTuple(i));
}
tempTupleArray.add(value);
}
}
......@@ -15,72 +15,22 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
public class WindowWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1, 1);
graphBuilder.setSink("WindowWordCountSink", WindowWordCountSink.class);
graphBuilder.shuffleConnect("WindowWordCountSource", "WindowWordCountSplitter");
graphBuilder.fieldsConnect("WindowWordCountSplitter", "WindowWordCountCounter", 0);
graphBuilder.shuffleConnect("WindowWordCountCounter", "WindowWordCountSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Long>> dataStream = context
.addSource(new WindowWordCountSource())
.flatMap(new WindowWordCountSplitter())
.partitionBy(0)
.flatMap(new WindowWordCountCounter())
.addSink(new WindowWordCountSink());
context.execute();
}
}
......@@ -15,26 +15,15 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.SinkFunction;
public class WindowWordCountSink extends UserSinkInvokable {
public class WindowWordCountSink extends SinkFunction<Tuple3<String, Integer, Long>> {
private static final long serialVersionUID = 1L;
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = record.getInteger(i, 1);
timestamp = record.getLong(i, 2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
}
/**
 * Prints the string form of the received tuple to standard output.
 *
 * @param inTuple tuple delivered to this sink
 */
public void invoke(Tuple3<String, Integer, Long> inTuple) {
    final String rendered = String.valueOf(inTuple);
    System.out.println(rendered);
}
}
......@@ -16,32 +16,23 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
public class WindowWordCountSource extends UserSourceInvokable {
public class WindowWordCountSource extends SourceFunction<Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
private BufferedReader br = null;
private String line = "";
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Long>());
private Tuple2<String, Long> outRecord = new Tuple2<String, Long>();
private Long timestamp = 0L;
public WindowWordCountSource() {
try {
br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
timestamp = 0L;
}
@Override
public void invoke() throws Exception {
public void invoke(Collector<Tuple2<String, Long>> collector) throws Exception {
// TODO Auto-generated method stub
BufferedReader br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
while(true){
line = br.readLine();
if(line==null){
......@@ -49,11 +40,11 @@ public class WindowWordCountSource extends UserSourceInvokable {
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
outRecord.setString(0, line);
outRecord.setLong(1, timestamp);
outRecord.f0 = line;
outRecord.f1 = timestamp;
collector.collect(outRecord);
timestamp++;
emit(outRecord);
}
}
}
}
}
......@@ -15,26 +15,26 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.util.Collector;
import eu.stratosphere.api.java.functions.FlatMapFunction;
public class WindowWordCountSplitter extends UserTaskInvokable {
public class WindowWordCountSplitter extends FlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
private static final long serialVersionUID = 1L;
private String[] words = new String[] {};
private Long timestamp = 0L;
private StreamRecord outputRecord = new StreamRecord(3);
private Tuple2<String, Long> outTuple = new Tuple2<String, Long>();
@Override
public void invoke(StreamRecord record) throws Exception {
outputRecord.Clear();
words = record.getString(0).split(" ");
timestamp = record.getLong(1);
for (String word : words) {
Tuple3<String, Integer, Long> tuple =new Tuple3<String, Integer, Long>(word, 1, timestamp);
outputRecord.addTuple(tuple);
public void flatMap(Tuple2<String, Long> inTuple, Collector<Tuple2<String, Long>> out) throws Exception {
words=inTuple.f0.split(" ");
timestamp=inTuple.f1;
for(String word : words){
outTuple.f0 = word;
outTuple.f1 = timestamp;
out.collect(outTuple);
}
emit(outputRecord);
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.kafka;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Command-line utility that replays the lines of a text file into a Kafka
 * topic. Each line is sent as one string message; when the end of the file
 * is reached the file is reopened and replayed again.
 *
 * Usage: KafkaProducer &lt;filename&gt; &lt;topicId&gt; &lt;brokerAddr&gt;
 */
public class KafkaProducer {
    static kafka.javaapi.producer.Producer<Integer, String> producer;
    static Properties props = new Properties();

    /**
     * Configures the shared properties with the string encoder and the given
     * broker list, then creates the shared producer from them.
     *
     * @param brokerAddr value stored under "metadata.broker.list"
     */
    public static void ProducerPrepare(String brokerAddr) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", brokerAddr);
        producer = new kafka.javaapi.producer.Producer<Integer, String>(
                new ProducerConfig(props));
    }

    /**
     * Entry point. Expects exactly three arguments: the input file name, the
     * topic id and the broker address.
     *
     * @param args filename, topicId, brokerAddr
     * @throws Exception if the file cannot be read or a message cannot be sent
     */
    public static void main(String[] args) throws Exception {
        // Three arguments are read below. The previous check (length == 1)
        // rejected every valid invocation and crashed on the one it accepted.
        if (args.length != 3) {
            System.out.println("usage: KafkaProducer <filename> <topicId> <brokerAddr>");
            System.exit(-1);
            return;
        }

        String infilename = args[0];
        String topicId = args[1];
        String brokerAddr = args[2];
        ProducerPrepare(brokerAddr);

        BufferedReader reader = new BufferedReader(new FileReader(infilename));
        try {
            // Tracks whether the current pass over the file produced any message,
            // so an empty file does not turn into a busy reopen loop.
            boolean sentInThisPass = false;
            while (true) {
                String line = reader.readLine();
                if (line == null) {
                    if (!sentInThisPass) {
                        System.out.println("input file is empty: " + infilename);
                        break;
                    }
                    // End of file: start over from the beginning.
                    reader.close();
                    reader = new BufferedReader(new FileReader(infilename));
                    sentInThisPass = false;
                    continue;
                }
                producer.send(new KeyedMessage<Integer, String>(topicId, line));
                sentInThisPass = true;
            }
        } finally {
            // Release the file handle and the producer even when reading or sending fails.
            reader.close();
            producer.close();
        }
    }
}
......@@ -29,9 +29,8 @@ import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.util.Collector;
/**
* Source for reading messages from a Kafka queue. The source currently only
* support string messages. Other types will be added soon.
*
* Source for reading messages from a Kafka queue.
* The source currently only supports string messages.
*/
public class KafkaSource extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.util.ArrayList;
import java.util.HashMap;
import eu.stratosphere.streaming.index.IndexPair;
/**
 * A log-structured key-value store that accepts every modification by
 * appending the value to the end of the state. An index maps each key to the
 * position of the value most recently written for it.
 */
public class LogTableState<K, V> implements TableState<K, V> {
	/** Maps each key to the block/entry position of its latest value. */
	private HashMap<K, IndexPair> keyIndex = new HashMap<K, IndexPair>();
	/** The log, stored as blocks of values addressed by block id. */
	private HashMap<Integer, ArrayList<V>> blocks = new HashMap<Integer, ArrayList<V>>();
	/** Number of values a block holds before a new block is opened. */
	private final int perBlockEntryCount = 1000;
	/** Position at which the next value is appended. */
	private IndexPair nextInsertPos = new IndexPair(-1, -1);

	/** Creates an empty store with block 0 ready to receive the first value. */
	public LogTableState() {
		blocks.put(0, new ArrayList<V>());
		nextInsertPos.setIndexPair(0, 0);
	}

	/**
	 * Appends {@code value} to the log and points {@code key} at it. A value
	 * previously written for the same key stays in its block but is no longer
	 * referenced by the index.
	 */
	@Override
	public void put(K key, V value) {
		boolean currentBlockFull = nextInsertPos.entryId == perBlockEntryCount;
		if (currentBlockFull) {
			int freshBlockId = nextInsertPos.blockId + 1;
			blocks.put(freshBlockId, new ArrayList<V>());
			// Presumably advances blockId and resets entryId to 0 -- confirm in IndexPair.
			nextInsertPos.IncrementBlock();
		}
		ArrayList<V> tailBlock = blocks.get(nextInsertPos.blockId);
		tailBlock.add(value);
		// Store a copy: nextInsertPos itself keeps moving with every append.
		keyIndex.put(key, new IndexPair(nextInsertPos));
		nextInsertPos.entryId++;
	}

	/**
	 * Returns the value most recently written for {@code key}, or
	 * {@code null} when the index has no entry for it.
	 */
	@Override
	public V get(K key) {
		IndexPair position = keyIndex.get(key);
		if (position != null) {
			return blocks.get(position.blockId).get(position.entryId);
		}
		return null;
	}

	/** Drops {@code key} from the index; the logged value itself is not touched. */
	@Override
	public void delete(K key) {
		keyIndex.remove(key);
	}

	/** Tells whether the index currently holds an entry for {@code key}. */
	@Override
	public boolean containsKey(K key) {
		return keyIndex.containsKey(key);
	}

	/** Not implemented yet; always returns {@code null}. */
	@Override
	public String serialize() {
		return null;
	}

	/** Not implemented yet; the argument is ignored. */
	@Override
	public void deserialize(String str) {
	}

	/** Returns an iterator built from the index entries and the block map. */
	@Override
	public TableStateIterator<K, V> getIterator() {
		return new LogTableStateIterator<K, V>(keyIndex.entrySet().iterator(), blocks);
	}
}
......@@ -15,13 +15,14 @@
package eu.stratosphere.streaming.state;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* The most general internal state that stores data in a mutable map.
*/
public class MutableTableState<K, V> implements TableState<K, V> {
public class MutableTableState<K, V> implements TableState<K, V>, Serializable {
private Map<K, V> state=new LinkedHashMap<K, V>();
@Override
......
......@@ -15,9 +15,10 @@
package eu.stratosphere.streaming.state;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import java.io.Serializable;
import java.util.ArrayList;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
/**
* The window state for window operator. To be general enough, this class
......@@ -25,7 +26,8 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
* compose time based window operator by extending this class by splitting the
* stream into multiple mini batches.
*/
public class SlidingWindowState<K> {
public class SlidingWindowState implements Serializable{
private static final long serialVersionUID = -2376149970115888901L;
private int currentRecordCount;
private int fullRecordCount;
private int slideRecordCount;
......@@ -41,13 +43,13 @@ public class SlidingWindowState<K> {
this.buffer = new CircularFifoBuffer(fullRecordCount);
}
public void pushBack(StreamRecord record) {
buffer.add(record);
public void pushBack(ArrayList tupleArray) {
buffer.add(tupleArray);
currentRecordCount += 1;
}
public StreamRecord popFront() {
StreamRecord frontRecord = (StreamRecord) buffer.get();
public ArrayList popFront() {
ArrayList frontRecord = (ArrayList) buffer.get();
buffer.remove();
return frontRecord;
}
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.state;
/**
* An internal state interface that supports stateful operator.
*/
public interface TableState<K, V> {
public interface TableState<K, V>{
public void put(K key, V value);
public V get(K key);
public void delete(K key);
......
......@@ -15,13 +15,10 @@
package eu.stratosphere.streaming.api;
import static org.junit.Assert.fail;
import java.util.Iterator;
import org.junit.Test;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
......@@ -72,8 +69,9 @@ public class BatchReduceTest {
@Test
public void test() throws Exception {
StreamExecutionEnvironment context = new StreamExecutionEnvironment(4);
DataStream<Tuple1<Double>> dataStream0 = context.addSource(new MySource()).batchReduce(new MyBatchReduce()).addSink(new MySink());
StreamExecutionEnvironment context = new StreamExecutionEnvironment(4, 1000);
DataStream<Tuple1<Double>> dataStream0 = context.addSource(new MySource())
.batchReduce(new MyBatchReduce()).addSink(new MySink());
context.execute();
}
......
package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
/**
 * Runs a small job whose operators are each followed by a different batch
 * size and checks that every tuple emitted by the source reaches the sink.
 */
public class BatchTest {

	/** Number of tuples the sink has received. */
	private static int count = 0;

	/** Emits "string #0" through "string #19", reusing a single tuple instance. */
	private static final class MySource extends SourceFunction<Tuple1<String>> {

		private Tuple1<String> outTuple = new Tuple1<String>();

		@Override
		public void invoke(Collector<Tuple1<String>> collector) throws Exception {
			int index = 0;
			while (index < 20) {
				outTuple.f0 = "string #" + index;
				collector.collect(outTuple);
				index++;
			}
		}
	}

	/** Forwards every incoming tuple unchanged. */
	private static final class MyMap extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {

		@Override
		public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
			out.collect(value);
		}
	}

	/** Counts the tuples it receives. */
	private static final class MySink extends SinkFunction<Tuple1<String>> {

		@Override
		public void invoke(Tuple1<String> tuple) {
			count += 1;
		}
	}

	@Test
	public void test() throws Exception {
		StreamExecutionEnvironment context = new StreamExecutionEnvironment();

		// The topology is registered with the environment as it is built; the
		// resulting stream handle is not needed afterwards.
		context.addSource(new MySource())
				.flatMap(new MyMap()).batch(4)
				.flatMap(new MyMap()).batch(2)
				.flatMap(new MyMap()).batch(5)
				.flatMap(new MyMap()).batch(4)
				.addSink(new MySink());

		context.execute();

		assertEquals(20, count);
	}
}
......@@ -71,12 +71,17 @@ public class FlatMapTest {
public void test() throws Exception {
try {
StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(0);
StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(0, 1000);
fail();
} catch (IllegalArgumentException e) {
try {
StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(1, 0);
fail();
} catch (IllegalArgumentException e2) {
}
}
StreamExecutionEnvironment context = new StreamExecutionEnvironment(2);
StreamExecutionEnvironment context = new StreamExecutionEnvironment(2, 1000);
DataStream<Tuple1<String>> dataStream0 = context.addSource(new MySource());
DataStream<Tuple1<String>> dataStream1 = context.addDummySource().connectWith(dataStream0)
......@@ -96,7 +101,7 @@ public class FlatMapTest {
FlatMapFunction<Tuple, Tuple> f = (FlatMapFunction<Tuple, Tuple>) in.readObject();
StreamCollector<Tuple> s = new StreamCollector<Tuple>(1, 1, null);
StreamCollector<Tuple> s = new StreamCollector<Tuple>(1, 1000, 1, null);
Tuple t = new Tuple1<String>("asd");
f.flatMap(t, s);
......
......@@ -72,7 +72,7 @@ public class MapTest {
MapFunction<Tuple, Tuple> f = (MapFunction<Tuple, Tuple>) in.readObject();
StreamCollector<Tuple> s = new StreamCollector<Tuple>(1, 1, null);
StreamCollector<Tuple> s = new StreamCollector<Tuple>(1, 1000, 1, null);
Tuple t = new Tuple1<String>("asd");
s.collect(f.map(t));
......
......@@ -25,13 +25,13 @@ public class StreamCollectorTest {
@Test
public void testStreamCollector() {
StreamCollector collector = new StreamCollector(10, 0, null);
StreamCollector collector = new StreamCollector(10, 1000, 0, null);
assertEquals(10, collector.batchSize);
}
@Test
public void testCollect() {
StreamCollector collector = new StreamCollector(2, 0, null);
StreamCollector collector = new StreamCollector(2, 1000, 0, null);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
......@@ -39,6 +39,20 @@ public class StreamCollectorTest {
}
@Test
public void testBatchSize() throws InterruptedException {
	final String separator = "---------------";
	System.out.println(separator);

	// First argument is the batch size (see testStreamCollector); the second is
	// presumably the batch timeout in milliseconds -- confirm in StreamCollector.
	StreamCollector collector = new StreamCollector(3, 100, 0, null);

	// Three tuples: exactly one batch of the configured size.
	for (int sent = 0; sent < 3; sent++) {
		collector.collect(new Tuple1<Integer>(0));
	}

	// Pause for longer than the second constructor argument before sending more.
	Thread.sleep(200);

	for (int value = 2; value <= 3; value++) {
		collector.collect(new Tuple1<Integer>(value));
	}

	System.out.println(separator);
}
@Test
public void testClose() {
// Placeholder: nothing is exercised or asserted here yet.
}
......
......@@ -97,4 +97,17 @@ public class ArrayStreamRecordTest {
}
@Test
public void truncatedSizeTest() {
	final int fullSize = 4;
	final int truncatedSize = 2;

	// Fill a record of four tuples where tuple i carries the value i.
	StreamRecord record = new ArrayStreamRecord(fullSize);
	for (int i = 0; i < fullSize; i++) {
		record.setTuple(i, new Tuple1<Integer>(i));
	}

	StreamRecord truncatedRecord = new ArrayStreamRecord(record, truncatedSize);

	// The copy reports the reduced batch size and keeps the leading tuples.
	assertEquals(2, truncatedRecord.batchSize);
	for (int i = 0; i < truncatedSize; i++) {
		assertEquals(i, truncatedRecord.getTuple(i).getField(0));
	}
}
}
......@@ -18,7 +18,6 @@ package eu.stratosphere.streaming.state;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.state.LogTableState;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.TableStateIterator;
import eu.stratosphere.streaming.state.SlidingWindowState;
......@@ -53,34 +52,6 @@ public class InternalStateTest {
}
}
@Test
public void LogTableStateTest(){
	LogTableState<String, String> table = new LogTableState<String, String>();

	// Key/value pairs written to the table, in insertion order.
	String[][] entries = {
			{ "abc", "hello" },
			{ "test", "world" },
			{ "state", "mutable" },
			{ "streaming", "persist" } };
	for (String[] entry : entries) {
		table.put(entry[0], entry[1]);
	}

	// Look up a key that was written.
	String present = table.get("streaming");
	System.out.println(present == null ? "key does not exist!" : "value=" + present);

	// Look up a key that was never written.
	String absent = table.get("null");
	System.out.println(absent == null ? "key does not exist!" : "value=" + absent);

	// Print every key/value pair the iterator yields.
	for (TableStateIterator<String, String> it = table.getIterator(); it.hasNext();) {
		Tuple2<String, String> pair = it.next();
		System.out.println(pair.getField(0) + ", " + pair.getField(1));
	}
}
@Test
public void WindowStateTest(){
SlidingWindowState<String> state=new SlidingWindowState<String>(100, 20, 10);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册