提交 7b780ca6 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] add window sum example

上级 ee68f63a
......@@ -86,6 +86,16 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord() {
}
public StreamRecord(StreamRecord record) {
this.numOfFields = record.getNumOfFields();
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getNumOfTuples(); ++i) {
this.tupleBatch.add(copyTuple(record.getTuple(i)));
}
}
/**
* Creates empty StreamRecord with number of fields set
*
......@@ -95,7 +105,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord(int numOfFields) {
this.numOfFields = numOfFields;
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.batchSize = 1;
tupleBatch = new ArrayList<Tuple>(batchSize);
}
/**
......@@ -113,16 +124,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
tupleBatch = new ArrayList<Tuple>(batchSize);
}
public StreamRecord(StreamRecord record) {
this.numOfFields = record.getNumOfFields();
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getNumOfTuples(); ++i) {
this.tupleBatch.add(copyTuple(record.getTuple(i)));
}
}
/**
* Creates a new batch of records containing only the given Tuple as element
* and sets desired batch size.
......@@ -138,6 +139,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this.batchSize = batchSize;
tupleBatch = new ArrayList<Tuple>(batchSize);
tupleBatch.add(tuple);
System.out.println("here, the tuple batch size is"+tupleBatch.size());
}
/**
......@@ -147,7 +149,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param tupleList
* Tuples to bes stored in the StreamRecord
*/
public StreamRecord(List<Tuple> tupleList) {
numOfFields = tupleList.get(0).getArity();
numOfTuples = tupleList.size();
......@@ -166,6 +167,59 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this(tuple, 1);
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(Tuple tuple) throws TupleSizeMismatchException {
addTuple(numOfTuples, tuple);
}
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(index, tuple);
numOfTuples++;
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Removes the tuple at the given position from the batch and returns it
*
* @param index
* Index of tuple to remove
* @return Removed tuple
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public Tuple removeTuple(int index) throws TupleSizeMismatchException {
if (index < numOfTuples) {
numOfTuples--;
return tupleBatch.remove(index);
} else {
throw new TupleSizeMismatchException();
}
}
public boolean isEmpty() {
return (this.numOfTuples == 0);
}
......@@ -941,57 +995,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(Tuple tuple) throws TupleSizeMismatchException {
addTuple(numOfTuples, tuple);
}
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(index, tuple);
numOfTuples++;
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Removes the tuple at the given position from the batch and returns it
*
* @param index
* Index of tuple to remove
* @return Removed tuple
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public Tuple removeTuple(int index) throws TupleSizeMismatchException {
if (index < numOfTuples) {
numOfTuples--;
return tupleBatch.remove(index);
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Creates a copy of the StreamRecord object by Serializing and
* deserializing it
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.WindowState;
public class WindowSumAggregate extends UserTaskInvokable {
private int windowSize;
private int slidingStep;
private int computeGranularity;
private int windowFieldId;
private WindowState<Integer> window;
private MutableTableState<String, Integer> sum;
private Integer number = 0;
private Integer timestamp = 0;
private StreamRecord outRecord = new StreamRecord(new Tuple2<Integer, Integer>());
public WindowSumAggregate() {
windowSize = 100;
slidingStep = 20;
computeGranularity = 10;
windowFieldId = 1;
window = new WindowState<Integer>(windowSize, slidingStep,
computeGranularity, windowFieldId);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
}
private void incrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
number = record.getInteger(i, 0);
sum.put("sum", sum.get("sum")+number);
}
}
private void decrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
number = record.getInteger(i, 0);
sum.put("sum", sum.get("sum")-number);
}
}
@Override
public void invoke(StreamRecord record) throws Exception {
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(record);
decrementCompute(expiredRecord);
window.pushBack(record);
if (window.isComputable()) {
outRecord.setInteger(0, sum.get("sum"));
outRecord.setInteger(1, record.getInteger(1));
emit(outRecord);
}
} else {
incrementCompute(record);
window.pushBack(record);
if (window.isFull()) {
outRecord.setInteger(0, sum.get("sum"));
outRecord.setInteger(1, record.getInteger(1));
emit(outRecord);
}
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class WindowSumLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowSumSource", WindowSumSource.class);
graphBuilder.setTask("WindowSumMultiple", WindowSumMultiple.class, 1, 1);
graphBuilder.setTask("WindowSumAggregate", WindowSumAggregate.class, 1, 1);
graphBuilder.setSink("WindowSumSink", WindowSumSink.class);
graphBuilder.shuffleConnect("WindowSumSource", "WindowSumMultiple");
graphBuilder.shuffleConnect("WindowSumMultiple", "WindowSumAggregate");
graphBuilder.shuffleConnect("WindowSumAggregate", "WindowSumSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumMultiple extends UserTaskInvokable {
private StreamRecord outputRecord = new StreamRecord(new Tuple2<Integer, Integer>());
private Integer number = 0;
private Integer timestamp = 0;
@Override
public void invoke(StreamRecord record) throws Exception {
number = record.getInteger(0);
timestamp = record.getInteger(1);
System.out.println("number=" + number + ", timestamp=" + timestamp);
outputRecord.setInteger(0, number);
outputRecord.setInteger(1, timestamp);
emit(outputRecord);
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumSink extends UserSinkInvokable {
private Integer sum = 0;
private Integer timestamp = 0;
@Override
public void invoke(StreamRecord record) throws Exception {
sum = record.getInteger(0, 0);
timestamp = record.getInteger(0, 1);
System.out.println("============================================");
System.out.println(sum + " " + timestamp);
System.out.println("============================================");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumSource extends UserSourceInvokable {
private StreamRecord outRecord = new StreamRecord(
new Tuple2<Integer, Integer>());
private Integer timestamp = 0;
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1000; ++i) {
outRecord.setInteger(0, i);
outRecord.setInteger(1, timestamp);
timestamp++;
emit(outRecord);
}
}
}
......@@ -36,7 +36,6 @@ public class WindowWordCountCounter extends UserTaskInvokable {
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(3);
public WindowWordCountCounter() {
windowSize = 100;
......@@ -77,6 +76,7 @@ public class WindowWordCountCounter extends UserTaskInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
StreamRecord outRecord = new StreamRecord(3);
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(record);
......
......@@ -21,12 +21,12 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowWordCountSplitter extends UserTaskInvokable {
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(3);
private Long timestamp = 0L;
@Override
public void invoke(StreamRecord record) throws Exception {
StreamRecord outputRecord = new StreamRecord(3);
words = record.getString(0).split(" ");
timestamp = record.getLong(1);
System.out.println("sentence=" + record.getString(0) + ", timestamp="
......
......@@ -20,9 +20,20 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSink extends UserSinkInvokable {
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = record.getInteger(i, 1);
timestamp = record.getLong(i, 2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
}
}
}
......@@ -62,6 +62,14 @@ public class WindowState<K> {
this.windowIndex = new HashMap<K, IndexPair>();
this.buffer = new CircularFifoBuffer(fullRecordCount);
}
public int getWindowSize(){
return windowSize;
}
public int getSlidingStep(){
return slidingStep;
}
public void pushBack(StreamRecord record) {
if (initTimestamp == -1) {
......
......@@ -10,107 +10,112 @@ import pandas as pd
import os
import operator
linestyles = ['_', '-', '--', ':']
markers=['D','s', '|', '', 'x', '_', '^', ' ', 'd', 'h', '+', '*', ',', 'o', '.', '1', 'p', 'H', 'v', '>'];
colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']
def readFiles(csv_dir):
counters=[]
dataframes=[]
for fname in os.listdir(csv_dir):
if '.csv' in fname:
counters.append((fname.rstrip('.csv'),int(fname.rstrip('.csv').split('-')[-1])-1,pd.read_csv(os.path.join(csv_dir,fname),index_col='Time')))
return counters
dataframes.append((fname.rstrip('.csv'),int(fname.rstrip('.csv').split('-')[-1])-1,pd.read_csv(os.path.join(csv_dir,fname),index_col='Time')))
return dataframes
def plotCounter(csv_dir, sname='', smooth=5,savePath=''):
counters= readFiles(csv_dir)
addSpeed(counters)
def plotCounter(csv_dir,name='', smooth=5):
dataframes= readFiles(csv_dir)
for dataframe in dataframes:
df=dataframe[2]
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
plt.figure(figsize=(12, 8), dpi=80)
plt.title('Counter')
if name=='':
selectedCounters=[]
for (name, number, df) in counters:
if sname in name:
selectedCounters.append((name, number, df))
if sname=='':
sname='counters'
save=savePath!=''
plotDfs(selectedCounters,smooth,save,savePath+'/'+sname)
def plotDfs(counters,smooth,save,saveFile):
for dataframe in dataframes:
m=markers[dataframe[1]%len(markers)]
dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
plt.figure(figsize=(12, 8), dpi=80)
plt.title('Counter')
for (name, number, df) in counters:
plt.title('dC/dT')
for dataframe in dataframes:
m=markers[dataframe[1]%len(markers)]
m=markers[number%len(markers)]
pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
else:
df2=[]
for dataframe in dataframes:
if name in dataframe[0]:
df2.append(dataframe)
for dataframe in df2:
m=markers[dataframe[1]%len(markers)]
df.ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in counters])
if save:
plt.savefig(saveFile+'C.png')
dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in df2])
plt.figure(figsize=(12, 8), dpi=80)
plt.title('dC/dT')
for (name, number, df) in counters:
for dataframe in df2:
m=markers[number%len(markers)]
m=markers[dataframe[1]%len(markers)]
pd.rolling_mean(df.speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in counters])
if save:
plt.savefig(saveFile+'D.png')
def addSpeed(counters):
for (tname, number, df) in counters:
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
return counters
pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in df2])
def plotThroughput(csv_dir,taskname, smooth=5):
dataframes= readFiles(csv_dir)
for dataframe in dataframes:
df=dataframe[2]
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
selected={}
for df in dataframes:
if taskname in df[0]:
if df[1] in selected:
selected[df[1]].append(df[2])
else:
selected[df[1]]=[df[2]]
plt.figure()
plt.title(taskname)
for i in selected:
selected[i]=reduce(operator.add,selected[i])
m=markers[i%len(markers)]
selected[i].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
plt.figure()
plt.title(taskname+" - dC/dT")
for i in selected:
m=markers[i%len(markers)]
pd.rolling_mean(selected[i].speed,smooth).plot(marker=m,markevery=10,markersize=10)
def plotThroughput(csv_dir,tasknames, smooth=5,savePath=''):
if type(tasknames)!=list:
tasknames=[tasknames]
for taskname in tasknames:
counters= readFiles(csv_dir)
addSpeed(counters)
selected={}
for (tname, number, df) in counters:
if taskname in tname:
if number in selected:
selected[number].append(df)
else:
selected[number]=[df]
plt.figure()
plt.title(taskname)
for i in selected:
if len(selected[i])>1:
selected[i]=reduce(operator.add,selected[i])
else:
selected[i]=selected[i][0]
m=markers[i%len(markers)]
selected[i].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
if savePath !='':
plt.savefig(savePath+'/'+taskname+'C.png')
plt.figure()
plt.title(taskname+" - dC/dT")
for i in selected:
m=markers[i%len(markers)]
pd.rolling_mean(selected[i].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
if savePath !='':
plt.savefig(savePath+'/'+taskname+'D.png')
plt.legend(selected.keys())
def plotTimer(csv_dir,smooth=5,std=50):
dataframes= readFiles(csv_dir)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册