From aeb04aa5462adb8c944e9e64dc22fade6a94b16d Mon Sep 17 00:00:00 2001 From: lipenghui Date: Mon, 22 Jul 2019 22:59:43 +0800 Subject: [PATCH] Add options to rewrite namespace delimiter for pulsar sql. (#4749) ### Motivation Fix #4732 ### Modifications Add options to rewrite the namespace delimiter, disable by default Enable rewrite namespace delimiter can work well with superset: superset ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): (no) - The public API: (no) - The schema: (no) - The default values of configurations: (no) - The wire protocol: (no) - The rest endpoints: (no) - The admin cli options: (no) - Anything that affects deployment: (no) ### Documentation - Does this pull request introduce a new feature? (no) (cherry picked from commit 6ddd51ff45999bd7daa238073e9bcfd87d1df16a) --- conf/presto/catalog/pulsar.properties | 7 +- .../pulsar/common/naming/NamedEntity.java | 2 +- .../sql/presto/PulsarConnectorConfig.java | 33 ++++++++ .../sql/presto/PulsarConnectorFactory.java | 4 +- .../sql/presto/PulsarConnectorUtils.java | 14 ++++ .../pulsar/sql/presto/PulsarMetadata.java | 27 ++++--- .../pulsar/sql/presto/PulsarSplitManager.java | 11 ++- .../sql/presto/TestPulsarConnector.java | 18 +++++ .../sql/presto/TestPulsarConnectorConfig.java | 52 ++++++++++++ .../pulsar/sql/presto/TestPulsarMetadata.java | 81 +++++++++++-------- .../sql/presto/TestPulsarSplitManager.java | 23 +++--- 11 files changed, 210 insertions(+), 62 deletions(-) create mode 100644 pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties index 7f191e5e424..938729698e9 100644 --- a/conf/presto/catalog/pulsar.properties +++ b/conf/presto/catalog/pulsar.properties @@ -30,7 +30,12 @@ pulsar.target-num-splits=2 # max message queue size pulsar.max-split-message-queue-size=10000 # max entry queue size -pulsar.max-split-entry-queue-size = 1000 +pulsar.max-split-entry-queue-size=1000 +# Rewrite namespace delimiter +# Warn: avoid using symbols allowed by Namespace (a-zA-Z_0-9 -=:%) +# to prevent erroneous rewriting +pulsar.namespace-delimiter-rewrite-enable=false +pulsar.rewrite-namespace-delimiter=/ ####### TIERED STORAGE OFFLOADER CONFIGS ####### diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java index e5180d6935a..8234f9b8434 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java @@ -31,7 +31,7 @@ public class NamedEntity { // allowed characters for property, namespace, cluster and topic names are // alphanumeric (a-zA-Z_0-9) and these special chars -=:. // % is allowed as part of valid URL encoding - private static final Pattern NAMED_ENTITY_PATTERN = Pattern.compile("^[-=:.\\w]*$"); + public static final Pattern NAMED_ENTITY_PATTERN = Pattern.compile("^[-=:.\\w]*$"); public static void checkName(String name) throws IllegalArgumentException { Matcher m = NAMED_ENTITY_PATTERN.matcher(name); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java index a6b625fd92e..b10c04036f9 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java @@ -23,12 +23,14 @@ import io.airlift.configuration.Config; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.bookkeeper.stats.NullStatsProvider; +import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.protocol.Commands; import javax.validation.constraints.NotNull; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.regex.Matcher; public class PulsarConnectorConfig implements AutoCloseable { @@ -40,8 +42,12 @@ public class PulsarConnectorConfig implements AutoCloseable { private int maxSplitEntryQueueSize = 1000; private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE; private String statsProvider = NullStatsProvider.class.getName(); + private Map statsProviderConfigs = new HashMap<>(); + private boolean namespaceDelimiterRewriteEnable = false; + private String rewriteNamespaceDelimiter = "/"; + /**** --- Ledger Offloading --- ****/ private String managedLedgerOffloadDriver = null; private int managedLedgerOffloadMaxThreads = 2; @@ -191,6 +197,33 @@ public class PulsarConnectorConfig implements AutoCloseable { return this; } + public String getRewriteNamespaceDelimiter() { + return rewriteNamespaceDelimiter; + } + + @Config("pulsar.rewrite-namespace-delimiter") + public PulsarConnectorConfig setRewriteNamespaceDelimiter(String rewriteNamespaceDelimiter) { + Matcher m = NamedEntity.NAMED_ENTITY_PATTERN.matcher(rewriteNamespaceDelimiter); + if (m.matches()) { + throw new IllegalArgumentException( + "Can't use " + rewriteNamespaceDelimiter + "as delimiter, " + + "because delimiter must contain characters which name of namespace not allowed" + ); + } + this.rewriteNamespaceDelimiter = rewriteNamespaceDelimiter; + return this; + } + + public boolean getNamespaceDelimiterRewriteEnable() { + return namespaceDelimiterRewriteEnable; + } + + @Config("pulsar.namespace-delimiter-rewrite-enable") + public PulsarConnectorConfig setNamespaceDelimiterRewriteEnable(boolean namespaceDelimiterRewriteEnable) { + this.namespaceDelimiterRewriteEnable = namespaceDelimiterRewriteEnable; + return this; + } + @NotNull public PulsarAdmin getPulsarAdmin() throws PulsarClientException { if (this.pulsarAdmin == null) { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java index c119a9ce521..0719e8e1a57 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java @@ -49,7 +49,9 @@ public class PulsarConnectorFactory implements ConnectorFactory { @Override public Connector create(String connectorId, Map config, ConnectorContext context) { requireNonNull(config, "requiredConfig is null"); - log.debug("Creating Pulsar connector with configs: %s", config); + if (log.isDebugEnabled()) { + log.debug("Creating Pulsar connector with configs: %s", config); + } try { // A plugin is not required to use Guice; it is just very convenient Bootstrap app = new Bootstrap( diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java index ee256f63773..b14cb87dc83 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java @@ -83,4 +83,18 @@ public class PulsarConnectorUtils { } return properties; } + + + public static String rewriteNamespaceDelimiterIfNeeded(String namespace, PulsarConnectorConfig config) { + return config.getNamespaceDelimiterRewriteEnable() + ? namespace.replace("/", config.getRewriteNamespaceDelimiter()) + : namespace; + } + + public static String restoreNamespaceDelimiterIfNeeded(String namespace, PulsarConnectorConfig config) { + return config.getNamespaceDelimiterRewriteEnable() + ? namespace.replace(config.getRewriteNamespaceDelimiter(), "/") + : namespace; + } + } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java index 5ed31e50e0e..1ee01775624 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java @@ -72,6 +72,7 @@ import java.util.Optional; import java.util.Set; import java.util.Stack; import java.util.function.Consumer; +import java.util.stream.Collectors; import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; @@ -81,11 +82,14 @@ import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP; import static java.util.Objects.requireNonNull; import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle; import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle; +import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded; +import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded; public class PulsarMetadata implements ConnectorMetadata { private final String connectorId; private final PulsarAdmin pulsarAdmin; + private final PulsarConnectorConfig pulsarConnectorConfig; private static final String INFORMATION_SCHEMA = "information_schema"; @@ -94,6 +98,7 @@ public class PulsarMetadata implements ConnectorMetadata { @Inject public PulsarMetadata(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig) { this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); + this.pulsarConnectorConfig = pulsarConnectorConfig; try { this.pulsarAdmin = pulsarConnectorConfig.getPulsarAdmin(); } catch (PulsarClientException e) { @@ -107,7 +112,8 @@ public class PulsarMetadata implements ConnectorMetadata { try { List tenants = pulsarAdmin.tenants().getTenants(); for (String tenant : tenants) { - prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant)); + prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant).stream().map(namespace -> + rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList())); } } catch (PulsarAdminException e) { throw new RuntimeException("Failed to get schemas from pulsar: " @@ -163,7 +169,7 @@ public class PulsarMetadata implements ConnectorMetadata { } else { List pulsarTopicList = null; try { - pulsarTopicList = this.pulsarAdmin.topics().getList(schemaNameOrNull); + pulsarTopicList = this.pulsarAdmin.topics().getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig)); } catch (PulsarAdminException e) { if (e.getStatusCode() == 404) { log.warn("Schema " + schemaNameOrNull + " does not exsit"); @@ -256,28 +262,29 @@ public class PulsarMetadata implements ConnectorMetadata { if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) { return null; } + String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig); TopicName topicName = TopicName.get( - String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName())); + String.format("%s/%s", namespace, schemaTableName.getTableName())); List topics; try { if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) { - topics = this.pulsarAdmin.topics().getList(schemaTableName.getSchemaName()); + topics = this.pulsarAdmin.topics().getList(namespace); } else { - topics = this.pulsarAdmin.topics().getPartitionedTopicList((schemaTableName.getSchemaName())); + topics = this.pulsarAdmin.topics().getPartitionedTopicList(namespace); } } catch (PulsarAdminException e) { if (e.getStatusCode() == 404) { - throw new PrestoException(NOT_FOUND, "Schema " + schemaTableName.getSchemaName() + " does not exist"); + throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist"); } - throw new RuntimeException("Failed to get topics in schema " + schemaTableName.getSchemaName() + throw new RuntimeException("Failed to get topics in schema " + namespace + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); } if (!topics.contains(topicName.toString())) { log.error("Table %s not found", - String.format("%s/%s", schemaTableName.getSchemaName(), + String.format("%s/%s", namespace, schemaTableName.getTableName())); throw new TableNotFoundException(schemaTableName); } @@ -285,14 +292,14 @@ public class PulsarMetadata implements ConnectorMetadata { SchemaInfo schemaInfo; try { schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo( - String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName())); + String.format("%s/%s", namespace, schemaTableName.getTableName())); } catch (PulsarAdminException e) { if (e.getStatusCode() == 404) { // to indicate that we can't read from topic because there is no schema return null; } throw new RuntimeException("Failed to get schema information for topic " - + String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName()) + + String.format("%s/%s", namespace, schemaTableName.getTableName()) + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); } List handles = getPulsarColumns( diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java index c75d0aaeb39..15192e43d9c 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java @@ -60,6 +60,7 @@ import java.util.List; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries; +import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded; public class PulsarSplitManager implements ConnectorSplitManager { @@ -94,16 +95,18 @@ public class PulsarSplitManager implements ConnectorSplitManager { PulsarTableHandle tableHandle = layoutHandle.getTable(); TupleDomain tupleDomain = layoutHandle.getTupleDomain(); - TopicName topicName = TopicName.get("persistent", NamespaceName.get(tableHandle.getSchemaName()), + String namespace = restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig); + TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), tableHandle.getTableName()); SchemaInfo schemaInfo; + try { schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo( - String.format("%s/%s", tableHandle.getSchemaName(), tableHandle.getTableName())); + String.format("%s/%s", namespace, tableHandle.getTableName())); } catch (PulsarAdminException e) { throw new RuntimeException("Failed to get schema for topic " - + String.format("%s/%s", tableHandle.getSchemaName(), tableHandle.getTableName()) + + String.format("%s/%s", namespace, tableHandle.getTableName()) + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); } @@ -258,7 +261,7 @@ public class PulsarSplitManager implements ConnectorSplitManager { PositionImpl endPosition = (PositionImpl) readOnlyCursor.getReadPosition(); splits.add(new PulsarSplit(i, this.connectorId, - tableHandle.getSchemaName(), + restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig), tableName, entriesForSplit, new String(schemaInfo.getSchema()), diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java index 079cd9fd5c4..77213240da2 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -63,6 +64,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.time.LocalDate; @@ -934,4 +936,20 @@ public abstract class TestPulsarConnector { public void cleanup() { completedBytes = 0L; } + + @DataProvider(name = "rewriteNamespaceDelimiter") + public static Object[][] serviceUrls() { + return new Object[][] { + { "|" }, { null } + }; + } + + protected void updateRewriteNamespaceDelimiterIfNeeded(String delimiter) { + if (StringUtils.isNotBlank(delimiter)) { + pulsarConnectorConfig.setNamespaceDelimiterRewriteEnable(true); + pulsarConnectorConfig.setRewriteNamespaceDelimiter(delimiter); + } else { + pulsarConnectorConfig.setNamespaceDelimiterRewriteEnable(false); + } + } } diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java new file mode 100644 index 00000000000..82b8c977bf5 --- /dev/null +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.sql.presto; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestPulsarConnectorConfig { + + @Test + public void testDefaultNamespaceDelimiterRewrite() { + PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig(); + Assert.assertFalse(connectorConfig.getNamespaceDelimiterRewriteEnable()); + Assert.assertEquals("/", connectorConfig.getRewriteNamespaceDelimiter()); + } + + @Test + public void testNamespaceRewriteDelimiterRestriction() { + PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig(); + try { + connectorConfig.setRewriteNamespaceDelimiter("-=:.Az09_"); + } catch (Exception e) { + Assert.assertTrue(e instanceof IllegalArgumentException); + } + connectorConfig.setRewriteNamespaceDelimiter("|"); + Assert.assertEquals("|", (connectorConfig.getRewriteNamespaceDelimiter())); + connectorConfig.setRewriteNamespaceDelimiter("||"); + Assert.assertEquals("||", (connectorConfig.getRewriteNamespaceDelimiter())); + connectorConfig.setRewriteNamespaceDelimiter("$"); + Assert.assertEquals("$", (connectorConfig.getRewriteNamespaceDelimiter())); + connectorConfig.setRewriteNamespaceDelimiter("&"); + Assert.assertEquals("&", (connectorConfig.getRewriteNamespaceDelimiter())); + connectorConfig.setRewriteNamespaceDelimiter("--&"); + Assert.assertEquals("--&", (connectorConfig.getRewriteNamespaceDelimiter())); + } +} diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java index c879a57dc41..829aac9bca9 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java @@ -29,6 +29,7 @@ import com.facebook.presto.spi.SchemaTablePrefix; import com.facebook.presto.spi.TableNotFoundException; import io.airlift.log.Logger; import org.apache.avro.Schema; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.schema.SchemaInfo; @@ -61,19 +62,29 @@ public class TestPulsarMetadata extends TestPulsarConnector { private static final Logger log = Logger.get(TestPulsarMetadata.class); - @Test - public void testListSchemaNames() { - + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testListSchemaNames(String delimiter) { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); List schemas = this.pulsarMetadata.listSchemaNames(mock(ConnectorSession.class)); - String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(), - NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()}; - assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas))); - } - @Test - public void testGetTableHandle() { + if (StringUtils.isBlank(delimiter)) { + String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(), + NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()}; + assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas))); + } else { + String[] expectedSchemas = { + PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_1.toString(), pulsarConnectorConfig), + PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_2.toString(), pulsarConnectorConfig), + PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_3.toString(), pulsarConnectorConfig), + PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_4.toString(), pulsarConnectorConfig)}; + assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas))); + } + } + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testGetTableHandle(String delimiter) { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); SchemaTableName schemaTableName = new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName()); ConnectorTableHandle connectorTableHandle @@ -89,9 +100,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName()); } - @Test - public void testGetTableMetadata() { - + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testGetTableMetadata(String delimiter) { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); List allTopics = new LinkedList<>(); allTopics.addAll(topicNames); allTopics.addAll(partitionedTopicNames); @@ -133,9 +144,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { } } - @Test - public void testGetTableMetadataWrongSchema() { - + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testGetTableMetadataWrongSchema(String delimiter) { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); PulsarTableHandle pulsarTableHandle = new PulsarTableHandle( pulsarConnectorId.toString(), "wrong-tenant/wrong-ns", @@ -153,9 +164,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { } } - @Test - public void testGetTableMetadataWrongTable() { - + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testGetTableMetadataWrongTable(String delimiter) { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); PulsarTableHandle pulsarTableHandle = new PulsarTableHandle( pulsarConnectorId.toString(), TOPIC_1.getNamespace(), @@ -173,9 +184,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { } } - @Test - public void testGetTableMetadataTableNoSchema() throws PulsarAdminException { - + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testGetTableMetadataTableNoSchema(String delimiter) throws PulsarAdminException { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenThrow( new PulsarAdminException(new ClientErrorException(Response.Status.NOT_FOUND))); @@ -192,9 +203,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { assertEquals(tableMetadata.getColumns().size(), 0); } - @Test - public void testGetTableMetadataTableBlankSchema() throws PulsarAdminException { - + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); SchemaInfo badSchemaInfo = new SchemaInfo(); badSchemaInfo.setSchema(new byte[0]); badSchemaInfo.setType(SchemaType.AVRO); @@ -218,9 +229,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { } } - @Test - public void testGetTableMetadataTableInvalidSchema() throws PulsarAdminException { - + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); SchemaInfo badSchemaInfo = new SchemaInfo(); badSchemaInfo.setSchema("foo".getBytes()); badSchemaInfo.setType(SchemaType.AVRO); @@ -244,8 +255,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { } } - @Test - public void testListTable() { + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testListTable(String delimiter) { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty()); assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns") .isEmpty()); @@ -260,9 +272,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2))); } - @Test - public void testGetColumnHandles() { - + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testGetColumnHandles(String delimiter) { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(), TOPIC_1.getNamespace(), TOPIC_1.getLocalName(), TOPIC_1.getLocalName()); Map columnHandleMap @@ -296,8 +308,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { assertTrue(columnHandleMap.isEmpty()); } - @Test - public void testListTableColumns() { + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testListTableColumns(String delimiter) { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); Map> tableColumnsMap = this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class), new SchemaTablePrefix(TOPIC_1.getNamespace())); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java index 253803dd23a..adeaf909d20 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java @@ -66,9 +66,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector { } } - @Test - public void testTopic() throws Exception { - + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testTopic(String delimiter) throws Exception { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); for (TopicName topicName : topicNames) { setup(); log.info("!----- topic: %s -----!", topicName); @@ -113,8 +113,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector { } - @Test - public void testPartitionedTopic() throws Exception { + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testPartitionedTopic(String delimiter) throws Exception { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); for (TopicName topicName : partitionedTopicNames) { setup(); log.info("!----- topic: %s -----!", topicName); @@ -169,9 +170,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector { }).collect(Collectors.toList()); } - @Test - public void testPublishTimePredicatePushdown() throws Exception { - + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testPublishTimePredicatePushdown(String delimiter) throws Exception { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); TopicName topicName = TOPIC_1; setup(); @@ -226,9 +227,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector { } - @Test - public void testPublishTimePredicatePushdownPartitionedTopic() throws Exception { - + @Test(dataProvider = "rewriteNamespaceDelimiter") + public void testPublishTimePredicatePushdownPartitionedTopic(String delimiter) throws Exception { + updateRewriteNamespaceDelimiterIfNeeded(delimiter); TopicName topicName = PARTITIONED_TOPIC_1; setup(); -- GitLab