diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java index c47698147284e71f4e3643ca7d92d3c35b4ad55f..76289128b5bca55a3d427a37d983413d5a9aff5b 100644 --- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java +++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java @@ -938,7 +938,7 @@ public class DescriptorProperties { } // validate - for (int i = 0; i < maxIndex; i++) { + for (int i = 0; i <= maxIndex; i++) { for (Map.Entry> subKey : subKeyValidation.entrySet()) { final String fullKey = key + '.' + i + '.' + subKey.getKey(); if (properties.containsKey(fullKey)) { @@ -1134,7 +1134,7 @@ public class DescriptorProperties { } // validate array elements - for (int i = 0; i < maxIndex; i++) { + for (int i = 0; i <= maxIndex; i++) { final String fullKey = key + '.' + i; if (properties.containsKey(fullKey)) { // run validation logic diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala index b2a8ec9f567d8d07fb739e21bb1adb4a55be63e6..fe7c75df6cf789a2a7156b91635e7a246037f126 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.descriptors import java.util import java.util.Collections +import java.util.function.Consumer import org.apache.flink.table.api.ValidationException import org.apache.flink.table.util.JavaScalaConversionUtil.toJava @@ -32,6 +33,9 @@ import org.junit.Test class DescriptorPropertiesTest { private val ARRAY_KEY = "my-array" + private val FIXED_INDEXED_PROPERTY_KEY = "my-fixed-indexed-property" + private val PROPERTY_1_KEY = "property-1" + private val PROPERTY_2_KEY = "property-2" @Test def testEquals(): Unit = { @@ -97,8 +101,8 @@ class DescriptorPropertiesTest { def testArrayInvalidValues(): Unit = { val properties = new DescriptorProperties() properties.putString(s"$ARRAY_KEY.0", "12") - properties.putString(s"$ARRAY_KEY.1", "INVALID") - properties.putString(s"$ARRAY_KEY.2", "66") + properties.putString(s"$ARRAY_KEY.1", "66") + properties.putString(s"$ARRAY_KEY.2", "INVALID") testArrayValidation(properties, 1, Integer.MAX_VALUE) } @@ -118,6 +122,19 @@ class DescriptorPropertiesTest { testArrayValidation(properties, 1, Integer.MAX_VALUE) } + @Test(expected = classOf[ValidationException]) + def testInvalidFixedIndexedProperties(): Unit = { + val property = new DescriptorProperties() + val list = new util.ArrayList[util.List[String]]() + list.add(util.Arrays.asList("1", "string")) + list.add(util.Arrays.asList("INVALID", "string")) + property.putIndexedFixedProperties( + FIXED_INDEXED_PROPERTY_KEY, + util.Arrays.asList(PROPERTY_1_KEY, PROPERTY_2_KEY), + list) + testFixedIndexedPropertiesValidation(property) + } + @Test def testRemoveKeys(): Unit = { val properties = new DescriptorProperties() @@ -155,7 +172,7 @@ class DescriptorPropertiesTest { minLength: Int, maxLength: Int) : Unit = { - val validator: (String) => Unit = (key: String) => { + val validator: String => Unit = (key: String) => { properties.validateInt(key, false) } @@ -165,4 +182,26 @@ class DescriptorPropertiesTest { minLength, maxLength) } + + private def testFixedIndexedPropertiesValidation(properties: DescriptorProperties): Unit = { + + val validatorMap = new util.HashMap[String, Consumer[String]]() + + // PROPERTY_1 should be Int + val validator1: String => Unit = (key: String) => { + properties.validateInt(key, false) + } + validatorMap.put(PROPERTY_1_KEY, toJava(validator1)) + // PROPERTY_2 should be String + val validator2: String => Unit = (key: String) => { + properties.validateString(key, false) + } + validatorMap.put(PROPERTY_2_KEY, toJava(validator2)) + + properties.validateFixedIndexedProperties( + FIXED_INDEXED_PROPERTY_KEY, + false, + validatorMap + ) + } }