提交 6b0c0e4c 编写于 作者: M Maximilian Michels

[core][runtime] move SerializedValueTest from runtime to core

- move createCopySerializable to core's CommonTestUtils
- rename CommonTestUtils createCopy to createCopyWritable
- adapt the tests to use core's CommonTestUtils where applicable

This closes #1081.
上级 ac9a9117
......@@ -54,7 +54,7 @@ public class ConfigurationTest extends TestLogger {
orig.setBytes("bytes sequence", new byte[] { 1, 2, 3, 4, 5 } );
orig.setClass("myclass", this.getClass());
final Configuration copy = (Configuration) CommonTestUtils.createCopy(orig);
final Configuration copy = CommonTestUtils.createCopyWritable(orig);
assertEquals("myvalue", copy.getString("mykey", "null"));
assertEquals(100, copy.getInteger("mynumber", 0));
assertEquals(478236947162389746L, copy.getLong("longvalue", 0L));
......
......@@ -27,6 +27,8 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
......@@ -98,7 +100,7 @@ public class CommonTestUtils {
* thrown if an error occurs while creating the copy of the object
*/
@SuppressWarnings("unchecked")
public static <T extends IOReadableWritable> T createCopy(final T original) throws IOException {
public static <T extends IOReadableWritable> T createCopyWritable(final T original) throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos);
......@@ -142,4 +144,32 @@ public class CommonTestUtils {
return copy;
}
public static <T extends java.io.Serializable> T createCopySerializable(T original) throws IOException {
if (original == null) {
throw new IllegalArgumentException();
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(original);
oos.close();
baos.close();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
ObjectInputStream ois = new ObjectInputStream(bais);
T copy;
try {
copy = (T) ois.readObject();
}
catch (ClassNotFoundException e) {
throw new IOException(e);
}
ois.close();
bais.close();
return copy;
}
}
......@@ -36,7 +36,7 @@ public class AbstractIDTest extends TestLogger {
public void testSerialization() {
final AbstractID origID = new AbstractID();
try {
final AbstractID copyID = CommonTestUtils.createCopy(origID);
final AbstractID copyID = CommonTestUtils.createCopyWritable(origID);
assertEquals(origID.hashCode(), copyID.hashCode());
assertEquals(origID, copyID);
......@@ -83,16 +83,16 @@ public class AbstractIDTest extends TestLogger {
AbstractID id10 = new AbstractID(Long.MIN_VALUE, Long.MAX_VALUE);
// test self equality
assertEquals(0, id1.compareTo(CommonTestUtils.createCopy(id1)));
assertEquals(0, id2.compareTo(CommonTestUtils.createCopy(id2)));
assertEquals(0, id3.compareTo(CommonTestUtils.createCopy(id3)));
assertEquals(0, id4.compareTo(CommonTestUtils.createCopy(id4)));
assertEquals(0, id5.compareTo(CommonTestUtils.createCopy(id5)));
assertEquals(0, id6.compareTo(CommonTestUtils.createCopy(id6)));
assertEquals(0, id7.compareTo(CommonTestUtils.createCopy(id7)));
assertEquals(0, id8.compareTo(CommonTestUtils.createCopy(id8)));
assertEquals(0, id9.compareTo(CommonTestUtils.createCopy(id9)));
assertEquals(0, id10.compareTo(CommonTestUtils.createCopy(id10)));
assertEquals(0, id1.compareTo(CommonTestUtils.createCopyWritable(id1)));
assertEquals(0, id2.compareTo(CommonTestUtils.createCopyWritable(id2)));
assertEquals(0, id3.compareTo(CommonTestUtils.createCopyWritable(id3)));
assertEquals(0, id4.compareTo(CommonTestUtils.createCopyWritable(id4)));
assertEquals(0, id5.compareTo(CommonTestUtils.createCopyWritable(id5)));
assertEquals(0, id6.compareTo(CommonTestUtils.createCopyWritable(id6)));
assertEquals(0, id7.compareTo(CommonTestUtils.createCopyWritable(id7)));
assertEquals(0, id8.compareTo(CommonTestUtils.createCopyWritable(id8)));
assertEquals(0, id9.compareTo(CommonTestUtils.createCopyWritable(id9)));
assertEquals(0, id10.compareTo(CommonTestUtils.createCopyWritable(id10)));
// test order
assertCompare(id1, id2, -1);
......
......@@ -16,10 +16,9 @@
* limitations under the License.
*/
package org.apache.flink.runtime.util;
package org.apache.flink.util;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
......
......@@ -27,7 +27,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.util.StringUtils;
import org.junit.Test;
......
......@@ -20,7 +20,7 @@ package org.apache.flink.runtime.client;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Test;
......
......@@ -32,7 +32,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
public class TaskDeploymentDescriptorTest {
......
......@@ -26,7 +26,7 @@ import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
/**
......
......@@ -25,7 +25,7 @@ import static org.junit.Assert.fail;
import java.net.InetAddress;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Assert;
import org.junit.Test;
......
......@@ -23,7 +23,7 @@ import static org.junit.Assert.*;
import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
public class JobGraphTest {
......
......@@ -26,7 +26,7 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Test;
......
......@@ -27,7 +27,7 @@ import java.io.PrintWriter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
......
......@@ -44,82 +44,6 @@ import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
*/
public class CommonTestUtils {
/**
* Creates a copy of the given {@link IOReadableWritable} object by an in-memory serialization and subsequent
* deserialization.
*
* @param original
* the original object to be copied
* @return the copy of original object created by the original object's serialization/deserialization methods
* @throws IOException
* thrown if an error occurs while creating the copy of the object
*/
@SuppressWarnings("unchecked")
public static <T extends IOReadableWritable> T createCopyWritable(final T original) throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos);
original.write(new OutputViewDataOutputStreamWrapper(dos));
final String className = original.getClass().getName();
Class<T> clazz = null;
try {
clazz = (Class<T>) Class.forName(className);
} catch (ClassNotFoundException e) {
fail(e.getMessage());
}
T copy = null;
try {
copy = clazz.newInstance();
} catch (Throwable t) {
t.printStackTrace();
fail(t.getMessage());
}
final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
final DataInputStream dis = new DataInputStream(bais);
copy.read(new InputViewDataInputStreamWrapper(dis));
if (dis.available() > 0) {
throw new IOException("The coped result was not fully consumed.");
}
return copy;
}
@SuppressWarnings("unchecked")
public static <T extends java.io.Serializable> T createCopySerializable(T original) throws IOException {
if (original == null) {
throw new IllegalArgumentException();
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(original);
oos.close();
baos.close();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
ObjectInputStream ois = new ObjectInputStream(bais);
T copy;
try {
copy = (T) ois.readObject();
}
catch (ClassNotFoundException e) {
throw new IOException(e);
}
ois.close();
bais.close();
return copy;
}
/**
* Sleeps for a given set of milliseconds, uninterruptibly. If interrupt is called,
* the sleep will continue nonetheless.
......
......@@ -19,7 +19,7 @@
package org.apache.flink.runtime.util;
import org.apache.flink.core.memory.MemoryUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
......
......@@ -18,13 +18,12 @@
package org.apache.flink.streaming.api;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.util.SourceFunctionUtil;
......
......@@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Value;
......
......@@ -21,7 +21,7 @@ package org.apache.flink.streaming.util;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.junit.Test;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册