From 76e3156d014e12094bbb55ecbd024b5edbf4b4cf Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 14 Dec 2017 16:25:43 +0100 Subject: [PATCH] [FLINK-7928] Move Resource out of ResourceSpec --- .../api/common/operators/ResourceSpec.java | 141 ++---------------- .../api/common/resources/GPUResource.java | 44 ++++++ .../flink/api/common/resources/Resource.java | 123 +++++++++++++++ .../types/ResourceProfile.java | 59 ++++---- .../org/apache/flink/api/scala/DataSet.scala | 2 +- 5 files changed, 209 insertions(+), 160 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java index e82c738187d..7bc29484a2b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java @@ -19,7 +19,8 @@ package org.apache.flink.api.common.operators; import org.apache.flink.annotation.Internal; -import org.apache.flink.util.Preconditions; +import org.apache.flink.api.common.resources.GPUResource; +import org.apache.flink.api.common.resources.Resource; import javax.annotation.Nonnull; @@ -28,8 +29,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * Describe the different resource factors of the operator with UDF. * @@ -97,7 +96,7 @@ public class ResourceSpec implements Serializable { this.stateSizeInMB = stateSizeInMB; for (Resource resource : extendedResources) { if (resource != null) { - this.extendedResources.put(resource.name, resource); + this.extendedResources.put(resource.getName(), resource); } } } @@ -118,7 +117,7 @@ public class ResourceSpec implements Serializable { this.stateSizeInMB + other.stateSizeInMB); target.extendedResources.putAll(extendedResources); for (Resource resource : other.extendedResources.values()) { - target.extendedResources.merge(resource.name, resource, (v1, v2) -> v1.merge(v2)); + target.extendedResources.merge(resource.getName(), resource, (v1, v2) -> v1.merge(v2)); } return target; } @@ -148,9 +147,14 @@ public class ResourceSpec implements Serializable { if (gpuResource != null) { return gpuResource.getValue(); } + return 0.0; } + public Map getExtendedResources() { + return extendedResources; + } + /** * Check whether all the field values are valid. * @@ -160,7 +164,7 @@ public class ResourceSpec implements Serializable { if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 && this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) { for (Resource resource : extendedResources.values()) { - if (resource.value < 0) { + if (resource.getValue() < 0) { return false; } } @@ -185,9 +189,9 @@ public class ResourceSpec implements Serializable { int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB); if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) { for (Resource resource : extendedResources.values()) { - if (!other.extendedResources.containsKey(resource.name) || - !other.extendedResources.get(resource.name).type.equals(resource.type) || - other.extendedResources.get(resource.name).value < resource.value) { + if (!other.extendedResources.containsKey(resource.getName()) || + other.extendedResources.get(resource.getName()).getResourceAggregateType() != resource.getResourceAggregateType() || + other.extendedResources.get(resource.getName()).getValue() < resource.getValue()) { return false; } } @@ -229,7 +233,7 @@ public class ResourceSpec implements Serializable { public String toString() { String extend = ""; for (Resource resource : extendedResources.values()) { - extend += ", " + resource.name + "=" + resource.value; + extend += ", " + resource.getName() + "=" + resource.getValue(); } return "ResourceSpec{" + "cpuCores=" + cpuCores + @@ -297,121 +301,4 @@ public class ResourceSpec implements Serializable { } } - /** - * Base class for additional resources one can specify. - */ - public abstract static class Resource implements Serializable { - - private static final long serialVersionUID = 1L; - - /** - * Enum defining how resources are aggregated. - */ - public enum ResourceAggregateType { - /** - * Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining. - */ - AGGREGATE_TYPE_SUM, - - /** - * Denotes keeping the max of the values with same name when merging two resource specs for operator chaining. - */ - AGGREGATE_TYPE_MAX - } - - private final String name; - - private final double value; - - private final ResourceAggregateType type; - - public Resource(String name, double value, ResourceAggregateType type) { - this.name = checkNotNull(name); - this.value = value; - this.type = checkNotNull(type); - } - - Resource merge(Resource other) { - Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type"); - Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name"); - Preconditions.checkArgument(this.type.equals(other.type), "Merge with different aggregate type"); - - Double value = null; - switch (type) { - case AGGREGATE_TYPE_MAX : - value = Math.max(other.value, this.value); - break; - - case AGGREGATE_TYPE_SUM: - default: - value = this.value + other.value; - } - - Resource resource = create(value, type); - return resource; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } else if (o != null && getClass() == o.getClass()) { - Resource other = (Resource) o; - - return name.equals(other.name) && type.equals(other.type) && value == other.value; - } else { - return false; - } - } - - @Override - public int hashCode() { - int result = name.hashCode(); - result = 31 * result + type.ordinal(); - result = 31 * result + (int) value; - return result; - } - - public String getName() { - return this.name; - } - - public ResourceAggregateType getAggregateType() { - return this.type; - } - - public double getValue() { - return this.value; - } - - /** - * Create a resource of the same resource type. - * - * @param value The value of the resource - * @param type The aggregate type of the resource - * @return A new instance of the sub resource - */ - protected abstract Resource create(double value, ResourceAggregateType type); - } - - /** - * The GPU resource. - */ - public static class GPUResource extends Resource { - - private static final long serialVersionUID = -2276080061777135142L; - - public GPUResource(double value) { - this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM); - } - - public GPUResource(double value, ResourceAggregateType type) { - super(GPU_NAME, value, type); - } - - @Override - public Resource create(double value, ResourceAggregateType type) { - return new GPUResource(value, type); - } - } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java b/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java new file mode 100644 index 00000000000..1e7f1ef572e --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.resources; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.ResourceSpec; + +/** + * The GPU resource. + */ +@Internal +public class GPUResource extends Resource { + + private static final long serialVersionUID = -2276080061777135142L; + + public GPUResource(double value) { + this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM); + } + + private GPUResource(double value, ResourceAggregateType type) { + super(ResourceSpec.GPU_NAME, value, type); + } + + @Override + public Resource create(double value, ResourceAggregateType type) { + return new GPUResource(value, type); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java new file mode 100644 index 00000000000..59448a8ab22 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.resources; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base class for resources one can specify. + */ +@Internal +public abstract class Resource implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * Enum defining how resources are aggregated. + */ + public enum ResourceAggregateType { + /** + * Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining. + */ + AGGREGATE_TYPE_SUM, + + /** + * Denotes keeping the max of the values with same name when merging two resource specs for operator chaining. + */ + AGGREGATE_TYPE_MAX + } + + private final String name; + + private final double value; + + private final ResourceAggregateType resourceAggregateType; + + protected Resource(String name, double value, ResourceAggregateType type) { + this.name = checkNotNull(name); + this.value = value; + this.resourceAggregateType = checkNotNull(type); + } + + public Resource merge(Resource other) { + Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource resourceAggregateType"); + Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name"); + Preconditions.checkArgument(this.resourceAggregateType == other.resourceAggregateType, "Merge with different aggregate resourceAggregateType"); + + final double aggregatedValue; + switch (resourceAggregateType) { + case AGGREGATE_TYPE_MAX : + aggregatedValue = Math.max(other.value, this.value); + break; + + case AGGREGATE_TYPE_SUM: + default: + aggregatedValue = this.value + other.value; + } + + return create(aggregatedValue, resourceAggregateType); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (o != null && getClass() == o.getClass()) { + Resource other = (Resource) o; + + return name.equals(other.name) && resourceAggregateType == other.resourceAggregateType && value == other.value; + } else { + return false; + } + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + resourceAggregateType.ordinal(); + result = 31 * result + (int) value; + return result; + } + + public String getName() { + return name; + } + + public ResourceAggregateType getResourceAggregateType() { + return this.resourceAggregateType; + } + + public double getValue() { + return this.value; + } + + /** + * Create a resource of the same resource resourceAggregateType. + * + * @param value The value of the resource + * @param type The aggregate resourceAggregateType of the resource + * @return A new instance of the sub resource + */ + protected abstract Resource create(double value, ResourceAggregateType type); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java index fc0bf1505ac..3dec3f304f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.clusterframework.types; +import org.apache.flink.api.common.resources.Resource; import org.apache.flink.api.common.operators.ResourceSpec; import javax.annotation.Nonnull; @@ -34,7 +35,7 @@ import java.util.Objects; * checked whether it can match another profile's requirement, and furthermore we may calculate a matching * score to decide which profile we should choose when we have lots of candidate slots. * It should be generated from {@link ResourceSpec} with the input and output memory calculated in JobMaster. - * + * *

Resource Profiles have a total ordering, defined by comparing these fields in sequence: *

    *
  1. Memory Size
  2. @@ -51,20 +52,20 @@ public class ResourceProfile implements Serializable, Comparable extendedResources = new HashMap<>(1); + private final Map extendedResources = new HashMap<>(1); // ------------------------------------------------------------------------ @@ -82,7 +83,7 @@ public class ResourceProfile implements Serializable, Comparable extendedResources) { + Map extendedResources) { this.cpuCores = cpuCores; this.heapMemoryInMB = heapMemoryInMB; this.directMemoryInMB = directMemoryInMB; @@ -104,8 +105,8 @@ public class ResourceProfile implements Serializable, Comparable getExtendedResources() { + public Map getExtendedResources() { return Collections.unmodifiableMap(extendedResources); } @@ -187,9 +188,9 @@ public class ResourceProfile implements Serializable, Comparable= required.getHeapMemoryInMB() && directMemoryInMB >= required.getDirectMemoryInMB() && nativeMemoryInMB >= required.getNativeMemoryInMB()) { - for (Map.Entry resource : required.extendedResources.entrySet()) { + for (Map.Entry resource : required.extendedResources.entrySet()) { if (!extendedResources.containsKey(resource.getKey()) || - !extendedResources.get(resource.getKey()).getAggregateType().equals(resource.getValue().getAggregateType()) || + !extendedResources.get(resource.getKey()).getResourceAggregateType().equals(resource.getValue().getResourceAggregateType()) || extendedResources.get(resource.getKey()).getValue() < resource.getValue().getValue()) { return false; } @@ -206,15 +207,15 @@ public class ResourceProfile implements Serializable, Comparable> thisIterator = extendedResources.entrySet().iterator(); - Iterator> otherIterator = other.extendedResources.entrySet().iterator(); + Iterator> thisIterator = extendedResources.entrySet().iterator(); + Iterator> otherIterator = other.extendedResources.entrySet().iterator(); while (thisIterator.hasNext() && otherIterator.hasNext()) { - Map.Entry thisResource = thisIterator.next(); - Map.Entry otherResource = otherIterator.next(); + Map.Entry thisResource = thisIterator.next(); + Map.Entry otherResource = otherIterator.next(); if ((cmp = otherResource.getKey().compareTo(thisResource.getKey())) != 0) { return cmp; } - if (!otherResource.getValue().getAggregateType().equals(thisResource.getValue().getAggregateType())) { + if (!otherResource.getValue().getResourceAggregateType().equals(thisResource.getValue().getResourceAggregateType())) { return 1; } if ((cmp = Double.compare(thisResource.getValue().getValue(), otherResource.getValue().getValue())) != 0) { @@ -261,32 +262,26 @@ public class ResourceProfile implements Serializable, Comparable resource : extendedResources.entrySet()) { - resourceStr += ", " + resource.getKey() + "=" + resource.getValue(); + final StringBuilder resources = new StringBuilder(extendedResources.size() * 10); + for (Map.Entry resource : extendedResources.entrySet()) { + resources.append(", ").append(resource.getKey()).append('=').append(resource.getValue()); } return "ResourceProfile{" + "cpuCores=" + cpuCores + ", heapMemoryInMB=" + heapMemoryInMB + ", directMemoryInMB=" + directMemoryInMB + - ", nativeMemoryInMB=" + nativeMemoryInMB + resourceStr + + ", nativeMemoryInMB=" + nativeMemoryInMB + resources + '}'; } - public static ResourceProfile fromResourceSpec( - ResourceSpec resourceSpec) { - Map extendResource = new HashMap<>(1); - double gpu = resourceSpec.getGPUResource(); - if (gpu > 0) { - extendResource.put(ResourceSpec.GPU_NAME, new ResourceSpec.GPUResource(gpu)); - } - ResourceProfile resourceProfile = new ResourceProfile( + static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec) { + Map copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources()); + + return new ResourceProfile( resourceSpec.getCpuCores(), resourceSpec.getHeapMemory(), resourceSpec.getDirectMemory(), resourceSpec.getNativeMemory(), - extendResource - ); - return resourceProfile; + copiedExtendedResources); } } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index bfe7567b5ef..fd365681c77 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator import org.apache.flink.api.common.aggregators.Aggregator import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat} -import org.apache.flink.api.common.operators.{ResourceSpec, Keys, Order} +import org.apache.flink.api.common.operators.{Keys, Order, ResourceSpec} import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod -- GitLab