提交 c8161911 编写于 作者: P Piotr Nowojski 提交者: Stefan Richter

[FLINK-8220][network-benchmarks] Define network benchmarks in Flink project

上级 81d3e72e
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.benchmark;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.types.LongValue;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Wrapping thread around {@link RecordWriter} that sends a fixed number of <tt>LongValue(0)</tt>
* records.
*/
public class LongRecordWriterThread extends CheckedThread {
private final RecordWriter<LongValue> recordWriter;
/**
* Future to wait on a definition of the number of records to send.
*/
private CompletableFuture<Long> recordsToSend = new CompletableFuture<>();
private volatile boolean running = true;
public LongRecordWriterThread(RecordWriter<LongValue> recordWriter) {
this.recordWriter = checkNotNull(recordWriter);
}
public void shutdown() {
running = false;
recordsToSend.complete(0L);
}
/**
* Initializes the record writer thread with this many numbers to send.
*
* <p>If the thread was already started, if may now continue.
*
* @param records
* number of records to send
*/
public synchronized void setRecordsToSend(long records) {
checkState(!recordsToSend.isDone());
recordsToSend.complete(records);
}
private synchronized CompletableFuture<Long> getRecordsToSend() {
return recordsToSend;
}
private synchronized void finishSendingRecords() {
recordsToSend = new CompletableFuture<>();
}
@Override
public void go() throws Exception {
while (running) {
sendRecords(getRecordsToSend().get());
}
}
private void sendRecords(long records) throws IOException, InterruptedException {
LongValue value = new LongValue(0);
for (int i = 1; i < records; i++) {
recordWriter.emit(value);
}
value.setValue(records);
recordWriter.broadcastEmit(value);
recordWriter.flush();
finishSendingRecords();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.benchmark;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
/**
* Context for network benchmarks executed by the external
* <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
*/
public class NetworkBenchmarkEnvironment<T extends IOReadableWritable> {
private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
private static final int NUM_SLOTS_AND_THREADS = 1;
private static final InetAddress LOCAL_ADDRESS;
static {
try {
LOCAL_ADDRESS = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
throw new Error(e);
}
}
protected final JobID jobId = new JobID();
protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
protected final ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
protected NetworkEnvironment senderEnv;
protected NetworkEnvironment receiverEnv;
protected IOManager ioManager;
protected int channels;
protected ResultPartitionID[] partitionIds;
public void setUp(int writers, int channels) throws Exception {
this.channels = channels;
this.partitionIds = new ResultPartitionID[writers];
int bufferPoolSize = Math.max(2048, writers * channels * 4);
senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
ioManager = new IOManagerAsync();
senderEnv.start();
receiverEnv.start();
generatePartitionIds();
}
public void tearDown() {
suppressExceptions(senderEnv::shutdown);
suppressExceptions(receiverEnv::shutdown);
suppressExceptions(ioManager::shutdown);
}
public SerializingLongReceiver createReceiver() throws Exception {
TaskManagerLocation senderLocation = new TaskManagerLocation(
ResourceID.generate(),
LOCAL_ADDRESS,
senderEnv.getConnectionManager().getDataPort());
InputGate receiverGate = createInputGate(
jobId,
dataSetID,
executionAttemptID,
senderLocation,
receiverEnv,
channels);
SerializingLongReceiver receiver = new SerializingLongReceiver(receiverGate, channels * partitionIds.length);
receiver.start();
return receiver;
}
public RecordWriter<T> createRecordWriter(int partitionIndex) throws Exception {
ResultPartitionWriter sender = createResultPartition(jobId, partitionIds[partitionIndex], senderEnv, channels);
return new RecordWriter<>(sender);
}
private void generatePartitionIds() throws Exception {
for (int writer = 0; writer < partitionIds.length; writer++) {
partitionIds[writer] = new ResultPartitionID();
}
}
private NetworkEnvironment createNettyNetworkEnvironment(
@SuppressWarnings("SameParameterValue") int bufferPoolSize) throws Exception {
final NetworkBufferPool bufferPool = new NetworkBufferPool(bufferPoolSize, BUFFER_SIZE);
final NettyConnectionManager nettyConnectionManager = new NettyConnectionManager(
new NettyConfig(LOCAL_ADDRESS, 0, BUFFER_SIZE, NUM_SLOTS_AND_THREADS, new Configuration()));
return new NetworkEnvironment(
bufferPool,
nettyConnectionManager,
new ResultPartitionManager(),
new TaskEventDispatcher(),
new KvStateRegistry(),
null,
null,
IOMode.SYNC,
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue());
}
protected ResultPartitionWriter createResultPartition(
JobID jobId,
ResultPartitionID partitionId,
NetworkEnvironment environment,
int channels) throws Exception {
ResultPartition resultPartition = new ResultPartition(
"sender task",
new NoOpTaskActions(),
jobId,
partitionId,
ResultPartitionType.PIPELINED_BOUNDED,
channels,
1,
environment.getResultPartitionManager(),
new NoOpResultPartitionConsumableNotifier(),
ioManager,
false);
// similar to NetworkEnvironment#registerTask()
int numBuffers = resultPartition.getNumberOfSubpartitions() *
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
BufferPool bufferPool = environment.getNetworkBufferPool().createBufferPool(channels, numBuffers);
resultPartition.registerBufferPool(bufferPool);
environment.getResultPartitionManager().registerResultPartition(resultPartition);
return resultPartition;
}
private InputGate createInputGate(
JobID jobId,
IntermediateDataSetID dataSetID,
ExecutionAttemptID executionAttemptID,
final TaskManagerLocation senderLocation,
NetworkEnvironment environment,
final int channels) throws IOException {
InputGate[] gates = new InputGate[channels];
for (int channel = 0; channel < channels; ++channel) {
int finalChannel = channel;
InputChannelDeploymentDescriptor[] channelDescriptors = Arrays.stream(partitionIds)
.map(partitionId -> new InputChannelDeploymentDescriptor(
partitionId,
ResultPartitionLocation.createRemote(new ConnectionID(senderLocation, finalChannel))))
.toArray(InputChannelDeploymentDescriptor[]::new);
final InputGateDeploymentDescriptor gateDescriptor = new InputGateDeploymentDescriptor(
dataSetID,
ResultPartitionType.PIPELINED_BOUNDED,
channel,
channelDescriptors);
SingleInputGate gate = SingleInputGate.create(
"receiving task[" + channel + "]",
jobId,
executionAttemptID,
gateDescriptor,
environment,
new NoOpTaskActions(),
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
// similar to NetworkEnvironment#registerTask()
int numBuffers = gate.getNumberOfInputChannels() *
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() +
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue();
BufferPool bufferPool =
environment.getNetworkBufferPool().createBufferPool(gate.getNumberOfInputChannels(), numBuffers);
gate.setBufferPool(bufferPool);
gates[channel] = gate;
}
if (channels > 1) {
return new UnionInputGate(gates);
} else {
return gates[0];
}
}
// ------------------------------------------------------------------------
// Mocks
// ------------------------------------------------------------------------
/**
* A dummy implementation of the {@link TaskActions}. We implement this here rather than using Mockito
* to avoid using mockito in this benchmark class.
*/
private static final class NoOpTaskActions implements TaskActions {
@Override
public void triggerPartitionProducerStateCheck(
JobID jobId,
IntermediateDataSetID intermediateDataSetId,
ResultPartitionID resultPartitionId) {}
@Override
public void failExternally(Throwable cause) {}
}
private static final class NoOpResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
@Override
public void notifyPartitionConsumable(JobID j, ResultPartitionID p, TaskActions t) {}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.benchmark;
import org.apache.flink.types.LongValue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* Network throughput benchmarks executed by the external
* <a href="https://github.com/dataArtisans/flink-benchmarks">flink-benchmarks</a> project.
*/
public class NetworkThroughputBenchmark {
private static final long RECEIVER_TIMEOUT = 30_000;
private NetworkBenchmarkEnvironment<LongValue> environment;
private ReceiverThread receiver;
private LongRecordWriterThread[] writerThreads;
/**
* Executes the throughput benchmark with the given number of records.
*
* @param records
* records to pass through the network stack
*/
public void executeBenchmark(long records) throws Exception {
final LongValue value = new LongValue();
value.setValue(0);
long lastRecord = records / writerThreads.length;
CompletableFuture<?> recordsReceived = receiver.setExpectedRecord(lastRecord);
for (LongRecordWriterThread writerThread : writerThreads) {
writerThread.setRecordsToSend(lastRecord);
}
recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);
}
/**
* Initializes the throughput benchmark with the given parameters.
*
* @param recordWriters
* number of senders, i.e.
* {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter} instances
* @param channels
* number of outgoing channels / receivers
*/
public void setUp(int recordWriters, int channels) throws Exception {
environment = new NetworkBenchmarkEnvironment<>();
environment.setUp(recordWriters, channels);
receiver = environment.createReceiver();
writerThreads = new LongRecordWriterThread[recordWriters];
for (int writer = 0; writer < recordWriters; writer++) {
writerThreads[writer] = new LongRecordWriterThread(environment.createRecordWriter(writer));
writerThreads[writer].start();
}
}
/**
* Shuts down a benchmark previously set up via {@link #setUp}.
*
* <p>This will wait for all senders to finish but timeout with an exception after 5 seconds.
*/
public void tearDown() throws Exception {
for (LongRecordWriterThread writerThread : writerThreads) {
writerThread.shutdown();
writerThread.sync(5000);
}
environment.tearDown();
receiver.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.benchmark;
import org.junit.Test;
/**
* Tests for various network benchmarks based on {@link NetworkThroughputBenchmark}.
*/
public class NetworkThroughputBenchmarkTests {
@Test
public void pointToPointBenchmark() throws Exception {
NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
benchmark.setUp(1, 1);
try {
benchmark.executeBenchmark(1_000);
}
finally {
benchmark.tearDown();
}
}
@Test
public void pointToMultiPointBenchmark() throws Exception {
NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
benchmark.setUp(1, 100);
try {
benchmark.executeBenchmark(1_000);
}
finally {
benchmark.tearDown();
}
}
@Test
public void multiPointToPointBenchmark() throws Exception {
NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
benchmark.setUp(4, 1);
try {
benchmark.executeBenchmark(1_000);
}
finally {
benchmark.tearDown();
}
}
@Test
public void multiPointToMultiPointBenchmark() throws Exception {
NetworkThroughputBenchmark benchmark = new NetworkThroughputBenchmark();
benchmark.setUp(4, 100);
try {
benchmark.executeBenchmark(1_000);
}
finally {
benchmark.tearDown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.benchmark;
import org.apache.flink.core.testutils.CheckedThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkState;
/**
* This class waits for {@code expectedRepetitionsOfExpectedRecord} number of occurrences of the
* {@code expectedRecord}. {@code expectedRepetitionsOfExpectedRecord} is correlated with number of input channels.
*/
public abstract class ReceiverThread extends CheckedThread {
protected static final Logger LOG = LoggerFactory.getLogger(ReceiverThread.class);
protected final int expectedRepetitionsOfExpectedRecord;
protected int expectedRecordCounter;
protected CompletableFuture<Long> expectedRecord = new CompletableFuture<>();
protected CompletableFuture<?> recordsProcessed = new CompletableFuture<>();
protected volatile boolean running;
ReceiverThread(int expectedRepetitionsOfExpectedRecord) {
setName(this.getClass().getName());
this.expectedRepetitionsOfExpectedRecord = expectedRepetitionsOfExpectedRecord;
this.running = true;
}
public synchronized CompletableFuture<?> setExpectedRecord(long record) {
checkState(!expectedRecord.isDone());
checkState(!recordsProcessed.isDone());
expectedRecord.complete(record);
expectedRecordCounter = 0;
return recordsProcessed;
}
private synchronized CompletableFuture<Long> getExpectedRecord() {
return expectedRecord;
}
private synchronized void finishProcessingExpectedRecords() {
checkState(expectedRecord.isDone());
checkState(!recordsProcessed.isDone());
recordsProcessed.complete(null);
expectedRecord = new CompletableFuture<>();
recordsProcessed = new CompletableFuture<>();
}
@Override
public void go() throws Exception {
try {
while (running) {
readRecords(getExpectedRecord().get());
finishProcessingExpectedRecords();
}
}
catch (InterruptedException e) {
if (running) {
throw e;
}
} catch (Exception e) {
e.printStackTrace();
}
}
protected abstract void readRecords(long lastExpectedRecord) throws Exception;
public void shutdown() {
running = false;
interrupt();
expectedRecord.complete(0L);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.benchmark;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.types.LongValue;
/**
* {@link ReceiverThread} that deserialize incoming messages.
*/
public class SerializingLongReceiver extends ReceiverThread {
private final MutableRecordReader<LongValue> reader;
@SuppressWarnings("WeakerAccess")
public SerializingLongReceiver(InputGate inputGate, int expectedRepetitionsOfExpectedRecord) {
super(expectedRepetitionsOfExpectedRecord);
this.reader = new MutableRecordReader<>(
inputGate,
new String[]{
EnvironmentInformation.getTemporaryFileDirectory()
});
}
protected void readRecords(long lastExpectedRecord) throws Exception {
LOG.debug("readRecords(lastExpectedRecord = {})", lastExpectedRecord);
final LongValue value = new LongValue();
while (running && reader.next(value)) {
final long ts = value.getValue();
if (ts == lastExpectedRecord) {
expectedRecordCounter++;
if (expectedRecordCounter == expectedRepetitionsOfExpectedRecord) {
break;
}
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册