提交 2c97da5e 编写于 作者: V vzhikserg 提交者: Jia Zhai

Cleanup tests in the presto module (#4683)

* Add static import statements for Assert to simplify the test in the presto module

* Use the preferred way of the schema's creation. The predicates and functions were converted to lambda

(cherry picked from commit c421ca6e)
上级 eceb2c97
......@@ -75,7 +75,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static com.facebook.presto.spi.type.DateType.DATE;
......@@ -224,21 +223,19 @@ public abstract class TestPulsarConnector {
partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 7);
topicsToSchemas = new HashMap<>();
topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(TOPIC_1.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(TOPIC_2.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(TOPIC_3.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(TOPIC_4.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(TOPIC_5.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(TOPIC_6.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), Schema.AVRO(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), Schema.JSON(TestPulsarMetadata.Foo.class).getSchemaInfo());
fooTypes = new HashMap<>();
fooTypes.put("field1", IntegerType.INTEGER);
......@@ -536,14 +533,9 @@ public abstract class TestPulsarConnector {
fieldNames10,
positionIndices10));
fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream().map(
new Function<PulsarInternalColumn, PulsarColumnHandle>() {
@Override
public PulsarColumnHandle apply(PulsarInternalColumn pulsarInternalColumn) {
return pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false);
}
}).collect(Collectors.toList()));
fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream()
.map(pulsarInternalColumn -> pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false))
.collect(Collectors.toList()));
splits = new HashMap<>();
......@@ -564,11 +556,11 @@ public abstract class TestPulsarConnector {
fooFunctions = new HashMap<>();
fooFunctions.put("field1", integer -> integer);
fooFunctions.put("field2", integer -> String.valueOf(integer));
fooFunctions.put("field3", integer -> integer.floatValue());
fooFunctions.put("field4", integer -> integer.doubleValue());
fooFunctions.put("field2", String::valueOf);
fooFunctions.put("field3", Integer::floatValue);
fooFunctions.put("field4", Integer::doubleValue);
fooFunctions.put("field5", integer -> integer % 2 == 0);
fooFunctions.put("field6", integer -> integer.longValue());
fooFunctions.put("field6", Integer::longValue);
fooFunctions.put("timestamp", integer -> System.currentTimeMillis());
fooFunctions.put("time", integer -> {
LocalTime now = LocalTime.now(ZoneId.systemDefault());
......@@ -647,45 +639,24 @@ public abstract class TestPulsarConnector {
private static final Logger log = Logger.get(TestPulsarConnector.class);
protected static List<String> getNamespace(String tenant) {
return new LinkedList<>(topicNames.stream().filter(new Predicate<TopicName>() {
@Override
public boolean test(TopicName topicName) {
return topicName.getTenant().equals(tenant);
}
}).map(new Function<TopicName, String>() {
@Override
public String apply(TopicName topicName) {
return topicName.getNamespace();
}
}).collect(Collectors.toSet()));
return topicNames.stream()
.filter(topicName -> topicName.getTenant().equals(tenant))
.map(TopicName::getNamespace)
.distinct()
.collect(Collectors.toCollection(LinkedList::new));
}
protected static List<String> getTopics(String ns) {
return topicNames.stream().filter(new Predicate<TopicName>() {
@Override
public boolean test(TopicName topicName) {
return topicName.getNamespace().equals(ns);
}
}).map(new Function<TopicName, String>() {
@Override
public String apply(TopicName topicName) {
return topicName.toString();
}
}).collect(Collectors.toList());
return topicNames.stream()
.filter(topicName -> topicName.getNamespace().equals(ns))
.map(TopicName::toString).collect(Collectors.toList());
}
protected static List<String> getPartitionedTopics(String ns) {
return partitionedTopicNames.stream().filter(new Predicate<TopicName>() {
@Override
public boolean test(TopicName topicName) {
return topicName.getNamespace().equals(ns);
}
}).map(new Function<TopicName, String>() {
@Override
public String apply(TopicName topicName) {
return topicName.toString();
}
}).collect(Collectors.toList());
return partitionedTopicNames.stream()
.filter(topicName -> topicName.getNamespace().equals(ns))
.map(TopicName::toString)
.collect(Collectors.toList());
}
@BeforeMethod
......@@ -696,12 +667,9 @@ public abstract class TestPulsarConnector {
this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);
Tenants tenants = mock(Tenants.class);
doReturn(new LinkedList<>(topicNames.stream().map(new Function<TopicName, String>() {
@Override
public String apply(TopicName topicName) {
return topicName.getTenant();
}
}).collect(Collectors.toSet()))).when(tenants).getTenants();
doReturn(new LinkedList<>(topicNames.stream()
.map(TopicName::getTenant)
.collect(Collectors.toSet()))).when(tenants).getTenants();
Namespaces namespaces = mock(Namespaces.class);
......
......@@ -34,7 +34,6 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.Response;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Arrays;
......@@ -49,6 +48,11 @@ import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@Test(singleThreaded = true)
public class TestPulsarMetadata extends TestPulsarConnector {
......@@ -62,7 +66,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(),
NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()};
Assert.assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
}
@Test
......@@ -73,14 +77,14 @@ public class TestPulsarMetadata extends TestPulsarConnector {
ConnectorTableHandle connectorTableHandle
= this.pulsarMetadata.getTableHandle(mock(ConnectorSession.class), schemaTableName);
Assert.assertTrue(connectorTableHandle instanceof PulsarTableHandle);
assertTrue(connectorTableHandle instanceof PulsarTableHandle);
PulsarTableHandle pulsarTableHandle = (PulsarTableHandle) connectorTableHandle;
Assert.assertEquals(pulsarTableHandle.getConnectorId(), pulsarConnectorId.toString());
Assert.assertEquals(pulsarTableHandle.getSchemaName(), TOPIC_1.getNamespace());
Assert.assertEquals(pulsarTableHandle.getTableName(), TOPIC_1.getLocalName());
Assert.assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName());
assertEquals(pulsarTableHandle.getConnectorId(), pulsarConnectorId.toString());
assertEquals(pulsarTableHandle.getSchemaName(), TOPIC_1.getNamespace());
assertEquals(pulsarTableHandle.getTableName(), TOPIC_1.getLocalName());
assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName());
}
@Test
......@@ -101,10 +105,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
Assert.assertEquals(tableMetadata.getTable().getSchemaName(), topic.getNamespace());
Assert.assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
assertEquals(tableMetadata.getTable().getSchemaName(), topic.getNamespace());
assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
Assert.assertEquals(tableMetadata.getColumns().size(),
assertEquals(tableMetadata.getColumns().size(),
fooColumnHandles.size());
List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
......@@ -115,7 +119,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
for (ColumnMetadata column : tableMetadata.getColumns()) {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
Assert.assertEquals(column.getComment(),
assertEquals(column.getComment(),
PulsarInternalColumn.getInternalFieldsMap()
.get(column.getName()).getColumnMetadata(true).getComment());
}
......@@ -123,7 +127,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
fieldNames.remove(column.getName());
}
Assert.assertTrue(fieldNames.isEmpty());
assertTrue(fieldNames.isEmpty());
}
}
......@@ -140,10 +144,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
try {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
Assert.fail("Invalid schema should have generated an exception");
fail("Invalid schema should have generated an exception");
} catch (PrestoException e) {
Assert.assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
Assert.assertEquals(e.getMessage(), "Schema wrong-tenant/wrong-ns does not exist");
assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
assertEquals(e.getMessage(), "Schema wrong-tenant/wrong-ns does not exist");
}
}
......@@ -160,10 +164,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
try {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
Assert.fail("Invalid table should have generated an exception");
fail("Invalid table should have generated an exception");
} catch (TableNotFoundException e) {
Assert.assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
Assert.assertEquals(e.getMessage(), "Table 'tenant-1/ns-1.wrong-topic' not found");
assertEquals(e.getErrorCode(), NOT_FOUND.toErrorCode());
assertEquals(e.getMessage(), "Table 'tenant-1/ns-1.wrong-topic' not found");
}
}
......@@ -183,7 +187,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
Assert.assertEquals(tableMetadata.getColumns().size(), 0);
assertEquals(tableMetadata.getColumns().size(), 0);
}
@Test
......@@ -203,10 +207,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
try {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
Assert.fail("Table without schema should have generated an exception");
fail("Table without schema should have generated an exception");
} catch (PrestoException e) {
Assert.assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
Assert.assertEquals(e.getMessage(),
assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
assertEquals(e.getMessage(),
"Topic persistent://tenant-1/ns-1/topic-1 does not have a valid schema");
}
}
......@@ -228,27 +232,27 @@ public class TestPulsarMetadata extends TestPulsarConnector {
try {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
Assert.fail("Table without schema should have generated an exception");
fail("Table without schema should have generated an exception");
} catch (PrestoException e) {
Assert.assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
Assert.assertEquals(e.getMessage(),
assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
assertEquals(e.getMessage(),
"Topic persistent://tenant-1/ns-1/topic-1 does not have a valid schema");
}
}
@Test
public void testListTable() {
Assert.assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty());
Assert.assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty());
assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
.isEmpty());
SchemaTableName[] expectedTopics1 = {new SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName())};
Assert.assertEquals(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
assertEquals(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
NAMESPACE_NAME_3.toString()), Arrays.asList(expectedTopics1));
SchemaTableName[] expectedTopics2 = {new SchemaTableName(TOPIC_5.getNamespace(), TOPIC_5.getLocalName()),
new SchemaTableName(TOPIC_6.getNamespace(), TOPIC_6.getLocalName())};
Assert.assertEquals(new HashSet<>(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
assertEquals(new HashSet<>(this.pulsarMetadata.listTables(mock(ConnectorSession.class),
NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2)));
}
......@@ -267,25 +271,25 @@ public class TestPulsarMetadata extends TestPulsarConnector {
}
for (String field : fieldNames) {
Assert.assertNotNull(columnHandleMap.get(field));
assertNotNull(columnHandleMap.get(field));
PulsarColumnHandle pulsarColumnHandle = (PulsarColumnHandle) columnHandleMap.get(field);
PulsarInternalColumn pulsarInternalColumn = PulsarInternalColumn.getInternalFieldsMap().get(field);
if (pulsarInternalColumn != null) {
Assert.assertEquals(pulsarColumnHandle,
assertEquals(pulsarColumnHandle,
pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false));
} else {
Schema schema = new Schema.Parser().parse(new String(topicsToSchemas.get(TOPIC_1.getSchemaName())
.getSchema()));
Assert.assertEquals(pulsarColumnHandle.getConnectorId(), pulsarConnectorId.toString());
Assert.assertEquals(pulsarColumnHandle.getName(), field);
Assert.assertEquals(pulsarColumnHandle.getPositionIndices(), fooPositionIndices.get(field));
Assert.assertEquals(pulsarColumnHandle.getFieldNames(), fooFieldNames.get(field));
Assert.assertEquals(pulsarColumnHandle.getType(), fooTypes.get(field));
Assert.assertEquals(pulsarColumnHandle.isHidden(), false);
assertEquals(pulsarColumnHandle.getConnectorId(), pulsarConnectorId.toString());
assertEquals(pulsarColumnHandle.getName(), field);
assertEquals(pulsarColumnHandle.getPositionIndices(), fooPositionIndices.get(field));
assertEquals(pulsarColumnHandle.getFieldNames(), fooFieldNames.get(field));
assertEquals(pulsarColumnHandle.getType(), fooTypes.get(field));
assertFalse(pulsarColumnHandle.isHidden());
}
columnHandleMap.remove(field);
}
Assert.assertTrue(columnHandleMap.isEmpty());
assertTrue(columnHandleMap.isEmpty());
}
@Test
......@@ -294,11 +298,11 @@ public class TestPulsarMetadata extends TestPulsarConnector {
= this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
new SchemaTablePrefix(TOPIC_1.getNamespace()));
Assert.assertEquals(tableColumnsMap.size(), 2);
assertEquals(tableColumnsMap.size(), 2);
List<ColumnMetadata> columnMetadataList
= tableColumnsMap.get(new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName()));
Assert.assertNotNull(columnMetadataList);
Assert.assertEquals(columnMetadataList.size(),
assertNotNull(columnMetadataList);
assertEquals(columnMetadataList.size(),
fooColumnHandles.size());
List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
......@@ -309,7 +313,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
for (ColumnMetadata column : columnMetadataList) {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
Assert.assertEquals(column.getComment(),
assertEquals(column.getComment(),
PulsarInternalColumn.getInternalFieldsMap()
.get(column.getName()).getColumnMetadata(true).getComment());
}
......@@ -317,11 +321,11 @@ public class TestPulsarMetadata extends TestPulsarConnector {
fieldNames.remove(column.getName());
}
Assert.assertTrue(fieldNames.isEmpty());
assertTrue(fieldNames.isEmpty());
columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_2.getNamespace(), TOPIC_2.getLocalName()));
Assert.assertNotNull(columnMetadataList);
Assert.assertEquals(columnMetadataList.size(),
assertNotNull(columnMetadataList);
assertEquals(columnMetadataList.size(),
fooColumnHandles.size());
fieldNames = new LinkedList<>(fooFieldNames.keySet());
......@@ -332,7 +336,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
for (ColumnMetadata column : columnMetadataList) {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
Assert.assertEquals(column.getComment(),
assertEquals(column.getComment(),
PulsarInternalColumn.getInternalFieldsMap()
.get(column.getName()).getColumnMetadata(true).getComment());
}
......@@ -340,17 +344,17 @@ public class TestPulsarMetadata extends TestPulsarConnector {
fieldNames.remove(column.getName());
}
Assert.assertTrue(fieldNames.isEmpty());
assertTrue(fieldNames.isEmpty());
// test table and schema
tableColumnsMap
= this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
new SchemaTablePrefix(TOPIC_4.getNamespace(), TOPIC_4.getLocalName()));
Assert.assertEquals(tableColumnsMap.size(), 1);
assertEquals(tableColumnsMap.size(), 1);
columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName()));
Assert.assertNotNull(columnMetadataList);
Assert.assertEquals(columnMetadataList.size(),
assertNotNull(columnMetadataList);
assertEquals(columnMetadataList.size(),
fooColumnHandles.size());
fieldNames = new LinkedList<>(fooFieldNames.keySet());
......@@ -361,7 +365,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
for (ColumnMetadata column : columnMetadataList) {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(column.getName())) {
Assert.assertEquals(column.getComment(),
assertEquals(column.getComment(),
PulsarInternalColumn.getInternalFieldsMap()
.get(column.getName()).getColumnMetadata(true).getComment());
}
......@@ -369,6 +373,6 @@ public class TestPulsarMetadata extends TestPulsarConnector {
fieldNames.remove(column.getName());
}
Assert.assertTrue(fieldNames.isEmpty());
assertTrue(fieldNames.isEmpty());
}
}
......@@ -20,13 +20,14 @@ package org.apache.pulsar.sql.presto;
import io.airlift.log.Logger;
import org.apache.pulsar.common.naming.TopicName;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import static org.testng.Assert.assertEquals;
@Test(singleThreaded = true)
public class TestPulsarRecordCursor extends TestPulsarConnector {
......@@ -50,22 +51,22 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
columnsSeen.add(fooColumnHandles.get(i).getName());
} else {
if (fooColumnHandles.get(i).getName().equals("field1")) {
Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("field1").apply(count)).longValue());
assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("field1").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field2")) {
Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("field2").apply(count)).getBytes());
assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("field2").apply(count)).getBytes());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field3")) {
Assert.assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("field3").apply(count)).floatValue()));
assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("field3").apply(count)).floatValue()));
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field4")) {
Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("field4").apply(count)).doubleValue());
assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("field4").apply(count)).doubleValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field5")) {
Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("field5").apply(count)).booleanValue());
assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("field5").apply(count)).booleanValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field6")) {
Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("field6").apply(count)).longValue());
assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("field6").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("timestamp")) {
pulsarRecordCursor.getLong(i);
......@@ -77,40 +78,40 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
pulsarRecordCursor.getLong(i);
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.field1")) {
Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.field1").apply(count)).longValue());
assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.field1").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.field2")) {
Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("bar.field2").apply(count)).getBytes());
assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("bar.field2").apply(count)).getBytes());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.field3")) {
Assert.assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("bar.field3").apply(count)).floatValue()));
assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("bar.field3").apply(count)).floatValue()));
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test.field4")) {
Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test.field4").apply(count)).doubleValue());
assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test.field4").apply(count)).doubleValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test.field5")) {
Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test.field5").apply(count)).booleanValue());
assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test.field5").apply(count)).booleanValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test.field6")) {
Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test.field6").apply(count)).longValue());
assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test.field6").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test.foobar.field1")) {
Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test.foobar.field1").apply(count)).longValue());
assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test.foobar.field1").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test2.field4")) {
Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test2.field4").apply(count)).doubleValue());
assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test2.field4").apply(count)).doubleValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test2.field5")) {
Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test2.field5").apply(count)).booleanValue());
assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test2.field5").apply(count)).booleanValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test2.field6")) {
Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test2.field6").apply(count)).longValue());
assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test2.field6").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("bar.test2.foobar.field1")) {
Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test2.foobar.field1").apply(count)).longValue());
assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test2.foobar.field1").apply(count)).longValue());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else if (fooColumnHandles.get(i).getName().equals("field7")) {
Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), fooFunctions.get("field7").apply(count).toString().getBytes());
assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), fooFunctions.get("field7").apply(count).toString().getBytes());
columnsSeen.add(fooColumnHandles.get(i).getName());
} else {
if (PulsarInternalColumn.getInternalFieldsMap().containsKey(fooColumnHandles.get(i).getName())) {
......@@ -119,11 +120,11 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
}
}
}
Assert.assertEquals(columnsSeen.size(), fooColumnHandles.size());
assertEquals(columnsSeen.size(), fooColumnHandles.size());
count++;
}
Assert.assertEquals(count, topicsToNumEntries.get(topicName.getSchemaName()).longValue());
Assert.assertEquals(pulsarRecordCursor.getCompletedBytes(), completedBytes);
assertEquals(count, topicsToNumEntries.get(topicName.getSchemaName()).longValue());
assertEquals(pulsarRecordCursor.getCompletedBytes(), completedBytes);
cleanup();
pulsarRecordCursor.close();
}
......
......@@ -32,14 +32,12 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
......@@ -50,6 +48,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
@Test(singleThreaded = true)
public class TestPulsarSplitManager extends TestPulsarConnector {
......@@ -94,23 +93,23 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
int totalSize = 0;
for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
Assert.assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
Assert.assertEquals(pulsarSplit.getSchema(),
assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
totalSize += pulsarSplit.getSplitSize();
}
Assert.assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
cleanup();
}
......@@ -142,23 +141,23 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
List<PulsarSplit> splits = getSplitsForPartition(topicName.getPartition(i), resultCaptor.getResult());
int totalSize = 0;
for (PulsarSplit pulsarSplit : splits) {
Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
Assert.assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
Assert.assertEquals(pulsarSplit.getSchema(),
assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
totalSize += pulsarSplit.getSplitSize();
}
Assert.assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
}
cleanup();
......@@ -166,13 +165,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
}
private List<PulsarSplit> getSplitsForPartition(TopicName target, Collection<PulsarSplit> splits) {
return splits.stream().filter(new Predicate<PulsarSplit>() {
@Override
public boolean test(PulsarSplit pulsarSplit) {
TopicName topicName = TopicName.get(pulsarSplit.getSchemaName() + "/" + pulsarSplit.getTableName());
return target.equals(topicName);
}
return splits.stream().filter(pulsarSplit -> {
TopicName topicName = TopicName.get(pulsarSplit.getSchemaName() + "/" + pulsarSplit.getTableName());
return target.equals(topicName);
}).collect(Collectors.toList());
}
......@@ -213,24 +208,24 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
int totalSize = 0;
int initalStart = 1;
for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
Assert.assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
Assert.assertEquals(pulsarSplit.getSchema(),
assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), initalStart);
Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initalStart));
Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), initalStart + pulsarSplit.getSplitSize());
Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initalStart + pulsarSplit
assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
assertEquals(pulsarSplit.getStartPositionEntryId(), initalStart);
assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initalStart));
assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
assertEquals(pulsarSplit.getEndPositionEntryId(), initalStart + pulsarSplit.getSplitSize());
assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initalStart + pulsarSplit
.getSplitSize()));
initalStart += pulsarSplit.getSplitSize();
totalSize += pulsarSplit.getSplitSize();
}
Assert.assertEquals(totalSize, 49);
assertEquals(totalSize, 49);
}
......@@ -273,26 +268,26 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
for (int i = 0; i < partitions; i++) {
List<PulsarSplit> splits = getSplitsForPartition(topicName.getPartition(i), resultCaptor.getResult());
int totalSize = 0;
int initalStart = 1;
int initialStart = 1;
for (PulsarSplit pulsarSplit : splits) {
Assert.assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
Assert.assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
Assert.assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
Assert.assertEquals(pulsarSplit.getSchema(),
assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
Assert.assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), initalStart);
Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
Assert.assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initalStart));
Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), initalStart + pulsarSplit.getSplitSize());
Assert.assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initalStart + pulsarSplit.getSplitSize()));
initalStart += pulsarSplit.getSplitSize();
assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
assertEquals(pulsarSplit.getStartPositionEntryId(), initialStart);
assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initialStart));
assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
assertEquals(pulsarSplit.getEndPositionEntryId(), initialStart + pulsarSplit.getSplitSize());
assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initialStart + pulsarSplit.getSplitSize()));
initialStart += pulsarSplit.getSplitSize();
totalSize += pulsarSplit.getSplitSize();
}
Assert.assertEquals(totalSize, 49);
assertEquals(totalSize, 49);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册