From c816191113d813156467f3e33856636ef0bcce38 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Thu, 7 Dec 2017 10:03:32 +0100 Subject: [PATCH] [FLINK-8220][network-benchmarks] Define network benchmarks in Flink project --- .../benchmark/LongRecordWriterThread.java | 94 ++++++ .../NetworkBenchmarkEnvironment.java | 278 ++++++++++++++++++ .../benchmark/NetworkThroughputBenchmark.java | 90 ++++++ .../NetworkThroughputBenchmarkTests.java | 74 +++++ .../io/network/benchmark/ReceiverThread.java | 98 ++++++ .../benchmark/SerializingLongReceiver.java | 57 ++++ 6 files changed, 691 insertions(+) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java new file mode 100644 index 00000000000..6018e5528a1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/LongRecordWriterThread.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.benchmark; + +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.types.LongValue; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Wrapping thread around {@link RecordWriter} that sends a fixed number of LongValue(0) + * records. + */ +public class LongRecordWriterThread extends CheckedThread { + private final RecordWriter recordWriter; + + /** + * Future to wait on a definition of the number of records to send. + */ + private CompletableFuture recordsToSend = new CompletableFuture<>(); + + private volatile boolean running = true; + + public LongRecordWriterThread(RecordWriter recordWriter) { + this.recordWriter = checkNotNull(recordWriter); + } + + public void shutdown() { + running = false; + recordsToSend.complete(0L); + } + + /** + * Initializes the record writer thread with this many numbers to send. + * + *

If the thread was already started, if may now continue. + * + * @param records + * number of records to send + */ + public synchronized void setRecordsToSend(long records) { + checkState(!recordsToSend.isDone()); + recordsToSend.complete(records); + } + + private synchronized CompletableFuture getRecordsToSend() { + return recordsToSend; + } + + private synchronized void finishSendingRecords() { + recordsToSend = new CompletableFuture<>(); + } + + @Override + public void go() throws Exception { + while (running) { + sendRecords(getRecordsToSend().get()); + } + } + + private void sendRecords(long records) throws IOException, InterruptedException { + LongValue value = new LongValue(0); + + for (int i = 1; i < records; i++) { + recordWriter.emit(value); + } + value.setValue(records); + recordWriter.broadcastEmit(value); + recordWriter.flush(); + + finishSendingRecords(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java new file mode 100644 index 00000000000..ff06187fc7b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkBenchmarkEnvironment.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.benchmark; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionLocation; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.taskmanager.TaskActions; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; + +import static org.apache.flink.util.ExceptionUtils.suppressExceptions; + +/** + * Context for network benchmarks executed by the external + * flink-benchmarks project. + */ +public class NetworkBenchmarkEnvironment { + + private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); + + private static final int NUM_SLOTS_AND_THREADS = 1; + + private static final InetAddress LOCAL_ADDRESS; + + static { + try { + LOCAL_ADDRESS = InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new Error(e); + } + } + + protected final JobID jobId = new JobID(); + protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID(); + protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(); + + protected NetworkEnvironment senderEnv; + protected NetworkEnvironment receiverEnv; + protected IOManager ioManager; + + protected int channels; + + protected ResultPartitionID[] partitionIds; + + public void setUp(int writers, int channels) throws Exception { + this.channels = channels; + this.partitionIds = new ResultPartitionID[writers]; + + int bufferPoolSize = Math.max(2048, writers * channels * 4); + senderEnv = createNettyNetworkEnvironment(bufferPoolSize); + receiverEnv = createNettyNetworkEnvironment(bufferPoolSize); + ioManager = new IOManagerAsync(); + + senderEnv.start(); + receiverEnv.start(); + + generatePartitionIds(); + } + + public void tearDown() { + suppressExceptions(senderEnv::shutdown); + suppressExceptions(receiverEnv::shutdown); + suppressExceptions(ioManager::shutdown); + } + + public SerializingLongReceiver createReceiver() throws Exception { + TaskManagerLocation senderLocation = new TaskManagerLocation( + ResourceID.generate(), + LOCAL_ADDRESS, + senderEnv.getConnectionManager().getDataPort()); + + InputGate receiverGate = createInputGate( + jobId, + dataSetID, + executionAttemptID, + senderLocation, + receiverEnv, + channels); + + SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length); + + receiver.start(); + return receiver; + } + + public RecordWriter createRecordWriter(int partitionIndex) throws Exception { + ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels); + return new RecordWriter<>(sender); + } + + private void generatePartitionIds() throws Exception { + for (int writer = 0; writer < partitionIds.length; writer++) { + partitionIds[writer] = new ResultPartitionID(); + } + } + + private NetworkEnvironment createNettyNetworkEnvironment( + @SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception { + + final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE); + + final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager( + new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration())); + + return new NetworkEnvironment( + bufferPool, + nettyConnectionManager, + new ResultPartitionManager(), + new TaskEventDispatcher(), + new KvStateRegistry(), + null, + null, + IOMode.SYNC, + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(), + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue()); + } + + protected ResultPartitionWriter createResultPartition( + JobID jobId, + ResultPartitionID partitionId, + NetworkEnvironment environment, + int channels) throws Exception { + + ResultPartition resultPartition = new ResultPartition( + "sender task", + new NoOpTaskActions(), + jobId, + partitionId, + ResultPartitionType.PIPELINED_BOUNDED, + channels, + 1, + environment.getResultPartitionManager(), + new NoOpResultPartitionConsumableNotifier(), + ioManager, + false); + + // similar to NetworkEnvironment#registerTask() + int numBuffers = resultPartition.getNumberOfSubpartitions() * + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); + + BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers); + resultPartition.registerBufferPool(bufferPool); + + environment.getResultPartitionManager().registerResultPartition(resultPartition); + + return resultPartition; + } + + private InputGate createInputGate( + JobID jobId, + IntermediateDataSetID dataSetID, + ExecutionAttemptID executionAttemptID, + final TaskManagerLocation senderLocation, + NetworkEnvironment environment, + final int channels) throws IOException { + + InputGate[] gates = new InputGate[channels]; + for (int channel = 0; channel < channels; ++channel) { + int finalChannel = channel; + InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds) + .map(partitionId -> new InputChannelDeploymentDescriptor( + partitionId, + ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel)))) + .toArray(InputChannelDeploymentDescriptor[]::new); + + final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor( + dataSetID, + ResultPartitionType.PIPELINED_BOUNDED, + channel, + channelDescriptors); + + SingleInputGate gate = SingleInputGate.create( + "receiving task[" + channel + "]", + jobId, + executionAttemptID, + gateDescriptor, + environment, + new NoOpTaskActions(), + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()); + + // similar to NetworkEnvironment#registerTask() + int numBuffers = gate.getNumberOfInputChannels() * + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() + + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(); + + BufferPool bufferPool = + environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers); + + gate.setBufferPool(bufferPool); + gates[channel] = gate; + } + + if (channels > 1) { + return new UnionInputGate(gates); + } else { + return gates[0]; + } + } + + // ------------------------------------------------------------------------ + // Mocks + // ------------------------------------------------------------------------ + + /** + * A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito + * to avoid using mockito in this benchmark class. + */ + private static final class NoOpTaskActions implements TaskActions { + + @Override + public void triggerPartitionProducerStateCheck( + JobID jobId, + IntermediateDataSetID intermediateDataSetId, + ResultPartitionID resultPartitionId) {} + + @Override + public void failExternally(Throwable cause) {} + } + + private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier { + + @Override + public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {} + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java new file mode 100644 index 00000000000..799b7c3093c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmark.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.benchmark; + +import org.apache.flink.types.LongValue; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Network throughput benchmarks executed by the external + * flink-benchmarks project. + */ +public class NetworkThroughputBenchmark { + private static final long RECEIVER_TIMEOUT = 30_000; + + private NetworkBenchmarkEnvironment environment; + private ReceiverThread receiver; + private LongRecordWriterThread[] writerThreads; + + /** + * Executes the throughput benchmark with the given number of records. + * + * @param records + * records to pass through the network stack + */ + public void executeBenchmark(long records) throws Exception { + final LongValue value = new LongValue(); + value.setValue(0); + + long lastRecord = records / writerThreads.length; + CompletableFuture recordsReceived = receiver.setExpectedRecord(lastRecord); + + for (LongRecordWriterThread writerThread : writerThreads) { + writerThread.setRecordsToSend(lastRecord); + } + + recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS); + } + + /** + * Initializes the throughput benchmark with the given parameters. + * + * @param recordWriters + * number of senders, i.e. + * {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances + * @param channels + * number of outgoing channels / receivers + */ + public void setUp(int recordWriters, int channels) throws Exception { + environment = new NetworkBenchmarkEnvironment<>(); + environment.setUp(recordWriters, channels); + receiver = environment.createReceiver(); + writerThreads = new LongRecordWriterThread[recordWriters]; + for (int writer = 0; writer < recordWriters; writer++) { + writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer)); + writerThreads[writer].start(); + } + } + + /** + * Shuts down a benchmark previously set up via {@link #setUp}. + * + *

This will wait for all senders to finish but timeout with an exception after 5 seconds. + */ + public void tearDown() throws Exception { + for (LongRecordWriterThread writerThread : writerThreads) { + writerThread.shutdown(); + writerThread.sync(5000); + } + environment.tearDown(); + receiver.shutdown(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java new file mode 100644 index 00000000000..c84743be843 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/NetworkThroughputBenchmarkTests.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.benchmark; + +import org.junit.Test; + +/** + * Tests for various network benchmarks based on {@link NetworkThroughputBenchmark}. + */ +public class NetworkThroughputBenchmarkTests { + @Test + public void pointToPointBenchmark() throws Exception { + NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); + benchmark.setUp(1, 1); + try { + benchmark.executeBenchmark(1_000); + } + finally { + benchmark.tearDown(); + } + } + + @Test + public void pointToMultiPointBenchmark() throws Exception { + NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); + benchmark.setUp(1, 100); + try { + benchmark.executeBenchmark(1_000); + } + finally { + benchmark.tearDown(); + } + } + + @Test + public void multiPointToPointBenchmark() throws Exception { + NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); + benchmark.setUp(4, 1); + try { + benchmark.executeBenchmark(1_000); + } + finally { + benchmark.tearDown(); + } + } + + @Test + public void multiPointToMultiPointBenchmark() throws Exception { + NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark(); + benchmark.setUp(4, 100); + try { + benchmark.executeBenchmark(1_000); + } + finally { + benchmark.tearDown(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java new file mode 100644 index 00000000000..be1c80f36ce --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/ReceiverThread.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.benchmark; + +import org.apache.flink.core.testutils.CheckedThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This class waits for {@code expectedRepetitionsOfExpectedRecord} number of occurrences of the + * {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with number of input channels. + */ +public abstract class ReceiverThread extends CheckedThread { + protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class); + + protected final int expectedRepetitionsOfExpectedRecord; + + protected int expectedRecordCounter; + protected CompletableFuture expectedRecord = new CompletableFuture<>(); + protected CompletableFuture recordsProcessed = new CompletableFuture<>(); + + protected volatile boolean running; + + ReceiverThread(int expectedRepetitionsOfExpectedRecord) { + setName(this.getClass().getName()); + + this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord; + this.running = true; + } + + public synchronized CompletableFuture setExpectedRecord(long record) { + checkState(!expectedRecord.isDone()); + checkState(!recordsProcessed.isDone()); + expectedRecord.complete(record); + expectedRecordCounter = 0; + return recordsProcessed; + } + + private synchronized CompletableFuture getExpectedRecord() { + return expectedRecord; + } + + private synchronized void finishProcessingExpectedRecords() { + checkState(expectedRecord.isDone()); + checkState(!recordsProcessed.isDone()); + + recordsProcessed.complete(null); + expectedRecord = new CompletableFuture<>(); + recordsProcessed = new CompletableFuture<>(); + } + + @Override + public void go() throws Exception { + try { + while (running) { + readRecords(getExpectedRecord().get()); + finishProcessingExpectedRecords(); + } + } + catch (InterruptedException e) { + if (running) { + throw e; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + protected abstract void readRecords(long lastExpectedRecord) throws Exception; + + public void shutdown() { + running = false; + interrupt(); + expectedRecord.complete(0L); + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java new file mode 100644 index 00000000000..848c018a5b7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.benchmark; + +import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.types.LongValue; + +/** + * {@link ReceiverThread} that deserialize incoming messages. + */ +public class SerializingLongReceiver extends ReceiverThread { + + private final MutableRecordReader reader; + + @SuppressWarnings("WeakerAccess") + public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) { + super(expectedRepetitionsOfExpectedRecord); + this.reader = new MutableRecordReader<>( + inputGate, + new String[]{ + EnvironmentInformation.getTemporaryFileDirectory() + }); + } + + protected void readRecords(long lastExpectedRecord) throws Exception { + LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord); + final LongValue value = new LongValue(); + + while (running && reader.next(value)) { + final long ts = value.getValue(); + if (ts == lastExpectedRecord) { + expectedRecordCounter++; + if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) { + break; + } + } + } + } +} -- GitLab