diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties index 938729698e973d4856e26359f61e39ff5320d47b..c104b1e0eb1813b3b56e348c8c64758c516685a5 100644 --- a/conf/presto/catalog/pulsar.properties +++ b/conf/presto/catalog/pulsar.properties @@ -53,4 +53,22 @@ pulsar.rewrite-namespace-delimiter=/ #pulsar.offloader-properties = \ # {"s3ManagedLedgerOffloadBucket": "offload-bucket", \ # "s3ManagedLedgerOffloadRegion": "us-west-2", \ -# "s3ManagedLedgerOffloadServiceEndpoint": "http://s3.amazonaws.com"} \ No newline at end of file +# "s3ManagedLedgerOffloadServiceEndpoint": "http://s3.amazonaws.com"} + + +####### AUTHENTICATION CONFIGS ####### + +## the authentication plugin to be used to authenticate to Pulsar cluster +#pulsar.auth-plugin = + +## the authentication parameter to be used to authenticate to Pulsar cluster +#pulsar.auth-params = + +## Accept untrusted TLS certificate +#pulsar.tls-allow-insecure-connection = + +## Whether to enable hostname verification on TLS connections +#pulsar.tls-hostname-verification-enable = + +## Path for the trusted TLS certificate file +#pulsar.tls-trust-cert-file-path = diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java index b10c04036f9d6597d16a69939e1a7b8d8fbe9e46..edfa97d78276eb27f442000a42a93510265b5e18 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java @@ -21,6 +21,8 @@ package org.apache.pulsar.sql.presto; import com.fasterxml.jackson.databind.ObjectMapper; import io.airlift.configuration.Config; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.pulsar.common.naming.NamedEntity; @@ -44,6 +46,11 @@ public class PulsarConnectorConfig implements AutoCloseable { private String statsProvider = NullStatsProvider.class.getName(); private Map statsProviderConfigs = new HashMap<>(); + private String authPluginClassName; + private String authParams; + private String tlsTrustCertsFilePath; + private Boolean tlsAllowInsecureConnection; + private Boolean tlsHostnameVerificationEnable; private boolean namespaceDelimiterRewriteEnable = false; private String rewriteNamespaceDelimiter = "/"; @@ -154,6 +161,33 @@ public class PulsarConnectorConfig implements AutoCloseable { return this; } + public String getRewriteNamespaceDelimiter() { + return rewriteNamespaceDelimiter; + } + + @Config("pulsar.rewrite-namespace-delimiter") + public PulsarConnectorConfig setRewriteNamespaceDelimiter(String rewriteNamespaceDelimiter) { + Matcher m = NamedEntity.NAMED_ENTITY_PATTERN.matcher(rewriteNamespaceDelimiter); + if (m.matches()) { + throw new IllegalArgumentException( + "Can't use " + rewriteNamespaceDelimiter + "as delimiter, " + + "because delimiter must contain characters which name of namespace not allowed" + ); + } + this.rewriteNamespaceDelimiter = rewriteNamespaceDelimiter; + return this; + } + + public boolean getNamespaceDelimiterRewriteEnable() { + return namespaceDelimiterRewriteEnable; + } + + @Config("pulsar.namespace-delimiter-rewrite-enable") + public PulsarConnectorConfig setNamespaceDelimiterRewriteEnable(boolean namespaceDelimiterRewriteEnable) { + this.namespaceDelimiterRewriteEnable = namespaceDelimiterRewriteEnable; + return this; + } + /**** --- Ledger Offloading --- ****/ public int getManagedLedgerOffloadMaxThreads() { @@ -197,37 +231,80 @@ public class PulsarConnectorConfig implements AutoCloseable { return this; } - public String getRewriteNamespaceDelimiter() { - return rewriteNamespaceDelimiter; + /**** --- Authentication --- ****/ + + public String getAuthPlugin() { + return this.authPluginClassName; } - @Config("pulsar.rewrite-namespace-delimiter") - public PulsarConnectorConfig setRewriteNamespaceDelimiter(String rewriteNamespaceDelimiter) { - Matcher m = NamedEntity.NAMED_ENTITY_PATTERN.matcher(rewriteNamespaceDelimiter); - if (m.matches()) { - throw new IllegalArgumentException( - "Can't use " + rewriteNamespaceDelimiter + "as delimiter, " - + "because delimiter must contain characters which name of namespace not allowed" - ); - } - this.rewriteNamespaceDelimiter = rewriteNamespaceDelimiter; + @Config("pulsar.auth-plugin") + public PulsarConnectorConfig setAuthPlugin(String authPluginClassName) throws IOException { + this.authPluginClassName = authPluginClassName; return this; } - public boolean getNamespaceDelimiterRewriteEnable() { - return namespaceDelimiterRewriteEnable; + public String getAuthParams() { + return this.authParams; } - @Config("pulsar.namespace-delimiter-rewrite-enable") - public PulsarConnectorConfig setNamespaceDelimiterRewriteEnable(boolean namespaceDelimiterRewriteEnable) { - this.namespaceDelimiterRewriteEnable = namespaceDelimiterRewriteEnable; + @Config("pulsar.auth-params") + public PulsarConnectorConfig setAuthParams(String authParams) throws IOException { + this.authParams = authParams; + return this; + } + + public Boolean isTlsAllowInsecureConnection() { + return tlsAllowInsecureConnection; + } + + @Config("pulsar.tls-allow-insecure-connection") + public PulsarConnectorConfig setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) { + this.tlsAllowInsecureConnection = tlsAllowInsecureConnection; + return this; + } + + public Boolean isTlsHostnameVerificationEnable() { + return tlsHostnameVerificationEnable; + } + + @Config("pulsar.tls-hostname-verification-enable") + public PulsarConnectorConfig setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) { + this.tlsHostnameVerificationEnable = tlsHostnameVerificationEnable; + return this; + } + + public String getTlsTrustCertsFilePath() { + return tlsTrustCertsFilePath; + } + + @Config("pulsar.tls-trust-cert-file-path") + public PulsarConnectorConfig setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) { + this.tlsTrustCertsFilePath = tlsTrustCertsFilePath; return this; } @NotNull public PulsarAdmin getPulsarAdmin() throws PulsarClientException { if (this.pulsarAdmin == null) { - this.pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getBrokerServiceUrl()).build(); + PulsarAdminBuilder builder = PulsarAdmin.builder(); + + if (getAuthPlugin() != null) { + builder.authentication(getAuthPlugin(), getAuthParams()); + } + + if (isTlsAllowInsecureConnection() != null) { + builder.allowTlsInsecureConnection(isTlsAllowInsecureConnection()); + } + + if (isTlsHostnameVerificationEnable() != null) { + builder.enableTlsHostnameVerification(isTlsHostnameVerificationEnable()); + } + + if (getTlsTrustCertsFilePath() != null) { + builder.tlsTrustCertsFilePath(getTlsTrustCertsFilePath()); + } + + this.pulsarAdmin = builder.serviceHttpUrl(getBrokerServiceUrl()).build(); } return this.pulsarAdmin; } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java index 1ee01775624b9190678e399fa7c907838ec2f6d7..a43198271ebfe030963199e1b75b8e46908fc4c2 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java @@ -76,6 +76,7 @@ import java.util.stream.Collectors; import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; +import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED; import static com.facebook.presto.spi.type.DateType.DATE; import static com.facebook.presto.spi.type.TimeType.TIME; import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP; @@ -116,6 +117,9 @@ public class PulsarMetadata implements ConnectorMetadata { rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList())); } } catch (PulsarAdminException e) { + if (e.getStatusCode() == 401) { + throw new PrestoException(QUERY_REJECTED, "Failed to get schemas from pulsar: Unauthorized"); + } throw new RuntimeException("Failed to get schemas from pulsar: " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); } @@ -174,6 +178,9 @@ public class PulsarMetadata implements ConnectorMetadata { if (e.getStatusCode() == 404) { log.warn("Schema " + schemaNameOrNull + " does not exsit"); return builder.build(); + } else if (e.getStatusCode() == 401) { + throw new PrestoException(QUERY_REJECTED, + String.format("Failed to get tables/topics in %s: Unauthorized", schemaNameOrNull)); } throw new RuntimeException("Failed to get tables/topics in " + schemaNameOrNull + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); @@ -277,6 +284,9 @@ public class PulsarMetadata implements ConnectorMetadata { } catch (PulsarAdminException e) { if (e.getStatusCode() == 404) { throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist"); + } else if (e.getStatusCode() == 401) { + throw new PrestoException(QUERY_REJECTED, + String.format("Failed to get topics in schema %s: Unauthorized", namespace)); } throw new RuntimeException("Failed to get topics in schema " + namespace + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); @@ -297,8 +307,13 @@ public class PulsarMetadata implements ConnectorMetadata { if (e.getStatusCode() == 404) { // to indicate that we can't read from topic because there is no schema return null; + } else if (e.getStatusCode() == 401) { + throw new PrestoException(QUERY_REJECTED, + String.format("Failed to get pulsar topic schema information for topic %s/%s: Unauthorized", + namespace, schemaTableName.getTableName())); } - throw new RuntimeException("Failed to get schema information for topic " + + throw new RuntimeException("Failed to get pulsar topic schema information for topic " + String.format("%s/%s", namespace, schemaTableName.getTableName()) + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java index 15192e43d9c1d1f6df38b740cf8248cb2150a173..bb1dea22377888668029e38b29597acd97ffd354 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java @@ -23,12 +23,12 @@ import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.facebook.presto.spi.predicate.Domain; import com.facebook.presto.spi.predicate.Range; import com.facebook.presto.spi.predicate.TupleDomain; -import com.facebook.presto.spi.type.SqlTimestampWithTimeZone; import com.google.common.annotations.VisibleForTesting; import io.airlift.log.Logger; import lombok.Data; @@ -57,6 +57,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries; @@ -105,7 +106,13 @@ public class PulsarSplitManager implements ConnectorSplitManager { schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo( String.format("%s/%s", namespace, tableHandle.getTableName())); } catch (PulsarAdminException e) { - throw new RuntimeException("Failed to get schema for topic " + if (e.getStatusCode() == 401) { + throw new PrestoException(QUERY_REJECTED, + String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized", + namespace, tableHandle.getTableName())); + } + + throw new RuntimeException("Failed to get pulsar topic schema for topic " + String.format("%s/%s", namespace, tableHandle.getTableName()) + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); } @@ -143,6 +150,11 @@ public class PulsarSplitManager implements ConnectorSplitManager { try { numPartitions = (this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions; } catch (PulsarAdminException e) { + if (e.getStatusCode() == 401) { + throw new PrestoException(QUERY_REJECTED, + String.format("Failed to get metadata for partitioned topic %s: Unauthorized", topicName)); + } + throw new RuntimeException("Failed to get metadata for partitioned topic " + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(),e); }