提交 1ab35b01 编写于 作者: C congbo 提交者: Sijie Guo

Pulsar SQL supports pulsar's primitive schema (#4728)

### Motivation
Continue the PR of #4151
上级 0362944f
......@@ -175,4 +175,46 @@ public enum SchemaType {
default: return NONE;
}
}
public boolean isPrimitive() {
return isPrimitiveType(this);
}
public boolean isStruct() {
return isStructType(this);
}
public static boolean isPrimitiveType(SchemaType type) {
switch (type) {
case STRING:
case BOOLEAN:
case INT8:
case INT16:
case INT32:
case INT64:
case FLOAT:
case DOUBLE:
case DATE:
case TIME:
case TIMESTAMP:
case BYTES:
case NONE:
return true;
default:
return false;
}
}
public static boolean isStructType(SchemaType type) {
switch (type) {
case AVRO:
case JSON:
case PROTOBUF:
return true;
default:
return false;
}
}
}
......@@ -34,9 +34,14 @@ import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.BooleanType;
import com.facebook.presto.spi.type.DateType;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.RealType;
import com.facebook.presto.spi.type.SmallintType;
import com.facebook.presto.spi.type.TimeType;
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.TinyintType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarbinaryType;
import com.facebook.presto.spi.type.VarcharType;
......@@ -55,6 +60,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import javax.inject.Inject;
import java.util.HashMap;
......@@ -296,6 +302,56 @@ public class PulsarMetadata implements ConnectorMetadata {
+ String.format("%s/%s", namespace, schemaTableName.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
List<ColumnMetadata> handles = getPulsarColumns(
topicName, schemaInfo, withInternalColumns
);
return new ConnectorTableMetadata(schemaTableName, handles);
}
/**
* Convert pulsar schema into presto table metadata.
*/
static List<ColumnMetadata> getPulsarColumns(TopicName topicName,
SchemaInfo schemaInfo,
boolean withInternalColumns) {
SchemaType schemaType = schemaInfo.getType();
if (schemaType.isStruct()) {
return getPulsarColumnsFromStructSchema(topicName, schemaInfo, withInternalColumns);
} else if (schemaType.isPrimitive()) {
return getPulsarColumnsFromPrimitiveSchema(topicName, schemaInfo, withInternalColumns);
} else {
throw new IllegalArgumentException("Unsupported schema : " + schemaInfo);
}
}
static List<ColumnMetadata> getPulsarColumnsFromPrimitiveSchema(TopicName topicName,
SchemaInfo schemaInfo,
boolean withInternalColumns) {
ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
ColumnMetadata valueColumn = new PulsarColumnMetadata(
"__value__",
convertPulsarType(schemaInfo.getType()),
null, null, false, false,
new String[0],
new Integer[0]);
builder.add(valueColumn);
if (withInternalColumns) {
PulsarInternalColumn.getInternalFields()
.stream()
.forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
}
return builder.build();
}
static List<ColumnMetadata> getPulsarColumnsFromStructSchema(TopicName topicName,
SchemaInfo schemaInfo,
boolean withInternalColumns) {
String schemaJson = new String(schemaInfo.getSchema());
if (StringUtils.isBlank(schemaJson)) {
......@@ -315,11 +371,44 @@ public class PulsarMetadata implements ConnectorMetadata {
builder.addAll(getColumns(null, schema, new HashSet<>(), new Stack<>(), new Stack<>()));
if (withInternalColumns) {
PulsarInternalColumn.getInternalFields().forEach(
pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
PulsarInternalColumn.getInternalFields()
.stream()
.forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
}
return builder.build();
}
@VisibleForTesting
static Type convertPulsarType(SchemaType pulsarType) {
switch (pulsarType) {
case BOOLEAN:
return BooleanType.BOOLEAN;
case INT8:
return TinyintType.TINYINT;
case INT16:
return SmallintType.SMALLINT;
case INT32:
return IntegerType.INTEGER;
case INT64:
return BigintType.BIGINT;
case FLOAT:
return RealType.REAL;
case DOUBLE:
return DoubleType.DOUBLE;
case NONE:
case BYTES:
return VarbinaryType.VARBINARY;
case STRING:
return VarcharType.VARCHAR;
case DATE:
return DateType.DATE;
case TIME:
return TimeType.TIME;
case TIMESTAMP:
return TimestampType.TIMESTAMP;
default:
log.error("Cannot convert type: %s", pulsarType);
return null;
}
return new ConnectorTableMetadata(schemaTableName, builder.build());
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
* A presto schema handler that interprets data using pulsar schema.
*/
public class PulsarPrimitiveSchemaHandler implements SchemaHandler {
private final SchemaInfo schemaInfo;
private final Schema<?> schema;
public PulsarPrimitiveSchemaHandler(SchemaInfo schemaInfo) {
this.schemaInfo = schemaInfo;
this.schema = AutoConsumeSchema.getSchema(schemaInfo);
}
@Override
public Object deserialize(ByteBuf byteBuf) {
byte[] data = ByteBufUtil.getBytes(byteBuf);
Object currentRecord = schema.decode(data);
switch (schemaInfo.getType()) {
case DATE:
return ((Date) currentRecord).getTime();
case TIME:
return ((Time) currentRecord).getTime();
case TIMESTAMP:
return ((Timestamp) currentRecord).getTime();
default:
return currentRecord;
}
}
@Override
public Object extractField(int index, Object currentRecord) {
return currentRecord;
}
}
\ No newline at end of file
......@@ -140,9 +140,10 @@ public class PulsarRecordCursor implements RecordCursor {
this.readOffloaded = pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
this.pulsarConnectorConfig = pulsarConnectorConfig;
Schema schema = PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
this.schemaHandler = getSchemaHandler(schema, pulsarSplit.getSchemaType(), columnHandles);
this.schemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(
pulsarSplit.getSchemaInfo(),
columnHandles
);
log.info("Initializing split with parameters: %s", pulsarSplit);
......@@ -156,22 +157,6 @@ public class PulsarRecordCursor implements RecordCursor {
}
}
private SchemaHandler getSchemaHandler(Schema schema, SchemaType schemaType,
List<PulsarColumnHandle> columnHandles) {
SchemaHandler schemaHandler;
switch (schemaType) {
case JSON:
schemaHandler = new JSONSchemaHandler(columnHandles);
break;
case AVRO:
schemaHandler = new AvroSchemaHandler(schema, columnHandles);
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + schemaType);
}
return schemaHandler;
}
private ReadOnlyCursor getCursor(TopicName topicName, Position startPosition, ManagedLedgerFactory
managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig)
throws ManagedLedgerException, InterruptedException {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.facebook.presto.spi.PrestoException;
import java.util.List;
import org.apache.pulsar.common.schema.SchemaInfo;
class PulsarSchemaHandlers {
static SchemaHandler newPulsarSchemaHandler(SchemaInfo schemaInfo,
List<PulsarColumnHandle> columnHandles) {
if (schemaInfo.getType().isPrimitive()) {
return new PulsarPrimitiveSchemaHandler(schemaInfo);
} else if (schemaInfo.getType().isStruct()) {
switch (schemaInfo.getType()) {
case JSON:
return new JSONSchemaHandler(columnHandles);
case AVRO:
return new AvroSchemaHandler(PulsarConnectorUtils
.parseSchema(new String(schemaInfo.getSchema(), UTF_8)
), columnHandles);
default:
throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + schemaInfo.getType());
}
} else {
throw new PrestoException(
NOT_SUPPORTED,
"Schema `" + schemaInfo.getType() + "` is not supported by presto yet : " + schemaInfo);
}
}
}
\ No newline at end of file
......@@ -26,9 +26,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.util.List;
import java.util.Map;
import static java.util.Objects.requireNonNull;
......@@ -46,6 +48,7 @@ public class PulsarSplit implements ConnectorSplit {
private final long startPositionLedgerId;
private final long endPositionLedgerId;
private final TupleDomain<ColumnHandle> tupleDomain;
private final SchemaInfo schemaInfo;
private final PositionImpl startPosition;
private final PositionImpl endPosition;
......@@ -63,8 +66,16 @@ public class PulsarSplit implements ConnectorSplit {
@JsonProperty("endPositionEntryId") long endPositionEntryId,
@JsonProperty("startPositionLedgerId") long startPositionLedgerId,
@JsonProperty("endPositionLedgerId") long endPositionLedgerId,
@JsonProperty("tupleDomain") TupleDomain<ColumnHandle> tupleDomain) {
@JsonProperty("tupleDomain") TupleDomain<ColumnHandle> tupleDomain,
@JsonProperty("properties") Map<String, String> schemaInfoProperties) {
this.splitId = splitId;
requireNonNull(schemaName, "schema name is null");
this.schemaInfo = SchemaInfo.builder()
.type(schemaType)
.name(schemaName)
.schema(schema.getBytes())
.properties(schemaInfoProperties)
.build();
this.schemaName = requireNonNull(schemaName, "schema name is null");
this.connectorId = requireNonNull(connectorId, "connector id is null");
this.tableName = requireNonNull(tableName, "table name is null");
......@@ -179,4 +190,8 @@ public class PulsarSplit implements ConnectorSplit {
", endPositionLedgerId=" + endPositionLedgerId +
'}';
}
public SchemaInfo getSchemaInfo() {
return schemaInfo;
}
}
......@@ -270,7 +270,8 @@ public class PulsarSplitManager implements ConnectorSplitManager {
endPosition.getEntryId(),
startPosition.getLedgerId(),
endPosition.getLedgerId(),
tupleDomain));
tupleDomain,
schemaInfo.getProperties()));
}
return splits;
} finally {
......
......@@ -552,7 +552,7 @@ public abstract class TestPulsarConnector {
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
topicsToSchemas.get(topicName.getSchemaName()).getType(),
0, topicsToNumEntries.get(topicName.getSchemaName()),
0, 0, TupleDomain.all()));
0, 0, TupleDomain.all(), new HashMap<>()));
}
fooFunctions = new HashMap<>();
......
......@@ -35,6 +35,8 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.Response;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.Test;
import java.util.Arrays;
......@@ -206,6 +208,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaInfo badSchemaInfo = new SchemaInfo();
badSchemaInfo.setSchema(new byte[0]);
badSchemaInfo.setType(SchemaType.AVRO);
when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
......@@ -231,6 +234,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaInfo badSchemaInfo = new SchemaInfo();
badSchemaInfo.setSchema("foo".getBytes());
badSchemaInfo.setType(SchemaType.AVRO);
when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto;
import com.facebook.presto.spi.ColumnMetadata;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.impl.schema.BooleanSchema;
import org.apache.pulsar.client.impl.schema.ByteSchema;
import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.apache.pulsar.client.impl.schema.DateSchema;
import org.apache.pulsar.client.impl.schema.DoubleSchema;
import org.apache.pulsar.client.impl.schema.FloatSchema;
import org.apache.pulsar.client.impl.schema.IntSchema;
import org.apache.pulsar.client.impl.schema.LongSchema;
import org.apache.pulsar.client.impl.schema.ShortSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.impl.schema.TimeSchema;
import org.apache.pulsar.client.impl.schema.TimestampSchema;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Slf4j
public class TestPulsarPrimitiveSchemaHandler {
private static final TopicName stringTopicName = TopicName.get("persistent", "tenant-1", "ns-1", "topic-1");
@Test
public void testPulsarPrimitiveSchemaHandler() {
PulsarPrimitiveSchemaHandler pulsarPrimitiveSchemaHandler;
RawMessage rawMessage = mock(RawMessage.class);
SchemaInfo schemaInfoInt8 = SchemaInfo.builder().type(SchemaType.INT8).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoInt8);
byte int8Value = 1;
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(ByteSchema.of().encode(int8Value)));
Assert.assertEquals(int8Value, (byte)pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
SchemaInfo schemaInfoInt16 = SchemaInfo.builder().type(SchemaType.INT16).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoInt16);
short int16Value = 2;
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(ShortSchema.of().encode(int16Value)));
Assert.assertEquals(int16Value, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
SchemaInfo schemaInfoInt32 = SchemaInfo.builder().type(SchemaType.INT32).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoInt32);
int int32Value = 2;
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(IntSchema.of().encode(int32Value)));
Assert.assertEquals(int32Value, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
SchemaInfo schemaInfoInt64 = SchemaInfo.builder().type(SchemaType.INT64).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoInt64);
long int64Value = 2;
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(LongSchema.of().encode(int64Value)));
Assert.assertEquals(int64Value, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
SchemaInfo schemaInfoString = SchemaInfo.builder().type(SchemaType.STRING).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoString);
String stringValue = "test";
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(StringSchema.utf8().encode(stringValue)));
Assert.assertEquals(stringValue, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
SchemaInfo schemaInfoFloat = SchemaInfo.builder().type(SchemaType.FLOAT).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoFloat);
float floatValue = 0.2f;
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(FloatSchema.of().encode(floatValue)));
Assert.assertEquals(floatValue, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
SchemaInfo schemaInfoDouble = SchemaInfo.builder().type(SchemaType.DOUBLE).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoDouble);
double doubleValue = 0.22d;
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(DoubleSchema.of().encode(doubleValue)));
Assert.assertEquals(doubleValue, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
SchemaInfo schemaInfoBoolean = SchemaInfo.builder().type(SchemaType.BOOLEAN).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoBoolean);
boolean booleanValue = true;
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(BooleanSchema.of().encode(booleanValue)));
Assert.assertEquals(booleanValue, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
SchemaInfo schemaInfoBytes = SchemaInfo.builder().type(SchemaType.BYTES).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoBytes);
byte[] bytesValue = new byte[1];
bytesValue[0] = 1;
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(BytesSchema.of().encode(bytesValue)));
Assert.assertEquals(bytesValue, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
SchemaInfo schemaInfoDate = SchemaInfo.builder().type(SchemaType.DATE).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoDate);
Date dateValue = new Date(System.currentTimeMillis());
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(DateSchema.of().encode(dateValue)));
Object dateDeserializeValue = pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData());
Assert.assertEquals(dateValue.getTime(), dateDeserializeValue);
SchemaInfo schemaInfoTime = SchemaInfo.builder().type(SchemaType.TIME).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoTime);
Time timeValue = new Time(System.currentTimeMillis());
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(TimeSchema.of().encode(timeValue)));
Object timeDeserializeValue = pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData());
Assert.assertEquals(timeValue.getTime(), timeDeserializeValue);
SchemaInfo schemaInfoTimestamp = SchemaInfo.builder().type(SchemaType.TIMESTAMP).build();
pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoTimestamp);
Timestamp timestampValue = new Timestamp(System.currentTimeMillis());
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(TimestampSchema.of().encode(timestampValue)));
Object timestampDeserializeValue = pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData());
Assert.assertEquals(timestampValue.getTime(), timestampDeserializeValue);
}
@Test
public void testNewPulsarPrimitiveSchemaHandler() {
RawMessage rawMessage = mock(RawMessage.class);
SchemaHandler schemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(
StringSchema.utf8().getSchemaInfo(),
null);
String stringValue = "test";
when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(StringSchema.utf8().encode(stringValue)));
Object deserializeValue = schemaHandler.deserialize(rawMessage.getData());
Assert.assertEquals(stringValue, (String)deserializeValue);
Assert.assertEquals(stringValue, (String)deserializeValue);
}
@Test
public void testNewColumnMetadata() {
List<ColumnMetadata> columnMetadataList = PulsarMetadata.getPulsarColumns(stringTopicName,
StringSchema.utf8().getSchemaInfo(), false);
Assert.assertEquals(columnMetadataList.size(), 1);
ColumnMetadata columnMetadata = columnMetadataList.get(0);
Assert.assertEquals("__value__", columnMetadata.getName());
Assert.assertEquals("varchar", columnMetadata.getType().toString());
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册