/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.sql.presto;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException;
import io.airlift.log.Logger;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.Test;

import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.Response;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

@Test(singleThreaded = true)
public class TestPulsarMetadata extends TestPulsarConnector {

    private static final Logger log = Logger.get(TestPulsarMetadata.class);

66 67 68
    @Test(dataProvider = "rewriteNamespaceDelimiter")
    public void testListSchemaNames(String delimiter) {
        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
69 70 71
        List<String> schemas = this.pulsarMetadata.listSchemaNames(mock(ConnectorSession.class));


72 73 74 75 76 77 78 79 80 81 82 83 84
        if (StringUtils.isBlank(delimiter)) {
            String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(),
                    NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()};
            assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
        } else {
            String[] expectedSchemas = {
                    PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_1.toString(), pulsarConnectorConfig),
                    PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_2.toString(), pulsarConnectorConfig),
                    PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_3.toString(), pulsarConnectorConfig),
                    PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_4.toString(), pulsarConnectorConfig)};
            assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
        }
    }
85

86 87 88
    @Test(dataProvider = "rewriteNamespaceDelimiter")
    public void testGetTableHandle(String delimiter) {
        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
89 90 91 92 93
        SchemaTableName schemaTableName = new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName());

        ConnectorTableHandle connectorTableHandle
                = this.pulsarMetadata.getTableHandle(mock(ConnectorSession.class), schemaTableName);

94
        assertTrue(connectorTableHandle instanceof PulsarTableHandle);
95 96 97

        PulsarTableHandle pulsarTableHandle = (PulsarTableHandle) connectorTableHandle;

98 99 100 101
        assertEquals(pulsarTableHandle.getConnectorId(), pulsarConnectorId.toString());
        assertEquals(pulsarTableHandle.getSchemaName(), TOPIC_1.getNamespace());
        assertEquals(pulsarTableHandle.getTableName(), TOPIC_1.getLocalName());
        assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName());
102 103
    }

104 105 106
    @Test(dataProvider = "rewriteNamespaceDelimiter")
    public void testGetTableMetadata(String delimiter) {
        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
107
        List<TopicName> allTopics = new LinkedList<>();
108
        allTopics.addAll(topicNames.stream().filter(topicName -> !topicName.equals(NON_SCHEMA_TOPIC)).collect(Collectors.toList()));
109 110 111 112 113 114 115 116 117 118 119 120 121
        allTopics.addAll(partitionedTopicNames);

        for (TopicName topic : allTopics) {
            PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
                    topic.toString(),
                    topic.getNamespace(),
                    topic.getLocalName(),
                    topic.getLocalName()
            );

            ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                    pulsarTableHandle);

122 123 124
            assertEquals(tableMetadata.getTable().getSchemaName(), topic.getNamespace());
            assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
            assertEquals(tableMetadata.getColumns().size(),
125
                    fooColumnHandles.size());
126

127
            List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
128 129 130 131 132 133 134

            for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
                fieldNames.add(internalField.getName());
            }

            for (ColumnMetadata column : tableMetadata.getColumns()) {
                if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
135
                    assertEquals(column.getComment(),
136 137 138 139 140 141 142
                            PulsarInternalColumn.getInternalFieldsMap()
                                    .get(column.getName()).getColumnMetadata(true).getComment());
                }

                fieldNames.remove(column.getName());
            }

143
            assertTrue(fieldNames.isEmpty());
144 145 146
        }
    }

147 148 149
    @Test(dataProvider = "rewriteNamespaceDelimiter")
    public void testGetTableMetadataWrongSchema(String delimiter) {
        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
150 151 152 153 154 155 156 157 158 159
        PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
                pulsarConnectorId.toString(),
                "wrong-tenant/wrong-ns",
                TOPIC_1.getLocalName(),
                TOPIC_1.getLocalName()
        );

        try {
            ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                    pulsarTableHandle);
160
            fail("Invalid schema should have generated an exception");
161
        } catch (PrestoException e) {
162 163
            assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
            assertEquals(e.getMessage(), "Schema wrong-tenant/wrong-ns does not exist");
164 165 166
        }
    }

167 168 169
    @Test(dataProvider = "rewriteNamespaceDelimiter")
    public void testGetTableMetadataWrongTable(String delimiter) {
        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
170 171 172 173 174 175 176 177 178 179
        PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
                pulsarConnectorId.toString(),
                TOPIC_1.getNamespace(),
                "wrong-topic",
                "wrong-topic"
        );

        try {
            ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                    pulsarTableHandle);
180
            fail("Invalid table should have generated an exception");
181
        } catch (TableNotFoundException e) {
182 183
            assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
            assertEquals(e.getMessage(), "Table 'tenant-1/ns-1.wrong-topic' not found");
184 185 186
        }
    }

187 188 189
    @Test(dataProvider = "rewriteNamespaceDelimiter")
    public void testGetTableMetadataTableNoSchema(String delimiter) throws PulsarAdminException {
        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
190 191 192 193 194 195 196 197 198 199
        when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenThrow(
                new PulsarAdminException(new ClientErrorException(Response.Status.NOT_FOUND)));

        PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
                pulsarConnectorId.toString(),
                TOPIC_1.getNamespace(),
                TOPIC_1.getLocalName(),
                TOPIC_1.getLocalName()
        );

200 201 202

        ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                pulsarTableHandle);
203
        assertEquals(tableMetadata.getColumns().size(), PulsarInternalColumn.getInternalFields().size() + 1);
204 205
    }

206 207 208
    @Test(dataProvider = "rewriteNamespaceDelimiter")
    public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException {
        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
209 210
        SchemaInfo badSchemaInfo = new SchemaInfo();
        badSchemaInfo.setSchema(new byte[0]);
211
        badSchemaInfo.setType(SchemaType.AVRO);
212 213 214 215 216 217 218 219 220 221 222 223
        when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);

        PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
                pulsarConnectorId.toString(),
                TOPIC_1.getNamespace(),
                TOPIC_1.getLocalName(),
                TOPIC_1.getLocalName()
        );

        try {
            ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                    pulsarTableHandle);
224
            fail("Table without schema should have generated an exception");
225
        } catch (PrestoException e) {
226 227
            assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
            assertEquals(e.getMessage(),
228 229 230 231
                    "Topic persistent://tenant-1/ns-1/topic-1 does not have a valid schema");
        }
    }

232 233 234
    @Test(dataProvider = "rewriteNamespaceDelimiter")
    public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException {
        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
235 236
        SchemaInfo badSchemaInfo = new SchemaInfo();
        badSchemaInfo.setSchema("foo".getBytes());
237
        badSchemaInfo.setType(SchemaType.AVRO);
238 239 240 241 242 243 244 245 246 247 248 249
        when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);

        PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
                pulsarConnectorId.toString(),
                TOPIC_1.getNamespace(),
                TOPIC_1.getLocalName(),
                TOPIC_1.getLocalName()
        );

        try {
            ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                    pulsarTableHandle);
250
            fail("Table without schema should have generated an exception");
251
        } catch (PrestoException e) {
252 253
            assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
            assertEquals(e.getMessage(),
254 255 256 257
                    "Topic persistent://tenant-1/ns-1/topic-1 does not have a valid schema");
        }
    }

258 259 260
    @Test(dataProvider = "rewriteNamespaceDelimiter")
    public void testListTable(String delimiter) {
        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
261 262
        assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty());
        assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
263 264
                .isEmpty());

265 266 267 268
        SchemaTableName[] expectedTopics1 = {new SchemaTableName(
            TOPIC_4.getNamespace(), TOPIC_4.getLocalName()),
            new SchemaTableName(PARTITIONED_TOPIC_4.getNamespace(), PARTITIONED_TOPIC_4.getLocalName())
        };
269
        assertEquals(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
270 271 272
                NAMESPACE_NAME_3.toString()), Arrays.asList(expectedTopics1));

        SchemaTableName[] expectedTopics2 = {new SchemaTableName(TOPIC_5.getNamespace(), TOPIC_5.getLocalName()),
273 274 275 276
                new SchemaTableName(TOPIC_6.getNamespace(), TOPIC_6.getLocalName()),
            new SchemaTableName(PARTITIONED_TOPIC_5.getNamespace(), PARTITIONED_TOPIC_5.getLocalName()),
            new SchemaTableName(PARTITIONED_TOPIC_6.getNamespace(), PARTITIONED_TOPIC_6.getLocalName()),
        };
277
        assertEquals(new HashSet<>(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
278
            NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2)));
279 280
    }

281 282 283
    @Test(dataProvider = "rewriteNamespaceDelimiter")
    public void testGetColumnHandles(String delimiter) {
        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
284 285 286 287 288
        PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(), TOPIC_1.getNamespace(),
                TOPIC_1.getLocalName(), TOPIC_1.getLocalName());
        Map<String, ColumnHandle> columnHandleMap
                = new HashMap<>(this.pulsarMetadata.getColumnHandles(mock(ConnectorSession.class), pulsarTableHandle));

289
        List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
290 291 292 293 294 295

        for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
            fieldNames.add(internalField.getName());
        }

        for (String field : fieldNames) {
296
            assertNotNull(columnHandleMap.get(field));
297 298 299
            PulsarColumnHandle pulsarColumnHandle = (PulsarColumnHandle) columnHandleMap.get(field);
            PulsarInternalColumn pulsarInternalColumn = PulsarInternalColumn.getInternalFieldsMap().get(field);
            if (pulsarInternalColumn != null) {
300
                assertEquals(pulsarColumnHandle,
301 302 303 304
                        pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false));
            } else {
                Schema schema = new Schema.Parser().parse(new String(topicsToSchemas.get(TOPIC_1.getSchemaName())
                        .getSchema()));
305 306 307 308 309 310
                assertEquals(pulsarColumnHandle.getConnectorId(), pulsarConnectorId.toString());
                assertEquals(pulsarColumnHandle.getName(), field);
                assertEquals(pulsarColumnHandle.getPositionIndices(), fooPositionIndices.get(field));
                assertEquals(pulsarColumnHandle.getFieldNames(), fooFieldNames.get(field));
                assertEquals(pulsarColumnHandle.getType(), fooTypes.get(field));
                assertFalse(pulsarColumnHandle.isHidden());
311 312 313
            }
            columnHandleMap.remove(field);
        }
314
        assertTrue(columnHandleMap.isEmpty());
315 316
    }

317 318 319
    @Test(dataProvider = "rewriteNamespaceDelimiter")
    public void testListTableColumns(String delimiter) {
        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
320 321 322 323
        Map<SchemaTableName, List<ColumnMetadata>> tableColumnsMap
                = this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
                new SchemaTablePrefix(TOPIC_1.getNamespace()));

324
        assertEquals(tableColumnsMap.size(), 4);
325 326
        List<ColumnMetadata> columnMetadataList
                = tableColumnsMap.get(new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName()));
327 328
        assertNotNull(columnMetadataList);
        assertEquals(columnMetadataList.size(),
329
                fooColumnHandles.size());
330

331
        List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
332 333 334 335 336 337 338

        for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
            fieldNames.add(internalField.getName());
        }

        for (ColumnMetadata column : columnMetadataList) {
            if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
339
                assertEquals(column.getComment(),
340 341 342 343 344 345 346
                        PulsarInternalColumn.getInternalFieldsMap()
                                .get(column.getName()).getColumnMetadata(true).getComment());
            }

            fieldNames.remove(column.getName());
        }

347
        assertTrue(fieldNames.isEmpty());
348 349

        columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_2.getNamespace(), TOPIC_2.getLocalName()));
350 351
        assertNotNull(columnMetadataList);
        assertEquals(columnMetadataList.size(),
352
                fooColumnHandles.size());
353

354
        fieldNames = new LinkedList<>(fooFieldNames.keySet());
355 356 357 358 359 360 361

        for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
            fieldNames.add(internalField.getName());
        }

        for (ColumnMetadata column : columnMetadataList) {
            if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
362
                assertEquals(column.getComment(),
363 364 365 366 367 368 369
                        PulsarInternalColumn.getInternalFieldsMap()
                                .get(column.getName()).getColumnMetadata(true).getComment());
            }

            fieldNames.remove(column.getName());
        }

370
        assertTrue(fieldNames.isEmpty());
371 372 373 374 375 376

        // test table and schema
        tableColumnsMap
                = this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
                new SchemaTablePrefix(TOPIC_4.getNamespace(), TOPIC_4.getLocalName()));

377
        assertEquals(tableColumnsMap.size(), 1);
378
        columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName()));
379 380
        assertNotNull(columnMetadataList);
        assertEquals(columnMetadataList.size(),
381
                fooColumnHandles.size());
382

383
        fieldNames = new LinkedList<>(fooFieldNames.keySet());
384 385 386 387 388 389 390

        for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
            fieldNames.add(internalField.getName());
        }

        for (ColumnMetadata column : columnMetadataList) {
            if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
391
                assertEquals(column.getComment(),
392 393 394 395 396 397 398
                        PulsarInternalColumn.getInternalFieldsMap()
                                .get(column.getName()).getColumnMetadata(true).getComment());
            }

            fieldNames.remove(column.getName());
        }

399
        assertTrue(fieldNames.isEmpty());
400 401
    }
}