提交 6ec36031 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] batch StreamRecord Refactored

上级 c147c355
......@@ -130,7 +130,7 @@ public class FaultTolerancyBuffer {
ackCounter.put(recordID, ackCount);
}
}
timeoutRecords(System.currentTimeMillis());
//timeoutRecords(System.currentTimeMillis());
}
public void failRecord(String recordID) {
......
......@@ -30,13 +30,13 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this.numOfFields = length;
recordBatch = new ArrayList<Value[]>();
}
public StreamRecord(AtomRecord record){
Value[] fields=record.getFields();
public StreamRecord(AtomRecord record) {
Value[] fields = record.getFields();
numOfFields = fields.length;
recordBatch = new ArrayList<Value[]>();
recordBatch.add(fields);
numOfRecords=recordBatch.size();
numOfRecords = recordBatch.size();
}
public int getNumOfFields() {
......@@ -60,8 +60,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public Value getField(int recordNumber, int fieldNumber) {
return recordBatch.get(recordNumber)[fieldNumber];
}
public AtomRecord getRecord(int recordNumber){
public AtomRecord getRecord(int recordNumber) {
return new AtomRecord(recordBatch.get(recordNumber));
}
......@@ -132,10 +132,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
public StreamRecord newInstance() {
return new StreamRecord(0);
}
// TODO: fix this method to work properly for non StringValue types
public String toString() {
StringBuilder outputString = new StringBuilder();
......@@ -146,9 +142,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
output = (StringValue) recordBatch.get(k)[i];
outputString.append(output.getValue() + "*");
} catch (ClassCastException e) {
outputString.append("PRINT_ERROR*");
outputString.append("NON-STRING*");
}
}
}
return outputString.toString();
......
......@@ -87,6 +87,7 @@ public class StreamTask extends AbstractTask {
streamTaskHelper.threadSafePublish(new AckEvent(id), input);
} catch (Exception e) {
streamTaskHelper.threadSafePublish(new FailEvent(id), input);
e.printStackTrace();
}
}
}
......
......@@ -24,8 +24,8 @@ public class MyBatchStream extends TestBase2{
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("StreamSource", StreamSource.class);
graphBuilder.setSink("StreamSink", StreamSink.class);
graphBuilder.setSource("StreamSource", MyBatchStreamSource.class);
graphBuilder.setSink("StreamSink", MyBatchStreamSink.class);
graphBuilder.broadcastConnect("StreamSource", "StreamSink");
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.batch;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
public class StreamSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
private IntValue count = new IntValue(1);
@Override
public void invoke(StreamRecord record) throws Exception {
word = (StringValue) record.getField(0, 0);
// count = (IntValue) record.getField(1);
System.out.println("========" + word.getValue() + "=========");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.batch;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.StringValue;
public class StreamSource extends UserSourceInvokable {
// private final String motto = "Stratosphere Big Data looks tiny from here";
private final String motto = "Gyuszi Gabor Big Marci Gyuszi";
private final AtomRecord record=new AtomRecord();
private final StreamRecord mottoRecord=new StreamRecord();
@Override
public void invoke() throws Exception {
record.setField(0, new StringValue(motto));
mottoRecord.addRecord(record);
for (int i = 0; i < 100; i++) {
emit(mottoRecord);
}
}
}
\ No newline at end of file
......@@ -15,10 +15,6 @@
package eu.stratosphere.streaming.test.batch.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册