提交 8789f766 编写于 作者: L lipenghui 提交者: Jia Zhai

[pulsar-sql] Make partition as internal column (#4888)

Fixes #4785

### Motivation

1. Stop returning individual partition names in the table list; return only the partitioned topic name. This avoids a huge table list when users create topics with a large number of partitions.
2. Make the partition an internal column, so that users can see which partition each message belongs to and filter on the partition. For example:
```
SELECT * FROM "my-table" WHERE "__partition__" = 0;
SELECT * FROM "my-table" WHERE "__partition__" in (2,3);
SELECT * FROM "my-table" WHERE "__partition__" < 1;
```
### Modifications

1. Add "__partition__" internal column.
2. Add domain handle for "__partition__".

### Verifying this change

Added new unit test to verify this change

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API: (yes)
  - The schema: (no)
  - The default values of configurations: (no)
  - The wire protocol: ( no)
  - The rest endpoints: (no)
  - The admin cli options: (no)
  - Anything that affects deployment: (no)

### Documentation

  - Does this pull request introduce a new feature? (yes)

(cherry picked from commit 5adc522e)
上级 b3397fb7
......@@ -23,6 +23,7 @@ import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;
......@@ -40,6 +41,21 @@ import org.apache.pulsar.common.api.raw.RawMessage;
*/
public abstract class PulsarInternalColumn {
/**
* Internal column representing the partition.
*
* <p>Unlike the other internal columns, the value is not read from the message:
* {@link #getData(RawMessage)} always returns {@code null}, and {@code PulsarRecordCursor}
* special-cases this column type and substitutes the partition index it is reading from.
*/
public static class PartitionColumn extends PulsarInternalColumn {
PartitionColumn(String name, Type type, String comment) {
super(name, type, comment);
}
/**
* Always returns {@code null}; the partition index is not taken from the message.
*
* @param message the raw message (ignored)
* @return {@code null}
*/
@Override
public Object getData(RawMessage message) {
return null;
}
}
/**
* Internal column representing the event time.
*/
......@@ -151,6 +167,9 @@ public abstract class PulsarInternalColumn {
}
}
/**
* The {@code __partition__} internal column, typed as a Presto INTEGER.
* Declared with the concrete {@code PartitionColumn} type rather than {@code PulsarInternalColumn}.
*/
public static final PartitionColumn PARTITION = new PartitionColumn("__partition__", IntegerType.INTEGER,
"The partition number which the message belongs to");
public static final PulsarInternalColumn EVENT_TIME = new EventTimeColumn("__event_time__", TimestampType
.TIMESTAMP, "Application defined timestamp in milliseconds of when the event occurred");
......@@ -207,7 +226,8 @@ public abstract class PulsarInternalColumn {
}
/**
* Returns the set of all internal columns.
*
* <p>NOTE(review): this span is rendered from a diff without +/- markers. The first
* {@code return} appears to be the pre-change statement and the second, which adds
* {@code PARTITION} in front of the existing columns, its replacement.
*
* @return an immutable set of the internal column definitions
*/
public static Set<PulsarInternalColumn> getInternalFields() {
return ImmutableSet.of(EVENT_TIME, PUBLISH_TIME, MESSAGE_ID, SEQUENCE_ID, PRODUCER_NAME, KEY, PROPERTIES);
return ImmutableSet.of(PARTITION, EVENT_TIME, PUBLISH_TIME, MESSAGE_ID, SEQUENCE_ID, PRODUCER_NAME, KEY,
PROPERTIES);
}
public static Map<String, PulsarInternalColumn> getInternalFieldsMap() {
......
......@@ -189,8 +189,11 @@ public class PulsarMetadata implements ConnectorMetadata {
+ ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
if (pulsarTopicList != null) {
pulsarTopicList.forEach(topic -> builder.add(
new SchemaTableName(schemaNameOrNull, TopicName.get(topic).getLocalName())));
pulsarTopicList.stream()
.map(topic -> TopicName.get(topic).getPartitionedTopicName())
.distinct()
.forEach(topic -> builder.add(new SchemaTableName(schemaNameOrNull,
TopicName.get(topic).getLocalName())));
}
}
}
......
......@@ -58,6 +58,7 @@ import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.sql.presto.PulsarInternalColumn.PartitionColumn;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
......@@ -92,6 +93,7 @@ public class PulsarRecordCursor implements RecordCursor {
// are empty or not
private final long splitSize;
private long entriesProcessed = 0;
private int partition = -1;
private static final Logger log = Logger.get(PulsarRecordCursor.class);
......@@ -128,6 +130,7 @@ public class PulsarRecordCursor implements RecordCursor {
PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
this.columnHandles = columnHandles;
this.pulsarSplit = pulsarSplit;
this.partition = TopicName.getPartitionIndex(pulsarSplit.getTableName());
this.pulsarConnectorConfig = pulsarConnectorConfig;
this.maxBatchSize = pulsarConnectorConfig.getMaxEntryReadBatchSize();
this.messageQueue = new SpscArrayQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
......@@ -427,7 +430,11 @@ public class PulsarRecordCursor implements RecordCursor {
if (pulsarColumnHandle.isInternal()) {
String fieldName = this.columnHandles.get(fieldIndex).getName();
PulsarInternalColumn pulsarInternalColumn = this.internalColumnMap.get(fieldName);
data = pulsarInternalColumn.getData(this.currentMessage);
if (pulsarInternalColumn instanceof PartitionColumn) {
data = this.partition;
} else {
data = pulsarInternalColumn.getData(this.currentMessage);
}
} else {
data = this.schemaHandler.extractField(fieldIndex, this.currentRecord);
}
......
......@@ -39,6 +39,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import io.airlift.log.Logger;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
......@@ -136,45 +137,84 @@ public class PulsarSplitManager implements ConnectorSplitManager {
/**
* Builds the splits for a partitioned topic by distributing the requested number of splits over
* the partitions selected by {@code getPredicatedPartitions} and delegating each partition to
* {@code getSplitsForTopic}.
*
* <p>NOTE(review): this span is rendered from a diff without +/- markers, so the pre-change
* statements (based on {@code numPartitions}) appear next to their replacements (based on
* {@code predicatedPartitions}).
*
* <p>NOTE(review): {@code predicatedPartitions.size()} is used as a divisor below. If the
* partition filter selects nothing, the list is empty and the division throws
* {@code ArithmeticException} - confirm an empty selection cannot reach this method, or guard it.
*
* @param numSplits   requested number of splits; at least one split per selected partition is used
* @param topicName   the partitioned topic
* @param tableHandle table handle forwarded to {@code getSplitsForTopic}
* @param schemaInfo  schema forwarded to {@code getSplitsForTopic}
* @param tupleDomain constraint used to select partitions and forwarded to {@code getSplitsForTopic}
* @return the splits of all selected partitions
* @throws Exception declared by the signature; presumably propagated from {@code getSplitsForTopic} - confirm
*/
@VisibleForTesting
Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle
tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain) throws Exception {
int numPartitions;
try {
numPartitions = (this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get metadata for partitioned topic %s: Unauthorized", topicName));
}
throw new RuntimeException("Failed to get metadata for partitioned topic "
+ topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
// Partition indexes that survive the "__partition__" filter of the constraint.
List<Integer> predicatedPartitions = getPredicatedPartitions(topicName, tupleDomain);
if (log.isDebugEnabled()) {
log.debug("Partition filter result %s", predicatedPartitions);
}
// Never use fewer splits than there are selected partitions.
int actualNumSplits = Math.max(numPartitions, numSplits);
int actualNumSplits = Math.max(predicatedPartitions.size(), numSplits);
int splitsPerPartition = actualNumSplits / numPartitions;
int splitsPerPartition = actualNumSplits / predicatedPartitions.size();
int splitRemainder = actualNumSplits % numPartitions;
int splitRemainder = actualNumSplits % predicatedPartitions.size();
ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig)
.getManagedLedgerFactory();
List<PulsarSplit> splits = new LinkedList<>();
for (int i = 0; i < numPartitions; i++) {
for (int i = 0; i < predicatedPartitions.size(); i++) {
// The first splitRemainder partitions each take one extra split so the total adds up.
int splitsForThisPartition = (splitRemainder > i) ? splitsPerPartition + 1 : splitsPerPartition;
splits.addAll(
getSplitsForTopic(
topicName.getPartition(i).getPersistenceNamingEncoding(),
managedLedgerFactory,
splitsForThisPartition,
tableHandle,
schemaInfo,
topicName.getPartition(i).getLocalName(),
tupleDomain));
getSplitsForTopic(
topicName.getPartition(predicatedPartitions.get(i)).getPersistenceNamingEncoding(),
managedLedgerFactory,
splitsForThisPartition,
tableHandle,
schemaInfo,
topicName.getPartition(predicatedPartitions.get(i)).getLocalName(),
tupleDomain));
}
return splits;
}
/**
 * Resolves which partitions of {@code topicName} have to be read for the given constraint.
 *
 * <p>If the constraint carries no domain for the {@code __partition__} internal column, every
 * partition index in {@code [0, numPartitions)} is returned. Otherwise each range of that domain is
 * expanded into the partition indexes it covers:
 * <ul>
 *   <li>an unbounded lower end starts at partition 0, an unbounded upper end stops at the last
 *       existing partition ({@code numPartitions - 1});</li>
 *   <li>exclusive bounds ({@code >} / {@code <}) exclude the bound value itself;</li>
 *   <li>negative lower bounds are raised to 0, since no partition has a negative index.</li>
 * </ul>
 *
 * <p>NOTE(review): an explicit upper bound is intentionally NOT clamped to the partition count,
 * because {@code TestPulsarSplitManager#testPartitionFilter} requests partitions beyond the
 * partition count of some topics and expects splits for them. Clamping it (and adjusting that test)
 * would be the safer long-term behaviour.
 *
 * @param topicName   the partitioned topic whose metadata is looked up
 * @param tupleDomain the constraint that may restrict {@code __partition__}
 * @return the partition indexes to read; empty if the domain selects no partition
 * @throws PrestoException  with {@code QUERY_REJECTED} if the metadata lookup is unauthorized (401)
 * @throws RuntimeException if the metadata lookup fails for any other reason
 */
private List<Integer> getPredicatedPartitions(TopicName topicName, TupleDomain<ColumnHandle> tupleDomain) {
    int numPartitions;
    try {
        numPartitions = (this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
    } catch (PulsarAdminException e) {
        if (e.getStatusCode() == 401) {
            throw new PrestoException(QUERY_REJECTED,
                    String.format("Failed to get metadata for partitioned topic %s: Unauthorized", topicName));
        }
        throw new RuntimeException("Failed to get metadata for partitioned topic "
                + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
    }

    // Domain of the "__partition__" column, or null when the constraint does not mention it.
    final Domain domain = tupleDomain.getDomains().isPresent()
            ? tupleDomain.getDomains().get().get(
                    PulsarInternalColumn.PARTITION.getColumnHandle(connectorId, false))
            : null;

    List<Integer> predicatePartitions = new ArrayList<>();
    if (domain == null) {
        // No partition predicate: read every partition.
        for (int i = 0; i < numPartitions; i++) {
            predicatePartitions.add(i);
        }
        return predicatePartitions;
    }

    domain.getValues().getValuesProcessor().consume(
            ranges -> ranges.getOrderedRanges().forEach(range -> {
                // long arithmetic so that adjusting an exclusive bound can never overflow
                long low = 0;
                long high = numPartitions - 1L;
                if (!range.getLow().isLowerUnbounded() && range.getLow().getValueBlock().isPresent()) {
                    low = range.getLow().getValueBlock().get().getInt(0, 0);
                    // Bound.ABOVE on the low marker means "strictly greater than the value"
                    if (range.getLow().getBound() == com.facebook.presto.spi.predicate.Marker.Bound.ABOVE) {
                        low++;
                    }
                }
                if (!range.getHigh().isUpperUnbounded() && range.getHigh().getValueBlock().isPresent()) {
                    high = range.getHigh().getValueBlock().get().getInt(0, 0);
                    // Bound.BELOW on the high marker means "strictly less than the value"
                    if (range.getHigh().getBound() == com.facebook.presto.spi.predicate.Marker.Bound.BELOW) {
                        high--;
                    }
                }
                for (long i = Math.max(low, 0L); i <= high; i++) {
                    predicatePartitions.add((int) i);
                }
            }),
            discreteValues -> {},
            allOrNone -> {});
    return predicatePartitions;
}
@VisibleForTesting
Collection<PulsarSplit> getSplitsNonPartitionedTopic(int numSplits, TopicName topicName,
PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain)
......
......@@ -71,6 +71,7 @@ import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
......@@ -130,6 +131,7 @@ public abstract class TestPulsarConnector {
protected static final TopicName TOPIC_5 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-1");
protected static final TopicName TOPIC_6 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-2");
protected static final TopicName PARTITIONED_TOPIC_1 = TopicName.get("persistent", NAMESPACE_NAME_1,
"partitioned-topic-1");
protected static final TopicName PARTITIONED_TOPIC_2 = TopicName.get("persistent", NAMESPACE_NAME_1,
......@@ -216,6 +218,7 @@ public abstract class TestPulsarConnector {
partitionedTopicNames.add(PARTITIONED_TOPIC_5);
partitionedTopicNames.add(PARTITIONED_TOPIC_6);
partitionedTopicsToPartitions = new HashMap<>();
partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_1.toString(), 2);
partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_2.toString(), 3);
......@@ -270,6 +273,7 @@ public abstract class TestPulsarConnector {
topicsToNumEntries.put(TOPIC_4.getSchemaName(), 12345L);
topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
......@@ -649,9 +653,15 @@ public abstract class TestPulsarConnector {
}
/**
* Returns the string names of all test topics in namespace {@code ns}: every entry of
* {@code topicNames} in that namespace, followed by one {@code <topic>-partition-<i>} name for each
* partition of every entry of {@code partitionedTopicNames} in that namespace (partition counts
* are taken from {@code partitionedTopicsToPartitions}).
*
* <p>NOTE(review): this span is rendered from a diff without +/- markers; the bare
* {@code return topicNames.stream()} and the first {@code .collect(...)} line appear to be the
* pre-change statements, shown next to their replacements.
*
* @param ns namespace to filter on
* @return a mutable list of topic names
*/
protected static List<String> getTopics(String ns) {
return topicNames.stream()
List<String> topics = new ArrayList<>(topicNames.stream()
.filter(topicName -> topicName.getNamespace().equals(ns))
.map(TopicName::toString).collect(Collectors.toList());
.map(TopicName::toString).collect(Collectors.toList()));
// Add the individual partitions of each partitioned topic in this namespace.
partitionedTopicNames.stream().filter(topicName -> topicName.getNamespace().equals(ns)).forEach(topicName -> {
for (Integer i = 0; i < partitionedTopicsToPartitions.get(topicName.toString()); i++) {
topics.add(TopicName.get(topicName + "-partition-" + i).toString());
}
});
return topics;
}
protected static List<String> getPartitionedTopics(String ns) {
......
......@@ -262,14 +262,20 @@ public class TestPulsarMetadata extends TestPulsarConnector {
assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
.isEmpty());
SchemaTableName[] expectedTopics1 = {new SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName())};
SchemaTableName[] expectedTopics1 = {new SchemaTableName(
TOPIC_4.getNamespace(), TOPIC_4.getLocalName()),
new SchemaTableName(PARTITIONED_TOPIC_4.getNamespace(), PARTITIONED_TOPIC_4.getLocalName())
};
assertEquals(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
NAMESPACE_NAME_3.toString()), Arrays.asList(expectedTopics1));
SchemaTableName[] expectedTopics2 = {new SchemaTableName(TOPIC_5.getNamespace(), TOPIC_5.getLocalName()),
new SchemaTableName(TOPIC_6.getNamespace(), TOPIC_6.getLocalName())};
new SchemaTableName(TOPIC_6.getNamespace(), TOPIC_6.getLocalName()),
new SchemaTableName(PARTITIONED_TOPIC_5.getNamespace(), PARTITIONED_TOPIC_5.getLocalName()),
new SchemaTableName(PARTITIONED_TOPIC_6.getNamespace(), PARTITIONED_TOPIC_6.getLocalName()),
};
assertEquals(new HashSet<>(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2)));
NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2)));
}
@Test(dataProvider = "rewriteNamespaceDelimiter")
......@@ -315,7 +321,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
= this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
new SchemaTablePrefix(TOPIC_1.getNamespace()));
assertEquals(tableColumnsMap.size(), 2);
assertEquals(tableColumnsMap.size(), 4);
List<ColumnMetadata> columnMetadataList
= tableColumnsMap.get(new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName()));
assertNotNull(columnMetadataList);
......
......@@ -31,6 +31,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
......@@ -40,13 +41,16 @@ import java.util.Map;
import java.util.stream.Collectors;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@Test(singleThreaded = true)
public class TestPulsarSplitManager extends TestPulsarConnector {
......@@ -288,5 +292,91 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
}
}
/**
 * Verifies that a constraint on the {@code __partition__} internal column restricts the generated
 * splits to the requested partitions, for single and multiple ranges with equal and unequal bounds.
 */
@Test(dataProvider = "rewriteNamespaceDelimiter")
public void testPartitionFilter(String delimiter) throws Exception {
    updateRewriteNamespaceDelimiterIfNeeded(delimiter);
    for (TopicName topicName : partitionedTopicNames) {
        setup();
        log.info("!----- topic: %s -----!", topicName);

        PulsarTableHandle pulsarTableHandle = mock(PulsarTableHandle.class);
        when(pulsarTableHandle.getConnectorId()).thenReturn(pulsarConnectorId.toString());
        when(pulsarTableHandle.getSchemaName()).thenReturn(topicName.getNamespace());
        when(pulsarTableHandle.getTopicName()).thenReturn(topicName.getLocalName());
        when(pulsarTableHandle.getTableName()).thenReturn(topicName.getLocalName());

        // single range with equal low and high of "__partition__"
        assertPartitionFilterSplits(topicName, pulsarTableHandle, 2, 2, new int[] {0},
                Range.range(INTEGER, 0L, true, 0L, true));

        // multiple ranges with equal low and high of "__partition__"
        assertPartitionFilterSplits(topicName, pulsarTableHandle, 1, 2, new int[] {0, 3},
                Range.range(INTEGER, 0L, true, 0L, true),
                Range.range(INTEGER, 3L, true, 3L, true));

        // single range with unequal low and high of "__partition__"
        assertPartitionFilterSplits(topicName, pulsarTableHandle, 2, 3, new int[] {0, 1, 2},
                Range.range(INTEGER, 0L, true, 2L, true));

        // multiple ranges with unequal low and high of "__partition__"
        assertPartitionFilterSplits(topicName, pulsarTableHandle, 2, 4, new int[] {0, 1, 3, 4},
                Range.range(INTEGER, 0L, true, 1L, true),
                Range.range(INTEGER, 3L, true, 4L, true));
    }
}

/**
 * Requests splits for {@code topicName} under a {@code __partition__} domain made of the given
 * ranges and checks the outcome.
 *
 * @param topicName         the partitioned topic under test
 * @param tableHandle       mocked table handle for the topic
 * @param numSplits         number of splits requested from the split manager
 * @param expectedSplits    expected split count, only asserted for topics with more than one entry
 * @param allowedPartitions partition indexes any returned split may belong to
 * @param first             first range of the partition domain
 * @param rest              further ranges of the partition domain
 */
private void assertPartitionFilterSplits(TopicName topicName, PulsarTableHandle tableHandle, int numSplits,
        int expectedSplits, int[] allowedPartitions, Range first, Range... rest) throws Exception {
    Map<ColumnHandle, Domain> domainMap = new HashMap<>();
    domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false),
            Domain.create(ValueSet.ofRanges(first, rest), false));
    TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);

    Collection<PulsarSplit> splits = this.pulsarSplitManager.getSplitsPartitionedTopic(numSplits, topicName,
            tableHandle, schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain);

    if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) {
        Assert.assertEquals(splits.size(), expectedSplits);
    }
    for (PulsarSplit split : splits) {
        int partitionIndex = TopicName.getPartitionIndex(split.getTableName());
        boolean allowed = false;
        for (int candidate : allowedPartitions) {
            if (candidate == partitionIndex) {
                allowed = true;
                break;
            }
        }
        assertTrue(allowed);
    }
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册