diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index e3b75b67e8e59828aa10c7b361d7f851ddf98f63..5f0947dfd4fc6d0a69efc5657fa6aa75bb731b78 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -54,7 +54,7 @@ public class ConfigurationTest extends TestLogger { orig.setBytes("bytes sequence", new byte[] { 1, 2, 3, 4, 5 } ); orig.setClass("myclass", this.getClass()); - final Configuration copy = (Configuration) CommonTestUtils.createCopy(orig); + final Configuration copy = CommonTestUtils.createCopyWritable(orig); assertEquals("myvalue", copy.getString("mykey", "null")); assertEquals(100, copy.getInteger("mynumber", 0)); assertEquals(478236947162389746L, copy.getLong("longvalue", 0L)); diff --git a/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java index c876940204c0895a0eadf2c492dcc0ddd290ef69..5b7afaa1d7aaa3ab329a964d91ea4bfaa8099bc0 100644 --- a/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java +++ b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java @@ -27,6 +27,8 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; @@ -98,7 +100,7 @@ public class CommonTestUtils { * thrown if an error occurs while creating the copy of the object */ @SuppressWarnings("unchecked") - public static T createCopy(final T original) throws IOException { + public static T createCopyWritable(final T original) throws IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final DataOutputStream dos = new DataOutputStream(baos); @@ -142,4 +144,32 @@ public class CommonTestUtils { return copy; } + + public static T createCopySerializable(T original) throws IOException { + if (original == null) { + throw new IllegalArgumentException(); + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(original); + oos.close(); + baos.close(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + ObjectInputStream ois = new ObjectInputStream(bais); + + T copy; + try { + copy = (T) ois.readObject(); + } + catch (ClassNotFoundException e) { + throw new IOException(e); + } + + ois.close(); + bais.close(); + + return copy; + } } diff --git a/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java b/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java index 8134a687f8003d3cd984b2e8b627783b66d61256..ba7d60c612879095fdab35ba384d8e77e0f814e7 100644 --- a/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java @@ -36,7 +36,7 @@ public class AbstractIDTest extends TestLogger { public void testSerialization() { final AbstractID origID = new AbstractID(); try { - final AbstractID copyID = CommonTestUtils.createCopy(origID); + final AbstractID copyID = CommonTestUtils.createCopyWritable(origID); assertEquals(origID.hashCode(), copyID.hashCode()); assertEquals(origID, copyID); @@ -83,16 +83,16 @@ public class AbstractIDTest extends TestLogger { AbstractID id10 = new AbstractID(Long.MIN_VALUE, Long.MAX_VALUE); // test self equality - assertEquals(0, id1.compareTo(CommonTestUtils.createCopy(id1))); - assertEquals(0, id2.compareTo(CommonTestUtils.createCopy(id2))); - assertEquals(0, id3.compareTo(CommonTestUtils.createCopy(id3))); - assertEquals(0, id4.compareTo(CommonTestUtils.createCopy(id4))); - assertEquals(0, id5.compareTo(CommonTestUtils.createCopy(id5))); - assertEquals(0, id6.compareTo(CommonTestUtils.createCopy(id6))); - assertEquals(0, id7.compareTo(CommonTestUtils.createCopy(id7))); - assertEquals(0, id8.compareTo(CommonTestUtils.createCopy(id8))); - assertEquals(0, id9.compareTo(CommonTestUtils.createCopy(id9))); - assertEquals(0, id10.compareTo(CommonTestUtils.createCopy(id10))); + assertEquals(0, id1.compareTo(CommonTestUtils.createCopyWritable(id1))); + assertEquals(0, id2.compareTo(CommonTestUtils.createCopyWritable(id2))); + assertEquals(0, id3.compareTo(CommonTestUtils.createCopyWritable(id3))); + assertEquals(0, id4.compareTo(CommonTestUtils.createCopyWritable(id4))); + assertEquals(0, id5.compareTo(CommonTestUtils.createCopyWritable(id5))); + assertEquals(0, id6.compareTo(CommonTestUtils.createCopyWritable(id6))); + assertEquals(0, id7.compareTo(CommonTestUtils.createCopyWritable(id7))); + assertEquals(0, id8.compareTo(CommonTestUtils.createCopyWritable(id8))); + assertEquals(0, id9.compareTo(CommonTestUtils.createCopyWritable(id9))); + assertEquals(0, id10.compareTo(CommonTestUtils.createCopyWritable(id10))); // test order assertCompare(id1, id2, -1); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java b/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java similarity index 93% rename from flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java rename to flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java index 0d19613c3fa2dd2d862158c84440a22c470ef7b2..fda368a68884f202e6dfb1f87fdae79ca75493af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/SerializedValueTest.java @@ -16,10 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.util; +package org.apache.flink.util; -import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.util.SerializedValue; +import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; import static org.junit.Assert.assertEquals; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java index 03d370e90b04c60933b5bb69d9c75f4f7ae82226..6f873de423bf6f1c156a5f04b817a86d6c86b926 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java @@ -27,7 +27,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.StringUtils; import org.junit.Test; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java index a22ed13a0e540c532a55c634c4019ac7197f7968..b3bac5802b9a748a0d308ff2657085909f61b6d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.client; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.SerializedValue; import org.junit.Test; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index 50b154e727801cc1c5c7442b6972c8f758d54c13..3a36fe867f83673485030ab3f67c97f206a7cd85 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -32,7 +32,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.RegularPactTask; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; public class TaskDeploymentDescriptorTest { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java index b508923cd36374b81c6072e7be5b2c718a9c39a0..d659b45d7aeaeef57a7ac49c8c67b1249edf6e16 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java @@ -26,7 +26,7 @@ import java.io.IOException; import java.util.Iterator; import org.apache.flink.runtime.event.AbstractEvent; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java index 2769183fa6ad9b6b7efa25a65864bdfedd3499c1..bc8cd630461f9e2e02efe7afe329fd0fbec3104a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java @@ -25,7 +25,7 @@ import static org.junit.Assert.fail; import java.net.InetAddress; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Assert; import org.junit.Test; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java index 9f88bd54ca60050a250138b96ab150f7fdb38ac1..ca047e8a60301159b130c7237d2ae741994789bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java @@ -23,7 +23,7 @@ import static org.junit.Assert.*; import java.util.List; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; public class JobGraphTest { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java index 1e5b12af8a89e6d6e48fdadae92dc200b48dfaad..68575e5b09a25d984adcda7dbbae33e5d7e5fe75 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.SerializedValue; import org.junit.Test; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java index b3f24566d2f99ddb8e7845536e4b4b2c23afd7fb..f6c119e5271d50b986df360d574ed27fa1ae5150 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java @@ -27,7 +27,7 @@ import java.io.PrintWriter; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java index ca05416d8aacbec3c8f5138e6fb3c10f44bf38db..61b1f7aeea793b338acb63113c46c4f15881b294 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java @@ -44,82 +44,6 @@ import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper; */ public class CommonTestUtils { - /** - * Creates a copy of the given {@link IOReadableWritable} object by an in-memory serialization and subsequent - * deserialization. - * - * @param original - * the original object to be copied - * @return the copy of original object created by the original object's serialization/deserialization methods - * @throws IOException - * thrown if an error occurs while creating the copy of the object - */ - @SuppressWarnings("unchecked") - public static T createCopyWritable(final T original) throws IOException { - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - final DataOutputStream dos = new DataOutputStream(baos); - - original.write(new OutputViewDataOutputStreamWrapper(dos)); - - final String className = original.getClass().getName(); - - Class clazz = null; - - try { - clazz = (Class) Class.forName(className); - } catch (ClassNotFoundException e) { - fail(e.getMessage()); - } - - T copy = null; - try { - copy = clazz.newInstance(); - } catch (Throwable t) { - t.printStackTrace(); - fail(t.getMessage()); - } - - final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - final DataInputStream dis = new DataInputStream(bais); - - copy.read(new InputViewDataInputStreamWrapper(dis)); - if (dis.available() > 0) { - throw new IOException("The coped result was not fully consumed."); - } - - return copy; - } - - @SuppressWarnings("unchecked") - public static T createCopySerializable(T original) throws IOException { - if (original == null) { - throw new IllegalArgumentException(); - } - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(original); - oos.close(); - baos.close(); - - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - ObjectInputStream ois = new ObjectInputStream(bais); - - T copy; - try { - copy = (T) ois.readObject(); - } - catch (ClassNotFoundException e) { - throw new IOException(e); - } - - ois.close(); - bais.close(); - - return copy; - } - /** * Sleeps for a given set of milliseconds, uninterruptibly. If interrupt is called, * the sleep will continue nonetheless. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java index 3dca362217ca7d2a17f03f1776da351f32d05be3..50efd5247dbf5796fda69bf2cc6ab6073003890d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.util; import org.apache.flink.core.memory.MemoryUtils; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.InstantiationUtil; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java index f0fe63d91ae6045fe4bf2c75c5ef24b074b8507a..b53649aa3594577c6db832a66b22771ae96446f4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java @@ -18,13 +18,12 @@ package org.apache.flink.streaming.api; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; import java.util.Arrays; import java.util.List; import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.util.SourceFunctionUtil; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java index 9c3653bee13a486b0457ed3d8c35a1fadb643176..41bd381d7055b766ab47201290e03de23b7753c7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java @@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.types.Value; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java index 8c847d3ba7be7ec93fc9b2b7ff20c44622605f9d..1c0f850b38cef9ac4a759c42b1b7831ef772772e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java @@ -21,7 +21,7 @@ package org.apache.flink.streaming.util; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import org.junit.Test;