diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index a641138cc368e6d0ce49c87b9b728f1379f0de95..587ccf272d5c28dd4c17e42817181ddc85ab1b3b 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -86,6 +86,16 @@ public class StreamRecord implements IOReadableWritable, Serializable { public StreamRecord() { } + public StreamRecord(StreamRecord record) { + this.numOfFields = record.getNumOfFields(); + this.numOfTuples = 0; + tupleBatch = new ArrayList(); + this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20)); + for (int i = 0; i < record.getNumOfTuples(); ++i) { + this.tupleBatch.add(copyTuple(record.getTuple(i))); + } + } + /** * Creates empty StreamRecord with number of fields set * @@ -95,7 +105,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { public StreamRecord(int numOfFields) { this.numOfFields = numOfFields; this.numOfTuples = 0; - tupleBatch = new ArrayList(); + this.batchSize = 1; + tupleBatch = new ArrayList(batchSize); } /** @@ -113,16 +124,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { tupleBatch = new ArrayList(batchSize); } - public StreamRecord(StreamRecord record) { - this.numOfFields = record.getNumOfFields(); - this.numOfTuples = 0; - tupleBatch = new ArrayList(); - this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20)); - for (int i = 0; i < record.getNumOfTuples(); ++i) { - this.tupleBatch.add(copyTuple(record.getTuple(i))); - } - } - /** * Creates a new batch of records containing only the given Tuple as element * and sets desired batch size. @@ -138,6 +139,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { this.batchSize = batchSize; tupleBatch = new ArrayList(batchSize); tupleBatch.add(tuple); + System.out.println("here, the tuple batch size is"+tupleBatch.size()); } /** @@ -147,7 +149,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param tupleList * Tuples to bes stored in the StreamRecord */ - public StreamRecord(List tupleList) { numOfFields = tupleList.get(0).getArity(); numOfTuples = tupleList.size(); @@ -166,6 +167,59 @@ public class StreamRecord implements IOReadableWritable, Serializable { this(tuple, 1); } + /** + * Checks if the number of fields are equal to the batch field size then + * adds the Tuple to the end of the batch + * + * @param tuple + * Tuple to be added as the next record of the batch + * @throws TupleSizeMismatchException + * Tuple specified has illegal size + */ + public void addTuple(Tuple tuple) throws TupleSizeMismatchException { + addTuple(numOfTuples, tuple); + } + + /** + * Checks if the number of fields are equal to the batch field size then + * inserts the Tuple to the given position into the recordbatch + * + * @param index + * Position of the added tuple + * @param tuple + * Tuple to be added as the next record of the batch + * @throws TupleSizeMismatchException + * Tuple specified has illegal size + */ + public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException { + if (tuple.getArity() == numOfFields) { + tupleBatch.add(index, tuple); + numOfTuples++; + } else { + throw new TupleSizeMismatchException(); + } + } + + /** + * Removes the tuple at the given position from the batch and returns it + * + * @param index + * Index of tuple to remove + * @return Removed tuple + * @throws TupleSizeMismatchException + * Tuple specified has illegal size + */ + public Tuple removeTuple(int index) throws TupleSizeMismatchException { + if (index < numOfTuples) { + numOfTuples--; + return tupleBatch.remove(index); + } else { + throw new TupleSizeMismatchException(); + } + } + + + public boolean isEmpty() { return (this.numOfTuples == 0); } @@ -941,57 +995,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { } } - /** - * Checks if the number of fields are equal to the batch field size then - * adds the Tuple to the end of the batch - * - * @param tuple - * Tuple to be added as the next record of the batch - * @throws TupleSizeMismatchException - * Tuple specified has illegal size - */ - public void addTuple(Tuple tuple) throws TupleSizeMismatchException { - addTuple(numOfTuples, tuple); - } - - /** - * Checks if the number of fields are equal to the batch field size then - * inserts the Tuple to the given position into the recordbatch - * - * @param index - * Position of the added tuple - * @param tuple - * Tuple to be added as the next record of the batch - * @throws TupleSizeMismatchException - * Tuple specified has illegal size - */ - public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException { - if (tuple.getArity() == numOfFields) { - tupleBatch.add(index, tuple); - numOfTuples++; - } else { - throw new TupleSizeMismatchException(); - } - } - - /** - * Removes the tuple at the given position from the batch and returns it - * - * @param index - * Index of tuple to remove - * @return Removed tuple - * @throws TupleSizeMismatchException - * Tuple specified has illegal size - */ - public Tuple removeTuple(int index) throws TupleSizeMismatchException { - if (index < numOfTuples) { - numOfTuples--; - return tupleBatch.remove(index); - } else { - throw new TupleSizeMismatchException(); - } - } - /** * Creates a copy of the StreamRecord object by Serializing and * deserializing it diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java new file mode 100644 index 0000000000000000000000000000000000000000..6ef415b10a2f82854ac56dfb4c62ef3646584b32 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java @@ -0,0 +1,88 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.window.sum; + +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.state.MutableTableState; +import eu.stratosphere.streaming.state.WindowState; + +public class WindowSumAggregate extends UserTaskInvokable { + + private int windowSize; + private int slidingStep; + private int computeGranularity; + private int windowFieldId; + + private WindowState window; + private MutableTableState sum; + + private Integer number = 0; + private Integer timestamp = 0; + private StreamRecord outRecord = new StreamRecord(new Tuple2()); + + public WindowSumAggregate() { + windowSize = 100; + slidingStep = 20; + computeGranularity = 10; + windowFieldId = 1; + window = new WindowState(windowSize, slidingStep, + computeGranularity, windowFieldId); + sum = new MutableTableState(); + sum.put("sum", 0); + } + + private void incrementCompute(StreamRecord record) { + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + number = record.getInteger(i, 0); + sum.put("sum", sum.get("sum")+number); + } + } + + private void decrementCompute(StreamRecord record) { + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + number = record.getInteger(i, 0); + sum.put("sum", sum.get("sum")-number); + } + } + + @Override + public void invoke(StreamRecord record) throws Exception { + if (window.isFull()) { + StreamRecord expiredRecord = window.popFront(); + incrementCompute(record); + decrementCompute(expiredRecord); + window.pushBack(record); + if (window.isComputable()) { + outRecord.setInteger(0, sum.get("sum")); + outRecord.setInteger(1, record.getInteger(1)); + emit(outRecord); + } + } else { + incrementCompute(record); + window.pushBack(record); + if (window.isFull()) { + outRecord.setInteger(0, sum.get("sum")); + outRecord.setInteger(1, record.getInteger(1)); + emit(outRecord); + } + } + + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java new file mode 100644 index 0000000000000000000000000000000000000000..2df95b4f150cb8c3c45a6a17bc27782a24d1d8ce --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java @@ -0,0 +1,86 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.window.sum; + +import java.net.InetSocketAddress; + +import org.apache.log4j.Level; + +import eu.stratosphere.client.minicluster.NepheleMiniCluster; +import eu.stratosphere.client.program.Client; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.util.LogUtils; + +//TODO: window operator remains unfinished. +public class WindowSumLocal { + + public static JobGraph getJobGraph() { + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + graphBuilder.setSource("WindowSumSource", WindowSumSource.class); + graphBuilder.setTask("WindowSumMultiple", WindowSumMultiple.class, 1, 1); + graphBuilder.setTask("WindowSumAggregate", WindowSumAggregate.class, 1, 1); + graphBuilder.setSink("WindowSumSink", WindowSumSink.class); + + graphBuilder.shuffleConnect("WindowSumSource", "WindowSumMultiple"); + graphBuilder.shuffleConnect("WindowSumMultiple", "WindowSumAggregate"); + graphBuilder.shuffleConnect("WindowSumAggregate", "WindowSumSink"); + + return graphBuilder.getJobGraph(); + } + + public static void main(String[] args) { + + LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO); + + try { + + JobGraph jG = getJobGraph(); + Configuration configuration = jG.getJobConfiguration(); + + if (args.length == 0) { + args = new String[] { "local" }; + } + + if (args[0].equals("local")) { + System.out.println("Running in Local mode"); + NepheleMiniCluster exec = new NepheleMiniCluster(); + + exec.start(); + + Client client = new Client(new InetSocketAddress("localhost", 6498), configuration); + + client.run(jG, true); + + exec.stop(); + + } else if (args[0].equals("cluster")) { + System.out.println("Running in Cluster2 mode"); + + Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), + configuration); + + client.run(jG, true); + + } + + } catch (Exception e) { + System.out.println(e); + } + + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumMultiple.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumMultiple.java new file mode 100644 index 0000000000000000000000000000000000000000..82f053df3d431d02b814ef4e7265a0ff6bd88790 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumMultiple.java @@ -0,0 +1,39 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.window.sum; + +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class WindowSumMultiple extends UserTaskInvokable { + + private StreamRecord outputRecord = new StreamRecord(new Tuple2()); + + private Integer number = 0; + private Integer timestamp = 0; + + @Override + public void invoke(StreamRecord record) throws Exception { + number = record.getInteger(0); + timestamp = record.getInteger(1); + System.out.println("number=" + number + ", timestamp=" + timestamp); + + outputRecord.setInteger(0, number); + outputRecord.setInteger(1, timestamp); + emit(outputRecord); + } +} \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSink.java new file mode 100644 index 0000000000000000000000000000000000000000..d29eef1a5e704b20157fdf2f050055b1ab7386aa --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSink.java @@ -0,0 +1,34 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.window.sum; + +import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class WindowSumSink extends UserSinkInvokable { + + private Integer sum = 0; + private Integer timestamp = 0; + + @Override + public void invoke(StreamRecord record) throws Exception { + sum = record.getInteger(0, 0); + timestamp = record.getInteger(0, 1); + System.out.println("============================================"); + System.out.println(sum + " " + timestamp); + System.out.println("============================================"); + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSource.java new file mode 100644 index 0000000000000000000000000000000000000000..bb55768065c6758b01291cf25154c3d657d6ec5c --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumSource.java @@ -0,0 +1,37 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.window.sum; + +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class WindowSumSource extends UserSourceInvokable { + + private StreamRecord outRecord = new StreamRecord( + new Tuple2()); + private Integer timestamp = 0; + + @Override + public void invoke() throws Exception { + for (int i = 0; i < 1000; ++i) { + outRecord.setInteger(0, i); + outRecord.setInteger(1, timestamp); + timestamp++; + emit(outRecord); + } + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java index 6f8b95586a87abf504fa9393dfcbd03ee03b0dab..23209ee3289d4f16325817ba591a6c8ec5228b15 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java @@ -36,7 +36,6 @@ public class WindowWordCountCounter extends UserTaskInvokable { private String word = ""; private Integer count = 0; private Long timestamp = 0L; - private StreamRecord outRecord = new StreamRecord(3); public WindowWordCountCounter() { windowSize = 100; @@ -77,6 +76,7 @@ public class WindowWordCountCounter extends UserTaskInvokable { @Override public void invoke(StreamRecord record) throws Exception { + StreamRecord outRecord = new StreamRecord(3); if (window.isFull()) { StreamRecord expiredRecord = window.popFront(); incrementCompute(record); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java index a2c80c54d2bf70b0456c7cac082ae2fefd3c7494..94c7aa8f69f5ffe24c33707140b8cfdb1b72cb4b 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java @@ -21,12 +21,12 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord; public class WindowWordCountSplitter extends UserTaskInvokable { private String[] words = new String[] {}; - private StreamRecord outputRecord = new StreamRecord(3); private Long timestamp = 0L; @Override public void invoke(StreamRecord record) throws Exception { + StreamRecord outputRecord = new StreamRecord(3); words = record.getString(0).split(" "); timestamp = record.getLong(1); System.out.println("sentence=" + record.getString(0) + ", timestamp=" diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java index 1d51c3d117b8367093a6a3b550468f1f6af97ad2..5645249e905866e69f543e6dbe69593f1f1114c3 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java @@ -20,9 +20,20 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord; public class WordCountSink extends UserSinkInvokable { + private String word = ""; + private Integer count = 0; + private Long timestamp = 0L; + @Override public void invoke(StreamRecord record) throws Exception { - + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + word = record.getString(i, 0); + count = record.getInteger(i, 1); + timestamp = record.getLong(i, 2); + System.out.println("============================================"); + System.out.println(word + " " + count + " " + timestamp); + System.out.println("============================================"); + } } - } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java index 732a0ffb6ab733f2e6fa396c5c1d081bec5a2c34..26e4d62d6fce2f8975e6172425f2e227830a2b12 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java @@ -62,6 +62,14 @@ public class WindowState { this.windowIndex = new HashMap(); this.buffer = new CircularFifoBuffer(fullRecordCount); } + + public int getWindowSize(){ + return windowSize; + } + + public int getSlidingStep(){ + return slidingStep; + } public void pushBack(StreamRecord record) { if (initTimestamp == -1) { diff --git a/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py b/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py index 423daa83e8c72ff35471035f268f66d54e5fd09e..ad15e1b3504cf39e96d2f622ac1f99d010865371 100755 --- a/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py +++ b/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py @@ -10,107 +10,112 @@ import pandas as pd import os import operator + linestyles = ['_', '-', '--', ':'] markers=['D','s', '|', '', 'x', '_', '^', ' ', 'd', 'h', '+', '*', ',', 'o', '.', '1', 'p', 'H', 'v', '>']; colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k'] def readFiles(csv_dir): - counters=[] - + dataframes=[] + + for fname in os.listdir(csv_dir): if '.csv' in fname: - counters.append((fname.rstrip('.csv'),int(fname.rstrip('.csv').split('-')[-1])-1,pd.read_csv(os.path.join(csv_dir,fname),index_col='Time'))) - return counters + dataframes.append((fname.rstrip('.csv'),int(fname.rstrip('.csv').split('-')[-1])-1,pd.read_csv(os.path.join(csv_dir,fname),index_col='Time'))) + return dataframes -def plotCounter(csv_dir, sname='', smooth=5,savePath=''): - counters= readFiles(csv_dir) - addSpeed(counters) +def plotCounter(csv_dir,name='', smooth=5): + dataframes= readFiles(csv_dir) + + for dataframe in dataframes: + df=dataframe[2] + speed=[0] + values=list(df.ix[:,0]) + for i in range(1,len(values)): + speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01)) + df['speed']=speed - + plt.figure(figsize=(12, 8), dpi=80) + plt.title('Counter') + if name=='': - selectedCounters=[] - for (name, number, df) in counters: - if sname in name: - selectedCounters.append((name, number, df)) - if sname=='': - sname='counters' - save=savePath!='' - - plotDfs(selectedCounters,smooth,save,savePath+'/'+sname) - -def plotDfs(counters,smooth,save,saveFile): + for dataframe in dataframes: + + m=markers[dataframe[1]%len(markers)] + + dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10) + plt.legend([x[0] for x in dataframes]) + plt.figure(figsize=(12, 8), dpi=80) - plt.title('Counter') - for (name, number, df) in counters: + plt.title('dC/dT') + + for dataframe in dataframes: + + m=markers[dataframe[1]%len(markers)] - m=markers[number%len(markers)] + pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10) + plt.legend([x[0] for x in dataframes]) + else: + df2=[] + for dataframe in dataframes: + if name in dataframe[0]: + df2.append(dataframe) + + + for dataframe in df2: + + m=markers[dataframe[1]%len(markers)] - df.ix[:,0].plot(marker=m,markevery=10,markersize=10) - plt.legend([x[0] for x in counters]) - if save: - plt.savefig(saveFile+'C.png') - - + dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10) + plt.legend([x[0] for x in df2]) + plt.figure(figsize=(12, 8), dpi=80) plt.title('dC/dT') - - for (name, number, df) in counters: + for dataframe in df2: - m=markers[number%len(markers)] + m=markers[dataframe[1]%len(markers)] - pd.rolling_mean(df.speed,smooth).plot(marker=m,markevery=10,markersize=10) - plt.legend([x[0] for x in counters]) - if save: - plt.savefig(saveFile+'D.png') - -def addSpeed(counters): - for (tname, number, df) in counters: - speed=[0] - values=list(df.ix[:,0]) - for i in range(1,len(values)): - speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01)) - df['speed']=speed - return counters + pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10) + plt.legend([x[0] for x in df2]) + +def plotThroughput(csv_dir,taskname, smooth=5): + dataframes= readFiles(csv_dir) + + for dataframe in dataframes: + df=dataframe[2] + speed=[0] + values=list(df.ix[:,0]) + for i in range(1,len(values)): + speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01)) + df['speed']=speed + + selected={} + + for df in dataframes: + if taskname in df[0]: + if df[1] in selected: + selected[df[1]].append(df[2]) + else: + selected[df[1]]=[df[2]] + plt.figure() + plt.title(taskname) + for i in selected: + selected[i]=reduce(operator.add,selected[i]) + m=markers[i%len(markers)] + selected[i].ix[:,0].plot(marker=m,markevery=10,markersize=10) + + + plt.legend(selected.keys()) + + plt.figure() + plt.title(taskname+" - dC/dT") + for i in selected: + m=markers[i%len(markers)] + pd.rolling_mean(selected[i].speed,smooth).plot(marker=m,markevery=10,markersize=10) -def plotThroughput(csv_dir,tasknames, smooth=5,savePath=''): - if type(tasknames)!=list: - tasknames=[tasknames] - for taskname in tasknames: - counters= readFiles(csv_dir) - addSpeed(counters) - selected={} - - for (tname, number, df) in counters: - if taskname in tname: - if number in selected: - selected[number].append(df) - else: - selected[number]=[df] - plt.figure() - plt.title(taskname) - for i in selected: - if len(selected[i])>1: - selected[i]=reduce(operator.add,selected[i]) - else: - selected[i]=selected[i][0] - m=markers[i%len(markers)] - selected[i].ix[:,0].plot(marker=m,markevery=10,markersize=10) - - - plt.legend(selected.keys()) - if savePath !='': - plt.savefig(savePath+'/'+taskname+'C.png') - plt.figure() - plt.title(taskname+" - dC/dT") - for i in selected: - m=markers[i%len(markers)] - pd.rolling_mean(selected[i].speed,smooth).plot(marker=m,markevery=10,markersize=10) - - plt.legend(selected.keys()) - if savePath !='': - plt.savefig(savePath+'/'+taskname+'D.png') + plt.legend(selected.keys()) def plotTimer(csv_dir,smooth=5,std=50): dataframes= readFiles(csv_dir)