From 0888bb622e275ac6ff2408c2ae5014fd787b5dbd Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Wed, 6 Dec 2017 16:02:26 +0100 Subject: [PATCH] [FLINK-8214][streaming-tests] Collect results into proper mock in StreamMockEnvironment --- ...rEventCollectingResultPartitionWriter.java | 103 +++++++++++++++++ .../runtime/tasks/StreamMockEnvironment.java | 108 ++---------------- 2 files changed, 111 insertions(+), 100 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java new file mode 100644 index 00000000000..c9ec6dfaa4a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.writer; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; + +import java.io.IOException; +import java.util.Collection; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link ResultPartitionWriter} that collects records or events on the List. + */ +public class RecordOrEventCollectingResultPartitionWriter implements ResultPartitionWriter { + private final Collection output; + private final BufferProvider bufferProvider; + private final NonReusingDeserializationDelegate delegate; + private final RecordDeserializer> deserializer = new AdaptiveSpanningRecordDeserializer<>(); + + public RecordOrEventCollectingResultPartitionWriter( + Collection output, + BufferProvider bufferProvider, + TypeSerializer serializer) { + + this.output = checkNotNull(output); + this.bufferProvider = checkNotNull(bufferProvider); + this.delegate = new NonReusingDeserializationDelegate<>(checkNotNull(serializer)); + } + + @Override + public BufferProvider getBufferProvider() { + return bufferProvider; + } + + @Override + public ResultPartitionID getPartitionId() { + return new ResultPartitionID(); + } + + @Override + public int getNumberOfSubpartitions() { + return 1; + } + + @Override + public int getNumTargetKeyGroups() { + return 1; + } + + @Override + public void writeBuffer(Buffer buffer, int targetChannel) throws IOException { + checkState(targetChannel < getNumberOfSubpartitions()); + + if (buffer.isBuffer()) { + deserializer.setNextBuffer(buffer); + + while (deserializer.hasUnfinishedData()) { + RecordDeserializer.DeserializationResult result = + deserializer.getNextRecord(delegate); + + if (result.isFullRecord()) { + output.add(delegate.getInstance()); + } + + if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER + || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) { + break; + } + } + } else { + // is event + AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); + output.add(event); + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index ee7337ce08a..71371f0653a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -24,54 +24,39 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; -import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; -import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.plugable.DeserializationDelegate; -import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - +import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.Future; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Mock {@link Environment}. @@ -144,51 +129,12 @@ public class StreamMockEnvironment implements Environment { inputs.add(gate); } - public void addOutput(final Queue outputList, final TypeSerializer serializer) { + public void addOutput(final Collection outputList, final TypeSerializer serializer) { try { - // The record-oriented writers wrap the buffer writer. We mock it - // to collect the returned buffers and deserialize the content to - // the output list - BufferProvider mockBufferProvider = mock(BufferProvider.class); - when(mockBufferProvider.requestBufferBlocking()).thenAnswer(new Answer() { - - @Override - public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable { - return new Buffer( - MemorySegmentFactory.allocateUnpooledSegment(bufferSize), - mock(BufferRecycler.class)); - } - }); - - ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class); - when(mockWriter.getNumberOfSubpartitions()).thenReturn(1); - when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider); - - final RecordDeserializer> recordDeserializer = new AdaptiveSpanningRecordDeserializer>(); - final NonReusingDeserializationDelegate delegate = new NonReusingDeserializationDelegate(serializer); - - // Add records and events from the buffer to the output list - doAnswer(new Answer() { - - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - Buffer buffer = (Buffer) invocationOnMock.getArguments()[0]; - addBufferToOutputList(recordDeserializer, delegate, buffer, outputList); - return null; - } - }).when(mockWriter).writeBuffer(any(Buffer.class), anyInt()); - - doAnswer(new Answer() { - - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - Buffer buffer = (Buffer) invocationOnMock.getArguments()[0]; - addBufferToOutputList(recordDeserializer, delegate, buffer, outputList); - return null; - } - }).when(mockWriter).writeBufferToAllSubpartitions(any(Buffer.class)); - - outputs.add(mockWriter); + outputs.add(new RecordOrEventCollectingResultPartitionWriter( + outputList, + new TestPooledBufferProvider(Integer.MAX_VALUE), + serializer)); } catch (Throwable t) { t.printStackTrace(); @@ -196,44 +142,6 @@ public class StreamMockEnvironment implements Environment { } } - /** - * Adds the object behind the given buffer to the outputList. - * - * @param recordDeserializer de-serializer to use for the buffer - * @param delegate de-serialization delegate to use for non-event buffers - * @param buffer the buffer to add - * @param outputList the output list to add the object to - * @param type of the objects behind the non-event buffers - * - * @throws java.io.IOException - */ - private void addBufferToOutputList( - RecordDeserializer> recordDeserializer, - NonReusingDeserializationDelegate delegate, Buffer buffer, - final Queue outputList) throws java.io.IOException { - if (buffer.isBuffer()) { - recordDeserializer.setNextBuffer(buffer); - - while (recordDeserializer.hasUnfinishedData()) { - RecordDeserializer.DeserializationResult result = - recordDeserializer.getNextRecord(delegate); - - if (result.isFullRecord()) { - outputList.add(delegate.getInstance()); - } - - if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER - || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) { - break; - } - } - } else { - // is event - AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); - outputList.add(event); - } - } - @Override public Configuration getTaskConfiguration() { return this.taskConfiguration; -- GitLab