Commit aff4455c authored by gyfora, committed by Stephan Ewen

[streaming] Test restructure + added tests for streamrecord and collectors

Parent 646190d3
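For orientation: the new collector tests added in this commit exercise the directed-output path, where an OutputSelector names, per tuple, the outputs that should receive it and DirectedStreamCollector emits to the matching named downstream streams. Below is a minimal sketch of that usage pattern, assembled only from the API calls visible in DirectedOutputTest further down (directTo, name, addSink, generateSequence); the class names EvenOddSelector, Identity and PrintSink are illustrative and not part of this commit.

// Illustrative sketch only; mirrors the wiring used by DirectedOutputTest in this commit.
package org.apache.flink.streaming.api.collector;

import java.util.Collection;

import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.SinkFunction;

public class DirectedOutputSketch {

    // Routes each tuple to the output named "even" or "odd" depending on its value.
    static final class EvenOddSelector extends OutputSelector<Tuple1<Long>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void select(Tuple1<Long> tuple, Collection<String> outputs) {
            outputs.add(tuple.f0 % 2 == 0 ? "even" : "odd");
        }
    }

    // Identity map, only here so the named operators below have something to run.
    static final class Identity extends MapFunction<Tuple1<Long>, Tuple1<Long>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple1<Long> map(Tuple1<Long> value) throws Exception {
            return value;
        }
    }

    // Sink that prints what it receives; stands in for the test's assertion sinks.
    static final class PrintSink extends SinkFunction<Tuple1<Long>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(Tuple1<Long> tuple) {
            System.out.println(tuple.f0);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        // directTo attaches the selector; the operator names below must match the
        // output names the selector emits.
        DataStream<Tuple1<Long>> source = env.generateSequence(1, 6).directTo(new EvenOddSelector());
        source.map(new Identity()).name("even").addSink(new PrintSink());
        source.map(new Identity()).name("odd").addSink(new PrintSink());

        env.execute();
    }
}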
......@@ -126,11 +126,11 @@ public class JobGraphBuilder {
* Number of parallel instances created
*/
public void addSource(String componentName,
UserSourceInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism) {
UserSourceInvokable<? extends Tuple> InvokableObject, String operatorName,
byte[] serializedFunction, int parallelism) {
addComponent(componentName, StreamSource.class, InvokableObject,
operatorName, serializedFunction, parallelism);
addComponent(componentName, StreamSource.class, InvokableObject, operatorName,
serializedFunction, parallelism);
if (log.isDebugEnabled()) {
log.debug("SOURCE: " + componentName);
......@@ -147,11 +147,9 @@ public class JobGraphBuilder {
* @param parallelism
* Number of parallel instances created
*/
public void addIterationSource(String componentName, String iterationHead,
int parallelism) {
public void addIterationSource(String componentName, String iterationHead, int parallelism) {
addComponent(componentName, StreamIterationSource.class, null, null,
null, parallelism);
addComponent(componentName, StreamIterationSource.class, null, null, null, parallelism);
setBytesFrom(iterationHead, componentName);
......@@ -174,13 +172,12 @@ public class JobGraphBuilder {
* @param parallelism
* Number of parallel instances created
*/
public void addTask(
String componentName,
public void addTask(String componentName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
String operatorName, byte[] serializedFunction, int parallelism) {
addComponent(componentName, StreamTask.class, TaskInvokableObject,
operatorName, serializedFunction, parallelism);
addComponent(componentName, StreamTask.class, TaskInvokableObject, operatorName,
serializedFunction, parallelism);
if (log.isDebugEnabled()) {
log.debug("TASK: " + componentName);
......@@ -201,12 +198,11 @@ public class JobGraphBuilder {
* @param parallelism
* Number of parallel instances created
*/
public void addSink(String componentName,
UserSinkInvokable<? extends Tuple> InvokableObject,
public void addSink(String componentName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism) {
addComponent(componentName, StreamSink.class, InvokableObject,
operatorName, serializedFunction, parallelism);
addComponent(componentName, StreamSink.class, InvokableObject, operatorName,
serializedFunction, parallelism);
if (log.isDebugEnabled()) {
log.debug("SINK: " + componentName);
......@@ -226,11 +222,10 @@ public class JobGraphBuilder {
* @param directName
* Id of the output direction
*/
public void addIterationSink(String componentName, String iterationTail,
int parallelism, String directName) {
public void addIterationSink(String componentName, String iterationTail, int parallelism,
String directName) {
addComponent(componentName, StreamIterationSink.class, null, null,
null, parallelism);
addComponent(componentName, StreamIterationSink.class, null, null, null, parallelism);
setBytesFrom(iterationTail, componentName);
......@@ -269,19 +264,16 @@ public class JobGraphBuilder {
operatorNames.put(componentName, operatorName);
serializedFunctions.put(componentName, serializedFunction);
edgeList.put(componentName, new ArrayList<String>());
connectionTypes
.put(componentName,
new ArrayList<Class<? extends ChannelSelector<StreamRecord<Tuple>>>>());
connectionTypes.put(componentName,
new ArrayList<Class<? extends ChannelSelector<StreamRecord<Tuple>>>>());
connectionParams.put(componentName, new ArrayList<Integer>());
}
private void createVertex(String componentName) {
// Get vertex attributes
Class<? extends AbstractInvokable> componentClass = componentClasses
.get(componentName);
StreamComponentInvokable invokableObject = invokableObjects
.get(componentName);
Class<? extends AbstractInvokable> componentClass = componentClasses.get(componentName);
StreamComponentInvokable invokableObject = invokableObjects.get(componentName);
String operatorName = operatorNames.get(componentName);
byte[] serializedFunction = serializedFunctions.get(componentName);
int parallelism = componentParallelism.get(componentName);
......@@ -303,8 +295,7 @@ public class JobGraphBuilder {
component.setInvokableClass(componentClass);
component.setNumberOfSubtasks(parallelism);
Configuration config = new TaskConfig(component.getConfiguration())
.getConfiguration();
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
// Set vertex config
if (invokableObject != null) {
......@@ -348,8 +339,7 @@ public class JobGraphBuilder {
* JobVertex configuration to which the serialized invokable will
* be added
*/
private void addSerializedObject(Serializable InvokableObject,
Configuration config) {
private void addSerializedObject(Serializable InvokableObject, Configuration config) {
ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null;
......@@ -362,9 +352,10 @@ public class JobGraphBuilder {
config.setBytes("serializedudf", baos.toByteArray());
} catch (Exception e) {
e.printStackTrace();
System.out.println("Serialization error "
+ InvokableObject.getClass());
if (log.isErrorEnabled()) {
log.error("Serialization error " + InvokableObject.getClass() + " "
+ e.getMessage());
}
}
}
......@@ -390,8 +381,7 @@ public class JobGraphBuilder {
}
}
public void setEdge(String upStreamComponentName,
String downStreamComponentName,
public void setEdge(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord<Tuple>>> partitionerClass,
int partitionerParam) {
edgeList.get(upStreamComponentName).add(downStreamComponentName);
......@@ -411,12 +401,9 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
public void broadcastConnect(String upStreamComponentName,
String downStreamComponentName) {
setEdge(upStreamComponentName, downStreamComponentName,
BroadcastPartitioner.class, 0);
log.info("Broadcastconnected: " + upStreamComponentName + " to "
+ downStreamComponentName);
public void broadcastConnect(String upStreamComponentName, String downStreamComponentName) {
setEdge(upStreamComponentName, downStreamComponentName, BroadcastPartitioner.class, 0);
log.info("Broadcastconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
......@@ -434,11 +421,11 @@ public class JobGraphBuilder {
* @param keyPosition
* Position of key in the tuple
*/
public void fieldsConnect(String upStreamComponentName,
String downStreamComponentName, int keyPosition) {
public void fieldsConnect(String upStreamComponentName, String downStreamComponentName,
int keyPosition) {
setEdge(upStreamComponentName, downStreamComponentName,
FieldsPartitioner.class, keyPosition);
setEdge(upStreamComponentName, downStreamComponentName, FieldsPartitioner.class,
keyPosition);
}
......@@ -453,12 +440,9 @@ public class JobGraphBuilder {
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
*/
public void globalConnect(String upStreamComponentName,
String downStreamComponentName) {
setEdge(upStreamComponentName, downStreamComponentName,
GlobalPartitioner.class, 0);
log.info("Globalconnected: " + upStreamComponentName + " to "
+ downStreamComponentName);
public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
setEdge(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class, 0);
log.info("Globalconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
......@@ -473,12 +457,9 @@ public class JobGraphBuilder {
* @param downStreamComponentName
* Name of the downstream component, that will receive the tuples
*/
public void shuffleConnect(String upStreamComponentName,
String downStreamComponentName) {
setEdge(upStreamComponentName, downStreamComponentName,
ShufflePartitioner.class, 0);
log.info("Shuffleconnected: " + upStreamComponentName + " to "
+ downStreamComponentName);
public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
setEdge(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class, 0);
log.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
......@@ -494,33 +475,27 @@ public class JobGraphBuilder {
* @param partitionerParam
* Parameter of the partitioner
*/
private void connect(String upStreamComponentName,
String downStreamComponentName,
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord<Tuple>>> PartitionerClass,
int partitionerParam) {
AbstractJobVertex upStreamComponent = components
.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components
.get(downStreamComponentName);
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
Configuration config = new TaskConfig(
upStreamComponent.getConfiguration()).getConfiguration();
Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration();
try {
upStreamComponent.connectTo(downStreamComponent,
ChannelType.NETWORK);
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: " + PartitionerClass.getSimpleName()
+ " - " + upStreamComponentName + " -> "
+ downStreamComponentName);
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
}
} catch (JobGraphDefinitionException e1) {
if (log.isErrorEnabled()) {
log.error("Cannot connect components by field: "
+ upStreamComponentName + " to "
log.error("Cannot connect components by field: " + upStreamComponentName + " to "
+ downStreamComponentName, e1);
}
}
......@@ -531,13 +506,11 @@ public class JobGraphBuilder {
config.setBoolean("isPartitionedOutput_" + outputIndex, true);
}
putOutputNameToConfig(upStreamComponentName, downStreamComponentName,
outputIndex, config);
putOutputNameToConfig(upStreamComponentName, downStreamComponentName, outputIndex, config);
config.setClass("partitionerClass_" + outputIndex, PartitionerClass);
config.setInteger("partitionerIntParam_" + outputIndex,
partitionerParam);
config.setInteger("partitionerIntParam_" + outputIndex, partitionerParam);
config.setInteger("numOfOutputs_" + outputIndex,
componentParallelism.get(downStreamComponentName));
......@@ -594,13 +567,11 @@ public class JobGraphBuilder {
*/
private void setAutomaticInstanceSharing() {
AbstractJobVertex maxParallelismVertex = components
.get(maxParallelismVertexName);
AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
for (String componentName : components.keySet()) {
if (componentName != maxParallelismVertexName) {
components.get(componentName).setVertexToShareInstancesWith(
maxParallelismVertex);
components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
}
}
......@@ -639,8 +610,7 @@ public class JobGraphBuilder {
for (String upStreamComponentName : edgeList.keySet()) {
int i = 0;
for (String downStreamComponentName : edgeList
.get(upStreamComponentName)) {
for (String downStreamComponentName : edgeList.get(upStreamComponentName)) {
connect(upStreamComponentName, downStreamComponentName,
connectionTypes.get(upStreamComponentName).get(i),
connectionParams.get(upStreamComponentName).get(i));
......
......@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.collector;
import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
......@@ -28,9 +30,10 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class DirectedStreamCollector<T extends Tuple> extends StreamCollector<T> {
OutputSelector<T> outputSelector;
private static final Log log = LogFactory.getLog(DirectedStreamCollector.class);
public DirectedStreamCollector(int channelID,
SerializationDelegate<T> serializationDelegate, OutputSelector<T> outputSelector) {
public DirectedStreamCollector(int channelID, SerializationDelegate<T> serializationDelegate,
OutputSelector<T> outputSelector) {
super(channelID, serializationDelegate);
this.outputSelector = outputSelector;
......@@ -49,11 +52,10 @@ public class DirectedStreamCollector<T extends Tuple> extends StreamCollector<T>
try {
outputMap.get(outputName).emit(streamRecord);
} catch (Exception e) {
e.printStackTrace();
System.out.println("emit fail");
if (log.isErrorEnabled()) {
log.error("Emit failed: " + outputName + " " + e.getMessage());
}
}
}
outputNames.clear();
}
}
......@@ -34,11 +34,8 @@ public abstract class OutputSelector<T extends Tuple> implements Serializable {
outputs = new ArrayList<String>();
}
void clearList() {
outputs.clear();
}
Collection<String> getOutputs(T tuple) {
outputs.clear();
select(tuple, outputs);
return outputs;
}
......
......@@ -24,6 +24,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
......@@ -32,6 +34,8 @@ import org.apache.flink.util.Collector;
public class StreamCollector<T extends Tuple> implements Collector<T> {
private static final Log log = LogFactory.getLog(StreamCollector.class);
protected StreamRecord<T> streamRecord;
protected int channelID;
private List<RecordWriter<StreamRecord<T>>> outputs;
......@@ -65,8 +69,9 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
try {
output.emit(streamRecord);
} catch (Exception e) {
e.printStackTrace();
System.out.println("emit fail");
if (log.isErrorEnabled()) {
log.error("Emit failed: " + output + " " + e.getMessage());
}
}
}
}
......
......@@ -61,26 +61,22 @@ public class TestDataUtil {
bufferedReader.close();
fileReader.close();
} catch (FileNotFoundException e1) {
if (log.isErrorEnabled()) {
log.error("File not found: " + file.getAbsolutePath() + " " + e1.getMessage());
}
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (IOException e2) {
if (log.isErrorEnabled()) {
log.error("Cannot read file: " + file.getAbsolutePath() + " " + e2.getMessage());
}
// TODO Auto-generated catch block
e2.printStackTrace();
}
if (file.exists()) {
if (log.isInfoEnabled()) {
log.info(fileName + " already exists.");
}
try {
checkSumActaul = DigestUtils.md5Hex(FileUtils.readFileToByteArray(file));
} catch (IOException e) {
if (log.isErrorEnabled()) {
log.error("IOException: " + e.getMessage());
}
// TODO Auto-generated catch block
e.printStackTrace();
}
if (!checkSumActaul.equals(checkSumDesired)) {
if (log.isInfoEnabled()) {
......@@ -100,7 +96,7 @@ public class TestDataUtil {
}
public static void download(String fileName) {
System.out.println("downloading " + fileName);
log.info("downloading " + fileName);
try {
URL website = new URL(testRepoUrl + fileName);
......@@ -115,13 +111,9 @@ public class TestDataUtil {
}
bWriter.close();
} catch (MalformedURLException e1) {
if (log.isErrorEnabled()) {
log.error("MalformedURLException: " + e1.getMessage());
}
e1.printStackTrace();
} catch (IOException e) {
if (log.isErrorEnabled()) {
log.error("IOException: " + e.getMessage());
}
e.printStackTrace();
}
}
}
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.flink.streaming.api;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.LocalStreamEnvironment;
import org.apache.flink.streaming.api.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.SinkFunction;
import org.apache.flink.streaming.api.function.SourceFunction;
import org.apache.flink.streaming.util.LogUtils;
import org.junit.Test;
import org.apache.flink.api.java.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.util.Collector;
import org.apache.log4j.Level;
public class BatchTest {
private static final long MEMORYSIZE = 32;
private static final int SOURCE_PARALLELISM = 1;
private static final int SINK_PARALLELISM = 2;
private static int count = 0;
private static boolean partitionCorrect = true;
private static final class MySource extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
private Tuple1<String> outTuple = new Tuple1<String>();
@Override
public void invoke(Collector<Tuple1<String>> collector) throws Exception {
for (int i = 0; i < 20; i++) {
outTuple.f0 = "string #" + i;
collector.collect(outTuple);
}
}
}
private static final class MyMap extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
out.collect(value);
}
}
private static final class MySink extends SinkFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<String> tuple) {
count++;
}
}
private static final class MyPartitionSink extends SinkFunction<Tuple1<String>> {
int hash = -1000;
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<String> tuple) {
if (hash == -1000)
hash = tuple.f0.hashCode() % SINK_PARALLELISM;
else {
if (hash != tuple.f0.hashCode() % SINK_PARALLELISM)
partitionCorrect = false;
}
}
}
@Test
public void test() throws Exception {
LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(SINK_PARALLELISM);
@SuppressWarnings("unused")
DataStream<Tuple1<String>> dataStream1 = env
.addSource(new MySource(), SOURCE_PARALLELISM)
.flatMap(new MyMap()).setParallelism(1)
.flatMap(new MyMap()).setParallelism(1)
.addSink(new MySink()).setParallelism(1);
// partitionTest
@SuppressWarnings("unused")
DataStream<Tuple1<String>> dataStream2 = env.addSource(new MySource(), SOURCE_PARALLELISM)
.flatMap(new MyMap()).setParallelism(1).partitionBy(0)
.addSink(new MyPartitionSink()).setParallelism(SINK_PARALLELISM);
env.executeTest(MEMORYSIZE);
assertEquals(20, count);
assertTrue(partitionCorrect);
}
}
......@@ -17,7 +17,7 @@
*
*/
package org.apache.flink.streaming.api;
package org.apache.flink.streaming.api.collector;
import static org.junit.Assert.assertEquals;
......@@ -27,10 +27,10 @@ import java.util.HashSet;
import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.function.SinkFunction;
import org.apache.flink.streaming.util.LogUtils;
import org.apache.log4j.Level;
import org.junit.Test;
public class DirectedOutputTest {
......@@ -92,8 +92,6 @@ public class DirectedOutputTest {
@SuppressWarnings("unused")
@Test
public void directOutputTest() {
LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
DataStream<Tuple1<Long>> s = env.generateSequence(1, 6).directTo(new MySelector());
DataStream<Tuple1<Long>> ds1 = s.map(new PlusTwo()).name("ds1").addSink(new EvenSink());
......@@ -109,23 +107,4 @@ public class DirectedOutputTest {
assertEquals(expectedOdd, oddSet);
}
@SuppressWarnings("unused")
@Test
public void directOutputPartitionedTest() {
LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);
DataStream<Tuple1<Long>> s = env.generateSequence(1, 6).directTo(new MySelector());
DataStream<Tuple1<Long>> ds1 = s.map(new PlusTwo()).name("ds1").partitionBy(0).addSink(new EvenSink());
DataStream<Tuple1<Long>> ds2 = s.map(new PlusTwo()).name("ds2").addSink(new OddSink());
DataStream<Tuple1<Long>> ds3 = s.map(new PlusTwo()).name("ds3").addSink(new OddSink());
env.execute();
HashSet<Long> expectedEven = new HashSet<Long>(Arrays.asList(4L, 6L, 8L));
HashSet<Long> expectedOdd = new HashSet<Long>(Arrays.asList(3L, 5L, 7L));
assertEquals(expectedEven, evenSet);
assertEquals(expectedOdd, oddSet);
}
}
......@@ -17,53 +17,40 @@
*
*/
package org.apache.flink.streaming.api;
package org.apache.flink.streaming.api.collector;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple1;
import org.junit.Test;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.types.TypeInformation;
public class OutputSelectorTest {
public class TypeExtractTest {
static final class MyOutputSelector extends OutputSelector<Tuple1<Integer>> {
public static class MySuperlass<T> implements Serializable {
private static final long serialVersionUID = 1L;
@Override
public void select(Tuple1<Integer> tuple, Collection<String> outputs) {
for (Integer i = 0; i < tuple.f0; i++) {
outputs.add(i.toString());
}
}
}
public static class Myclass extends MySuperlass<Integer> {
private static final long serialVersionUID = 1L;
}
@SuppressWarnings("unused")
@Test
public void test() throws IOException, ClassNotFoundException {
Myclass f = new Myclass();
TypeInformation<?> ts = TypeExtractor.createTypeInfo(MySuperlass.class, f.getClass(), 0,
null, null);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos;
oos = new ObjectOutputStream(baos);
oos.writeObject(f);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
assertTrue(true);
public void testGetOutputs() {
OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
List<String> expectedOutputs = new ArrayList<String>();
expectedOutputs.add("0");
expectedOutputs.add("1");
assertEquals(expectedOutputs, selector.getOutputs(new Tuple1<Integer>(2)));
expectedOutputs.add("2");
assertEquals(expectedOutputs, selector.getOutputs(new Tuple1<Integer>(3)));
}
}
......@@ -17,7 +17,7 @@
*
*/
package org.apache.flink.streaming.api;
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
......
......@@ -17,7 +17,7 @@
*
*/
package org.apache.flink.streaming.api;
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
......
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.flink.streaming.api.streamrecord;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.io.network.serialization.DataInputDeserializer;
import org.apache.flink.runtime.io.network.serialization.DataOutputSerializer;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.junit.Test;
public class StreamRecordTest {
@Test
public void testReadWrite() throws IOException {
StreamRecord<Tuple2<Integer, String>> streamRecord = new StreamRecord<Tuple2<Integer, String>>();
Tuple2<Integer, String> tuple = new Tuple2<Integer, String>(2, "a");
streamRecord.setTuple(tuple).setId(1);
TupleSerializer<Tuple2<Integer, String>> ts = (TupleSerializer<Tuple2<Integer, String>>) TypeExtractor
.getForObject(tuple).createSerializer();
SerializationDelegate<Tuple2<Integer, String>> sd = new SerializationDelegate<Tuple2<Integer, String>>(
ts);
streamRecord.setSeralizationDelegate(sd);
DataOutputSerializer out = new DataOutputSerializer(64);
streamRecord.write(out);
ByteBuffer buff = out.wrapAsByteBuffer();
DataInputDeserializer in = new DataInputDeserializer(buff);
StreamRecord<Tuple2<Integer, String>> streamRecord2 = new StreamRecord<Tuple2<Integer, String>>();
streamRecord2.setDeseralizationDelegate(
new DeserializationDelegate<Tuple2<Integer, String>>(ts), ts);
streamRecord2.read(in);
assertEquals(streamRecord.getTuple(), streamRecord2.getTuple());
}
}