提交 d15a8f04 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] basic RabbitMQ tolopolgy add to test future support for RabbitMQ

上级 683277fc
......@@ -78,6 +78,11 @@
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>
<build>
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.WindowState;
public class StreamWindowTask extends UserTaskInvokable {
private int computeGranularity;
private int windowFieldId = 1;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
public StreamWindowTask(int windowSize, int slidingStep,
int computeGranularity, int windowFieldId) {
this.computeGranularity = computeGranularity;
this.windowFieldId = windowFieldId;
window = new WindowState<Integer>(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
}
private void incrementCompute(StreamRecord record){}
private void decrementCompute(StreamRecord record){}
private void produceRecord(long progress){}
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
long progress = record.getLong(i, windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
} else {
if (progress > nextTimestamp) {
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
tempRecord.addTuple(record.getTuple(i));
}
}
}
}
......@@ -86,16 +86,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord() {
}
public StreamRecord(StreamRecord record) {
this.numOfFields = record.getNumOfFields();
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getNumOfTuples(); ++i) {
this.tupleBatch.add(copyTuple(record.getTuple(i)));
}
}
/**
* Creates empty StreamRecord with number of fields set
*
......@@ -105,8 +95,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord(int numOfFields) {
this.numOfFields = numOfFields;
this.numOfTuples = 0;
this.batchSize = 1;
tupleBatch = new ArrayList<Tuple>(batchSize);
tupleBatch = new ArrayList<Tuple>();
}
/**
......@@ -124,6 +113,16 @@ public class StreamRecord implements IOReadableWritable, Serializable {
tupleBatch = new ArrayList<Tuple>(batchSize);
}
public StreamRecord(StreamRecord record) {
this.numOfFields = record.getNumOfFields();
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getNumOfTuples(); ++i) {
this.tupleBatch.add(copyTuple(record.getTuple(i)));
}
}
/**
* Creates a new batch of records containing only the given Tuple as element
* and sets desired batch size.
......@@ -148,6 +147,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param tupleList
* Tuples to bes stored in the StreamRecord
*/
public StreamRecord(List<Tuple> tupleList) {
numOfFields = tupleList.get(0).getArity();
numOfTuples = tupleList.size();
......@@ -166,59 +166,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this(tuple, 1);
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(Tuple tuple) throws TupleSizeMismatchException {
addTuple(numOfTuples, tuple);
}
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(index, tuple);
numOfTuples++;
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Removes the tuple at the given position from the batch and returns it
*
* @param index
* Index of tuple to remove
* @return Removed tuple
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public Tuple removeTuple(int index) throws TupleSizeMismatchException {
if (index < numOfTuples) {
numOfTuples--;
return tupleBatch.remove(index);
} else {
throw new TupleSizeMismatchException();
}
}
public boolean isEmpty() {
return (this.numOfTuples == 0);
}
......@@ -994,6 +941,57 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(Tuple tuple) throws TupleSizeMismatchException {
addTuple(numOfTuples, tuple);
}
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(index, tuple);
numOfTuples++;
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Removes the tuple at the given position from the batch and returns it
*
* @param index
* Index of tuple to remove
* @return Removed tuple
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public Tuple removeTuple(int index) throws TupleSizeMismatchException {
if (index < numOfTuples) {
numOfTuples--;
return tupleBatch.remove(index);
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Creates a copy of the StreamRecord object by Serializing and
* deserializing it
......
......@@ -46,7 +46,7 @@ public class IncrementalOLS {
@Override
public void invoke() throws Exception {
record.initRecords();
for (int j = 0; j < 100; j++) {
while(true) {
// pull new record from data source
record.setTuple(getNewData());
emit(record);
......@@ -63,7 +63,7 @@ public class IncrementalOLS {
public static class TrainingDataSource extends UserSourceInvokable {
private final int BATCH_SIZE = 10;
private final int BATCH_SIZE = 1000;
StreamRecord record = new StreamRecord(2, BATCH_SIZE);
......@@ -74,7 +74,7 @@ public class IncrementalOLS {
record.initRecords();
for (int j = 0; j < 1000; j++) {
while(true) {
for (int i = 0; i < BATCH_SIZE; i++) {
record.setTuple(i, getTrainingData());
}
......@@ -109,8 +109,8 @@ public class IncrementalOLS {
for (int i = 0; i < numOfTuples; i++) {
Tuple t = record.getTuple(i);
Double[] x_i = t.getField(1);
y[i] = t.getField(0);
Double[] x_i = (Double[]) t.getField(1);
y[i] = (Double) t.getField(0);
for (int j = 0; j < numOfFeatures; j++) {
x[i][j] = x_i[j];
}
......@@ -165,7 +165,6 @@ public class IncrementalOLS {
@Override
public void invoke(StreamRecord record) throws Exception {
System.out.println(record.getTuple());
}
}
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.WindowState;
public class WindowSumAggregate extends UserTaskInvokable {
private int windowSize = 100;
private int slidingStep = 20;
private int computeGranularity = 10;
private int windowFieldId = 1;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
private StreamRecord outRecord = new StreamRecord(
new Tuple2<Integer, Long>());
public WindowSumAggregate() {
window = new WindowState<Integer>(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
}
private void incrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
int number = record.getInteger(i, 0);
sum.put("sum", sum.get("sum") + number);
}
}
private void decrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
int number = record.getInteger(i, 0);
sum.put("sum", sum.get("sum") - number);
}
}
private void produceRecord(long progress){
outRecord.setInteger(0, sum.get("sum"));
outRecord.setLong(1, progress);
emit(outRecord);
}
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
long progress = record.getLong(i, windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
} else {
if (progress >= nextTimestamp) {
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
}
tempRecord.addTuple(record.getTuple(i));
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class WindowSumLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowSumSource", WindowSumSource.class);
graphBuilder.setTask("WindowSumMultiple", WindowSumMultiple.class, 1, 1);
graphBuilder.setTask("WindowSumAggregate", WindowSumAggregate.class, 1, 1);
graphBuilder.setSink("WindowSumSink", WindowSumSink.class);
graphBuilder.shuffleConnect("WindowSumSource", "WindowSumMultiple");
graphBuilder.shuffleConnect("WindowSumMultiple", "WindowSumAggregate");
graphBuilder.shuffleConnect("WindowSumAggregate", "WindowSumSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumMultiple extends UserTaskInvokable {
private StreamRecord outputRecord = new StreamRecord(new Tuple2<Integer, Long>());
@Override
public void invoke(StreamRecord record) throws Exception {
Integer number = record.getInteger(0);
Long timestamp = record.getLong(1);
outputRecord.setInteger(0, number+1);
outputRecord.setLong(1, timestamp);
emit(outputRecord);
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumSink extends UserSinkInvokable {
private Integer sum = 0;
private long timestamp = 0;
@Override
public void invoke(StreamRecord record) throws Exception {
sum = record.getInteger(0);
timestamp = record.getLong(1);
System.out.println("============================================");
System.out.println(sum + " " + timestamp);
System.out.println("============================================");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.sum;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowSumSource extends UserSourceInvokable {
private StreamRecord outRecord = new StreamRecord(
new Tuple2<Integer, Long>());
private Long timestamp = 0L;
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1000; ++i) {
outRecord.setInteger(0, i);
outRecord.setLong(1, timestamp);
timestamp++;
emit(outRecord);
}
}
}
......@@ -25,32 +25,38 @@ import eu.stratosphere.streaming.state.WindowState;
public class WindowWordCountCounter extends UserTaskInvokable {
private int windowSize=10;
private int slidingStep=2;
private int computeGranularity=1;
private int windowFieldId=2;
private int windowSize;
private int slidingStep;
private int computeGranularity;
private int windowFieldId;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private MutableTableState<String, Integer> wordCounts;
private long initTimestamp=-1;
private long nextTimestamp=-1;
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(3);
public WindowWordCountCounter() {
windowSize = 100;
slidingStep = 20;
computeGranularity = 10;
windowFieldId = 2;
window = new WindowState<Integer>(windowSize, slidingStep,
computeGranularity);
computeGranularity, windowFieldId);
wordCounts = new MutableTableState<String, Integer>();
}
private void incrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
String word = record.getString(i, 0);
word = record.getString(i, 0);
if (wordCounts.containsKey(word)) {
int count = wordCounts.get(word) + 1;
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
}
}
......@@ -59,8 +65,8 @@ public class WindowWordCountCounter extends UserTaskInvokable {
private void decrementCompute(StreamRecord record) {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
String word = record.getString(i, 0);
int count = wordCounts.get(word) - 1;
word = record.getString(i, 0);
count = wordCounts.get(word) - 1;
if (count == 0) {
wordCounts.delete(word);
} else {
......@@ -69,51 +75,39 @@ public class WindowWordCountCounter extends UserTaskInvokable {
}
}
private void produceRecord(long progress){
StreamRecord outRecord = new StreamRecord(3);
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
}
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
long progress = record.getLong(i, windowFieldId);
if (initTimestamp == -1) {
initTimestamp = progress;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
} else {
if (progress >= nextTimestamp) {
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(tempRecord);
decrementCompute(expiredRecord);
window.pushBack(tempRecord);
if (window.isEmittable()) {
produceRecord(progress);
}
} else {
incrementCompute(tempRecord);
window.pushBack(tempRecord);
if (window.isFull()) {
produceRecord(progress);
}
}
initTimestamp = nextTimestamp;
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(record);
decrementCompute(expiredRecord);
window.pushBack(record);
if (window.isComputable()) {
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
}
} else {
incrementCompute(record);
window.pushBack(record);
if (window.isFull()) {
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
}
tempRecord.addTuple(record.getTuple(i));
}
}
}
......@@ -41,13 +41,14 @@ public class WindowWordCountSource extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
while(true){
for(int i=0; i<10; ++i) {
line = br.readLine();
if(line==null){
break;
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
System.out.println("line="+line);
outRecord.setString(0, line);
outRecord.setLong(1, timestamp);
timestamp++;
......
......@@ -21,14 +21,16 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowWordCountSplitter extends UserTaskInvokable {
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(3);
private Long timestamp = 0L;
@Override
public void invoke(StreamRecord record) throws Exception {
StreamRecord outputRecord = new StreamRecord(3);
words = record.getString(0).split(" ");
timestamp = record.getLong(1);
System.out.println("sentence=" + record.getString(0) + ", timestamp="
+ record.getLong(1));
for (String word : words) {
Tuple3<String, Integer, Long> tuple =new Tuple3<String, Integer, Long>(word, 1, timestamp);
outputRecord.addTuple(tuple);
......
......@@ -20,20 +20,9 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WordCountSink extends UserSinkInvokable {
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
@Override
public void invoke(StreamRecord record) throws Exception {
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = record.getInteger(i, 1);
timestamp = record.getLong(i, 2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
}
}
}
package eu.stratosphere.streaming.rabbitmq;
import java.net.InetSocketAddress;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class RMQTopology {
public static class RMQSource extends UserSourceInvokable {
StreamRecord record = new StreamRecord(new Tuple1<String>());
@Override
public void invoke() throws Exception {
String QUEUE_NAME = "hello";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
if(message.equals("quit")){
break;
}
record.setString(0, message);
emit(record);
}
connection.close();
}
}
public static class Sink extends UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
System.out.println(record);
}
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ");
graphBuilder.setSource("Source", RMQSource.class, 1, 1);
graphBuilder.setSink("Sink", Sink.class, 1, 1);
graphBuilder.shuffleConnect("Source", "Sink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} catch (Exception e) {
}
}
}
......@@ -15,9 +15,12 @@
package eu.stratosphere.streaming.state;
import java.util.HashMap;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.index.IndexPair;
/**
* The window state for window operator. To be general enough, this class
......@@ -26,24 +29,54 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
* stream into multiple mini batches.
*/
public class WindowState<K> {
private int windowSize;
private int slidingStep;
private int computeGranularity;
private int windowFieldId;
private int initTimestamp;
private int nextTimestamp;
private int currentRecordCount;
private int fullRecordCount;
private int slideRecordCount;
HashMap<K, IndexPair> windowIndex;
CircularFifoBuffer buffer;
StreamRecord tempRecord;
public WindowState(int windowSize, int slidingStep, int computeGranularity,
int windowFieldId) {
this.windowSize = windowSize;
this.slidingStep = slidingStep;
this.computeGranularity = computeGranularity;
this.windowFieldId = windowFieldId;
public WindowState(int windowSize, int slidingStep, int computeGranularity) {
this.initTimestamp = -1;
this.nextTimestamp = -1;
this.currentRecordCount = 0;
// here we assume that windowSize and slidingStep is divisible by
// computeGranularity.
this.fullRecordCount = windowSize / computeGranularity;
this.slideRecordCount = slidingStep / computeGranularity;
this.windowIndex = new HashMap<K, IndexPair>();
this.buffer = new CircularFifoBuffer(fullRecordCount);
}
public void pushBack(StreamRecord record) {
buffer.add(record);
currentRecordCount += 1;
if (initTimestamp == -1) {
initTimestamp = (Integer) record.getTuple(0).getField(windowFieldId);
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
for (int i = 0; i < record.getNumOfTuples(); ++i) {
while ((Integer) record.getTuple(i).getField(windowFieldId) > nextTimestamp) {
buffer.add(tempRecord);
currentRecordCount += 1;
tempRecord = new StreamRecord(record.getNumOfFields());
}
tempRecord.addTuple(record.getTuple(i));
}
}
public StreamRecord popFront() {
......@@ -56,7 +89,7 @@ public class WindowState<K> {
return currentRecordCount >= fullRecordCount;
}
public boolean isEmittable() {
public boolean isComputable() {
if (currentRecordCount == fullRecordCount + slideRecordCount) {
currentRecordCount -= slideRecordCount;
return true;
......
......@@ -83,7 +83,7 @@ public class InternalStateTest {
@Test
public void WindowStateTest(){
WindowState<String> state=new WindowState<String>(100, 20, 10);
WindowState<String> state=new WindowState<String>(100, 20, 10, 2);
}
}
......@@ -10,112 +10,107 @@ import pandas as pd
import os
import operator
linestyles = ['_', '-', '--', ':']
markers=['D','s', '|', '', 'x', '_', '^', ' ', 'd', 'h', '+', '*', ',', 'o', '.', '1', 'p', 'H', 'v', '>'];
colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']
def readFiles(csv_dir):
dataframes=[]
counters=[]
for fname in os.listdir(csv_dir):
if '.csv' in fname:
dataframes.append((fname.rstrip('.csv'),int(fname.rstrip('.csv').split('-')[-1])-1,pd.read_csv(os.path.join(csv_dir,fname),index_col='Time')))
return dataframes
counters.append((fname.rstrip('.csv'),int(fname.rstrip('.csv').split('-')[-1])-1,pd.read_csv(os.path.join(csv_dir,fname),index_col='Time')))
return counters
def plotCounter(csv_dir,name='', smooth=5):
dataframes= readFiles(csv_dir)
def plotCounter(csv_dir, sname='', smooth=5,savePath=''):
counters= readFiles(csv_dir)
addSpeed(counters)
for dataframe in dataframes:
df=dataframe[2]
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
plt.figure(figsize=(12, 8), dpi=80)
plt.title('Counter')
if name=='':
for dataframe in dataframes:
m=markers[dataframe[1]%len(markers)]
dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
selectedCounters=[]
for (name, number, df) in counters:
if sname in name:
selectedCounters.append((name, number, df))
if sname=='':
sname='counters'
save=savePath!=''
plotDfs(selectedCounters,smooth,save,savePath+'/'+sname)
def plotDfs(counters,smooth,save,saveFile):
plt.figure(figsize=(12, 8), dpi=80)
plt.title('dC/dT')
for dataframe in dataframes:
m=markers[dataframe[1]%len(markers)]
plt.title('Counter')
for (name, number, df) in counters:
pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
else:
df2=[]
for dataframe in dataframes:
if name in dataframe[0]:
df2.append(dataframe)
for dataframe in df2:
m=markers[dataframe[1]%len(markers)]
m=markers[number%len(markers)]
dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in df2])
df.ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in counters])
if save:
plt.savefig(saveFile+'C.png')
plt.figure(figsize=(12, 8), dpi=80)
plt.title('dC/dT')
for dataframe in df2:
for (name, number, df) in counters:
m=markers[dataframe[1]%len(markers)]
m=markers[number%len(markers)]
pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in df2])
def plotThroughput(csv_dir,taskname, smooth=5):
dataframes= readFiles(csv_dir)
for dataframe in dataframes:
df=dataframe[2]
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
selected={}
for df in dataframes:
if taskname in df[0]:
if df[1] in selected:
selected[df[1]].append(df[2])
else:
selected[df[1]]=[df[2]]
plt.figure()
plt.title(taskname)
for i in selected:
selected[i]=reduce(operator.add,selected[i])
m=markers[i%len(markers)]
selected[i].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
plt.figure()
plt.title(taskname+" - dC/dT")
for i in selected:
m=markers[i%len(markers)]
pd.rolling_mean(selected[i].speed,smooth).plot(marker=m,markevery=10,markersize=10)
pd.rolling_mean(df.speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in counters])
if save:
plt.savefig(saveFile+'D.png')
def addSpeed(counters):
for (tname, number, df) in counters:
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]+0.01))
df['speed']=speed
return counters
plt.legend(selected.keys())
def plotThroughput(csv_dir,tasknames, smooth=5,savePath=''):
if type(tasknames)!=list:
tasknames=[tasknames]
for taskname in tasknames:
counters= readFiles(csv_dir)
addSpeed(counters)
selected={}
for (tname, number, df) in counters:
if taskname in tname:
if number in selected:
selected[number].append(df)
else:
selected[number]=[df]
plt.figure()
plt.title(taskname)
for i in selected:
if len(selected[i])>1:
selected[i]=reduce(operator.add,selected[i])
else:
selected[i]=selected[i][0]
m=markers[i%len(markers)]
selected[i].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
if savePath !='':
plt.savefig(savePath+'/'+taskname+'C.png')
plt.figure()
plt.title(taskname+" - dC/dT")
for i in selected:
m=markers[i%len(markers)]
pd.rolling_mean(selected[i].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend(selected.keys())
if savePath !='':
plt.savefig(savePath+'/'+taskname+'D.png')
def plotTimer(csv_dir,smooth=5,std=50):
dataframes= readFiles(csv_dir)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册