提交 f458ec34 编写于 作者: S Sijie Guo 提交者: Matteo Merli

[schema] Introduce schema builder to build schema. (#3682)

*Motivation*

Currently we are supporting POJO based schema in java clients.
POJO schema is only useful when the POJO is predefined. However
in applications like a CDC pipeline, POJO is no predefined, there
is no other way to define a schema.

Since we are using avro schema for schema management, this PR
is proposing a simple schema builder wrapper on avro schema builder.

*Modifications*

Introduce schema builder to build a record schema.

*NOTES*

Currently we only support primitives in defining fields in a record schema in this PR.
We will add nested types in future PRs.
上级 89e3410b
......@@ -209,4 +209,5 @@ public interface Schema<T> {
static Schema<?> getSchema(SchemaInfo schemaInfo) {
return DefaultImplementation.getSchema(schemaInfo);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;
import org.apache.pulsar.common.schema.SchemaType;
/**
* Build a field for a record.
*/
public interface FieldSchemaBuilder<S extends FieldSchemaBuilder<S>> {
/**
* Set name-value pair properties for this field.
*
* @param name name of the property
* @param val value of the property
* @return field schema builder
*/
S property(String name, String val);
/**
* The documentation of this field.
*
* @param doc documentation
* @return field schema builder
*/
S doc(String doc);
/**
* The optional name aliases of this field.
*
* @param aliases the name aliases of this field
* @return field schema builder
*/
S aliases(String... aliases);
/**
* The type of this field.
*
* <p>Currently only primitive types are supported.
*
* @param type schema type of this field
* @return field schema builder
*/
S type(SchemaType type);
/**
* Make this field optional.
*
* @return field schema builder
*/
S optional();
/**
* Make this field required.
*
* @return field schema builder
*/
S required();
/**
* Set the default value of this field.
*
* <p>The value is validated against the schema type.
*
* @return value
*/
S defaultValue(Object value);
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
/**
* Building the schema for a {@link GenericRecord}.
*/
public interface RecordSchemaBuilder {
/**
* Attach val-name property pair to the record schema.
*
* @param name property name
* @param val property value
* @return record schema builder
*/
RecordSchemaBuilder property(String name, String val);
/**
* Add a field with the given name to the record.
*
* @param fieldName name of the field
* @return field schema builder to build the field.
*/
FieldSchemaBuilder field(String fieldName);
/**
* Add doc to the record schema.
*
* @param doc documentation
* @return field schema builder
*/
RecordSchemaBuilder doc(String doc);
/**
* Build the schema info.
*
* @return the schema info.
*/
SchemaInfo build(SchemaType schemaType);
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;
import org.apache.pulsar.client.internal.DefaultImplementation;
/**
* Builder to build schema.
*/
public interface SchemaBuilder {
/**
* Build the schema for a record.
*
* @param name name of the record.
* @return builder to build the schema for a record.
*/
static RecordSchemaBuilder record(String name) {
return DefaultImplementation.newRecordSchemaBuilder(name);
}
}
......@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
......@@ -177,4 +178,10 @@ public class DefaultImplementation {
() -> (Schema<?>) getStaticMethod("org.apache.pulsar.client.impl.schema.AutoConsumeSchema",
"getSchema", SchemaInfo.class).invoke(null, schemaInfo));
}
public static RecordSchemaBuilder newRecordSchemaBuilder(String name) {
return catchExceptions(
() -> (RecordSchemaBuilder) getConstructor("org.apache.pulsar.client.impl.schema.RecordSchemaBuilderImpl",
String.class).newInstance(name));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import static java.util.Objects.requireNonNull;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.pulsar.client.api.schema.FieldSchemaBuilder;
import org.apache.pulsar.common.schema.SchemaType;
/**
* The default implementation of {@link FieldSchemaBuilder}.
*/
class FieldSchemaBuilderImpl implements FieldSchemaBuilder<FieldSchemaBuilderImpl> {
private final String fieldName;
private SchemaType type;
private boolean optional = false;
private Object defaultVal = null;
private final Map<String, String> properties = new HashMap<>();
private String doc;
private String[] aliases;
FieldSchemaBuilderImpl(String fieldName) {
this.fieldName = fieldName;
}
@Override
public FieldSchemaBuilderImpl property(String name, String val) {
properties.put(name, val);
return this;
}
@Override
public FieldSchemaBuilderImpl doc(String doc) {
this.doc = doc;
return this;
}
@Override
public FieldSchemaBuilderImpl aliases(String... aliases) {
this.aliases = aliases;
return this;
}
@Override
public FieldSchemaBuilderImpl type(SchemaType type) {
this.type = type;
return this;
}
@Override
public FieldSchemaBuilderImpl optional() {
optional = true;
return this;
}
@Override
public FieldSchemaBuilderImpl required() {
optional = false;
return this;
}
@Override
public FieldSchemaBuilderImpl defaultValue(Object value) {
defaultVal = value;
return this;
}
Field build() {
requireNonNull(type, "Schema type is not provided");
// verify the default value and object
SchemaUtils.validateFieldSchema(
fieldName,
type,
defaultVal
);
final Schema baseSchema;
switch (type) {
case INT32:
baseSchema = SchemaBuilder.builder().intType();
break;
case INT64:
baseSchema = SchemaBuilder.builder().longType();
break;
case STRING:
baseSchema = SchemaBuilder.builder().stringType();
break;
case FLOAT:
baseSchema = SchemaBuilder.builder().floatType();
break;
case DOUBLE:
baseSchema = SchemaBuilder.builder().doubleType();
break;
case BOOLEAN:
baseSchema = SchemaBuilder.builder().booleanType();
break;
case BYTES:
baseSchema = SchemaBuilder.builder().bytesType();
break;
default:
throw new RuntimeException("Schema `" + type + "` is not supported to be used as a field for now");
}
for (Map.Entry<String, String> entry : properties.entrySet()) {
baseSchema.addProp(entry.getKey(), entry.getValue());
}
if (null != aliases) {
for (String alias : aliases) {
baseSchema.addAlias(alias);
}
}
final Schema finalSchema;
if (optional) {
if (defaultVal != null) {
finalSchema = SchemaBuilder.builder().unionOf()
.type(baseSchema)
.and()
.nullType()
.endUnion();
} else {
finalSchema = SchemaBuilder.builder().unionOf()
.nullType()
.and()
.type(baseSchema)
.endUnion();
}
} else {
finalSchema = baseSchema;
}
final Object finalDefaultValue;
if (defaultVal != null) {
finalDefaultValue = SchemaUtils.toAvroObject(defaultVal);
} else {
if (optional) {
finalDefaultValue = JsonProperties.NULL_VALUE;
} else {
finalDefaultValue = null;
}
}
return new Field(
fieldName,
finalSchema,
doc,
finalDefaultValue
);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.schema.FieldSchemaBuilder;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
/**
* The default implementation of {@link RecordSchemaBuilder}.
*/
public class RecordSchemaBuilderImpl implements RecordSchemaBuilder {
public static final String NAMESPACE = "org.apache.pulsar.schema.record";
public static final String DEFAULT_SCHEMA_NAME = "PulsarDefault";
private final String name;
private final Map<String, String> properties;
private final List<FieldSchemaBuilderImpl> fields = new ArrayList<>();
private String doc;
public RecordSchemaBuilderImpl(String name) {
this.name = name;
this.properties = new HashMap<>();
}
@Override
public RecordSchemaBuilder property(String name, String val) {
this.properties.put(name, val);
return this;
}
@Override
public FieldSchemaBuilder field(String fieldName) {
FieldSchemaBuilderImpl field = new FieldSchemaBuilderImpl(fieldName);
fields.add(field);
return field;
}
@Override
public RecordSchemaBuilder doc(String doc) {
this.doc = doc;
return this;
}
@Override
public SchemaInfo build(SchemaType schemaType) {
switch (schemaType) {
case JSON:
case AVRO:
break;
default:
throw new RuntimeException("Currently only AVRO and JSON record schema is supported");
}
String schemaNs = NAMESPACE;
String schemaName = DEFAULT_SCHEMA_NAME;
if (name != null) {
String[] split = splitName(name);
schemaNs = split[0];
schemaName = split[1];
}
org.apache.avro.Schema baseSchema = org.apache.avro.Schema.createRecord(
schemaName != null ? schemaName : DEFAULT_SCHEMA_NAME,
doc,
schemaNs,
false
);
List<org.apache.avro.Schema.Field> avroFields = new ArrayList<>();
for (FieldSchemaBuilderImpl field : fields) {
avroFields.add(field.build());
}
baseSchema.setFields(avroFields);
return new SchemaInfo(
name,
baseSchema.toString().getBytes(UTF_8),
schemaType,
properties
);
}
/**
* Split a full dotted-syntax name into a namespace and a single-component name.
*/
private static String[] splitName(String fullName) {
String[] result = new String[2];
int indexLastDot = fullName.lastIndexOf('.');
if (indexLastDot >= 0) {
result[0] = fullName.substring(0, indexLastDot);
result[1] = fullName.substring(indexLastDot + 1);
} else {
result[0] = null;
result[1] = fullName;
}
return result;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.common.schema.SchemaType;
/**
* Utils for schemas.
*/
final class SchemaUtils {
private SchemaUtils() {}
/**
* Keeps a map between {@link SchemaType} to a list of java classes that can be used to represent them.
*/
private static final Map<SchemaType, List<Class>> SCHEMA_TYPE_CLASSES = new HashMap<>();
/**
* Maps the java classes to the corresponding {@link SchemaType}.
*/
private static final Map<Class<?>, SchemaType> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>();
static {
// int8
SCHEMA_TYPE_CLASSES.put(
SchemaType.INT8,
Arrays.asList(Byte.class));
// int16
SCHEMA_TYPE_CLASSES.put(
SchemaType.INT16,
Arrays.asList(Short.class));
// int32
SCHEMA_TYPE_CLASSES.put(
SchemaType.INT32,
Arrays.asList(Integer.class));
// int64
SCHEMA_TYPE_CLASSES.put(
SchemaType.INT64,
Arrays.asList(Long.class));
// float
SCHEMA_TYPE_CLASSES.put(
SchemaType.FLOAT,
Arrays.asList(Float.class));
// double
SCHEMA_TYPE_CLASSES.put(
SchemaType.DOUBLE,
Arrays.asList(Double.class));
// boolean
SCHEMA_TYPE_CLASSES.put(
SchemaType.BOOLEAN,
Arrays.asList(Boolean.class));
// string
SCHEMA_TYPE_CLASSES.put(
SchemaType.STRING,
Arrays.asList(String.class));
// bytes
SCHEMA_TYPE_CLASSES.put(
SchemaType.BYTES,
Arrays.asList(byte[].class, ByteBuffer.class, ByteBuf.class));
// build the reverse mapping
SCHEMA_TYPE_CLASSES.forEach(
(type, classes) -> classes.forEach(clz -> JAVA_CLASS_SCHEMA_TYPES.put(clz, type)));
}
public static void validateFieldSchema(String name,
SchemaType type,
Object val) {
if (null == val) {
return;
}
List<Class> expectedClasses = SCHEMA_TYPE_CLASSES.get(type);
if (null == expectedClasses) {
throw new RuntimeException("Invalid Java object for schema type " + type
+ " : " + val.getClass()
+ " for field : \"" + name + "\"");
}
boolean foundMatch = false;
for (Class<?> expectedCls : expectedClasses) {
if (expectedCls.isInstance(val)) {
foundMatch = true;
break;
}
}
if (!foundMatch) {
throw new RuntimeException("Invalid Java object for schema type " + type
+ " : " + val.getClass()
+ " for field : \"" + name + "\"");
}
switch (type) {
case INT8:
case INT16:
case PROTOBUF:
case AVRO:
case AUTO_CONSUME:
case AUTO_PUBLISH:
case AUTO:
case KEY_VALUE:
case JSON:
case NONE:
throw new RuntimeException("Currently " + type.name() + " is not supported");
default:
break;
}
}
public static Object toAvroObject(Object value) {
if (value != null) {
if (value instanceof ByteBuffer) {
ByteBuffer bb = (ByteBuffer) value;
byte[] bytes = new byte[bb.remaining()];
bb.duplicate().get(bytes);
return bytes;
} else if (value instanceof ByteBuf) {
return ByteBufUtil.getBytes((ByteBuf) value);
} else {
return value;
}
} else {
return null;
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.Test;
/**
* Schema Builder Test.
*/
public class SchemaBuilderTest {
private static class AllOptionalFields {
private Integer intField;
private Long longField;
private String stringField;
private Boolean boolField;
private Float floatField;
private Double doubleField;
}
private static class AllPrimitiveFields {
private int intField;
private long longField;
private boolean boolField;
private float floatField;
private double doubleField;
}
@Test
public void testAllOptionalFieldsSchema() {
RecordSchemaBuilder recordSchemaBuilder =
SchemaBuilder.record("org.apache.pulsar.client.impl.schema.SchemaBuilderTest$.AllOptionalFields");
recordSchemaBuilder.field("intField")
.type(SchemaType.INT32).optional();
recordSchemaBuilder.field("longField")
.type(SchemaType.INT64).optional();
recordSchemaBuilder.field("stringField")
.type(SchemaType.STRING).optional();
recordSchemaBuilder.field("boolField")
.type(SchemaType.BOOLEAN).optional();
recordSchemaBuilder.field("floatField")
.type(SchemaType.FLOAT).optional();
recordSchemaBuilder.field("doubleField")
.type(SchemaType.DOUBLE).optional();
SchemaInfo schemaInfo = recordSchemaBuilder.build(
SchemaType.AVRO
);
Schema<AllOptionalFields> pojoSchema = Schema.AVRO(AllOptionalFields.class);
SchemaInfo pojoSchemaInfo = pojoSchema.getSchemaInfo();
org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(
new String(schemaInfo.getSchema(), UTF_8)
);
org.apache.avro.Schema avroPojoSchema = new org.apache.avro.Schema.Parser().parse(
new String(pojoSchemaInfo.getSchema(), UTF_8)
);
assertEquals(avroPojoSchema, avroSchema);
}
@Test
public void testAllPrimitiveFieldsSchema() {
RecordSchemaBuilder recordSchemaBuilder =
SchemaBuilder.record("org.apache.pulsar.client.impl.schema.SchemaBuilderTest$.AllPrimitiveFields");
recordSchemaBuilder.field("intField")
.type(SchemaType.INT32);
recordSchemaBuilder.field("longField")
.type(SchemaType.INT64);
recordSchemaBuilder.field("boolField")
.type(SchemaType.BOOLEAN);
recordSchemaBuilder.field("floatField")
.type(SchemaType.FLOAT);
recordSchemaBuilder.field("doubleField")
.type(SchemaType.DOUBLE);
SchemaInfo schemaInfo = recordSchemaBuilder.build(
SchemaType.AVRO
);
Schema<AllPrimitiveFields> pojoSchema = Schema.AVRO(AllPrimitiveFields.class);
SchemaInfo pojoSchemaInfo = pojoSchema.getSchemaInfo();
org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(
new String(schemaInfo.getSchema(), UTF_8)
);
org.apache.avro.Schema avroPojoSchema = new org.apache.avro.Schema.Parser().parse(
new String(pojoSchemaInfo.getSchema(), UTF_8)
);
assertEquals(avroPojoSchema, avroSchema);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册