diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index 9aa9670746f6a454c80379196e5048f9b72d180f..d289afa4db99b7f9c2b701f818e520f4f9ef969f 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -209,4 +209,5 @@ public interface Schema { static Schema getSchema(SchemaInfo schemaInfo) { return DefaultImplementation.getSchema(schemaInfo); } + } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/FieldSchemaBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/FieldSchemaBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..afd2cf20768bbe0e773edc6387111b858b4757cc --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/FieldSchemaBuilder.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.schema; + +import org.apache.pulsar.common.schema.SchemaType; + +/** + * Build a field for a record. + */ +public interface FieldSchemaBuilder> { + + /** + * Set name-value pair properties for this field. + * + * @param name name of the property + * @param val value of the property + * @return field schema builder + */ + S property(String name, String val); + + /** + * The documentation of this field. + * + * @param doc documentation + * @return field schema builder + */ + S doc(String doc); + + /** + * The optional name aliases of this field. + * + * @param aliases the name aliases of this field + * @return field schema builder + */ + S aliases(String... aliases); + + /** + * The type of this field. + * + *

Currently only primitive types are supported. + * + * @param type schema type of this field + * @return field schema builder + */ + S type(SchemaType type); + + /** + * Make this field optional. + * + * @return field schema builder + */ + S optional(); + + /** + * Make this field required. + * + * @return field schema builder + */ + S required(); + + /** + * Set the default value of this field. + * + *

The value is validated against the schema type. + * + * @return value + */ + S defaultValue(Object value); + +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..a3e34d7411ab70ac8ae522ef917f251a969d6349 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.schema; + +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +/** + * Building the schema for a {@link GenericRecord}. + */ +public interface RecordSchemaBuilder { + + /** + * Attach val-name property pair to the record schema. + * + * @param name property name + * @param val property value + * @return record schema builder + */ + RecordSchemaBuilder property(String name, String val); + + /** + * Add a field with the given name to the record. + * + * @param fieldName name of the field + * @return field schema builder to build the field. + */ + FieldSchemaBuilder field(String fieldName); + + /** + * Add doc to the record schema. + * + * @param doc documentation + * @return field schema builder + */ + RecordSchemaBuilder doc(String doc); + + /** + * Build the schema info. + * + * @return the schema info. + */ + SchemaInfo build(SchemaType schemaType); + +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..1956c43b35c2882411d84954d6b3c44dfe8f2168 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaBuilder.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.schema; + +import org.apache.pulsar.client.internal.DefaultImplementation; + +/** + * Builder to build schema. + */ +public interface SchemaBuilder { + + /** + * Build the schema for a record. + * + * @param name name of the record. + * @return builder to build the schema for a record. + */ + static RecordSchemaBuilder record(String name) { + return DefaultImplementation.newRecordSchemaBuilder(name); + } + +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java index 6dfd301d3f3a35158b2552e4087ffe7838a393bf..b1889653c6658b986ba55b7cf5eb1cffd5dccf0a 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -177,4 +178,10 @@ public class DefaultImplementation { () -> (Schema) getStaticMethod("org.apache.pulsar.client.impl.schema.AutoConsumeSchema", "getSchema", SchemaInfo.class).invoke(null, schemaInfo)); } + + public static RecordSchemaBuilder newRecordSchemaBuilder(String name) { + return catchExceptions( + () -> (RecordSchemaBuilder) getConstructor("org.apache.pulsar.client.impl.schema.RecordSchemaBuilderImpl", + String.class).newInstance(name)); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..67aca5be766298d85c2f9fa83091e352fbfae6eb --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import org.apache.avro.JsonProperties; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.SchemaBuilder; +import org.apache.pulsar.client.api.schema.FieldSchemaBuilder; +import org.apache.pulsar.common.schema.SchemaType; + +/** + * The default implementation of {@link FieldSchemaBuilder}. + */ +class FieldSchemaBuilderImpl implements FieldSchemaBuilder { + + private final String fieldName; + + private SchemaType type; + private boolean optional = false; + private Object defaultVal = null; + private final Map properties = new HashMap<>(); + private String doc; + private String[] aliases; + + FieldSchemaBuilderImpl(String fieldName) { + this.fieldName = fieldName; + } + + @Override + public FieldSchemaBuilderImpl property(String name, String val) { + properties.put(name, val); + return this; + } + + @Override + public FieldSchemaBuilderImpl doc(String doc) { + this.doc = doc; + return this; + } + + @Override + public FieldSchemaBuilderImpl aliases(String... aliases) { + this.aliases = aliases; + return this; + } + + @Override + public FieldSchemaBuilderImpl type(SchemaType type) { + this.type = type; + return this; + } + + @Override + public FieldSchemaBuilderImpl optional() { + optional = true; + return this; + } + + @Override + public FieldSchemaBuilderImpl required() { + optional = false; + return this; + } + + @Override + public FieldSchemaBuilderImpl defaultValue(Object value) { + defaultVal = value; + return this; + } + + Field build() { + requireNonNull(type, "Schema type is not provided"); + // verify the default value and object + SchemaUtils.validateFieldSchema( + fieldName, + type, + defaultVal + ); + + final Schema baseSchema; + switch (type) { + case INT32: + baseSchema = SchemaBuilder.builder().intType(); + break; + case INT64: + baseSchema = SchemaBuilder.builder().longType(); + break; + case STRING: + baseSchema = SchemaBuilder.builder().stringType(); + break; + case FLOAT: + baseSchema = SchemaBuilder.builder().floatType(); + break; + case DOUBLE: + baseSchema = SchemaBuilder.builder().doubleType(); + break; + case BOOLEAN: + baseSchema = SchemaBuilder.builder().booleanType(); + break; + case BYTES: + baseSchema = SchemaBuilder.builder().bytesType(); + break; + default: + throw new RuntimeException("Schema `" + type + "` is not supported to be used as a field for now"); + } + + for (Map.Entry entry : properties.entrySet()) { + baseSchema.addProp(entry.getKey(), entry.getValue()); + } + + if (null != aliases) { + for (String alias : aliases) { + baseSchema.addAlias(alias); + } + } + + final Schema finalSchema; + if (optional) { + if (defaultVal != null) { + finalSchema = SchemaBuilder.builder().unionOf() + .type(baseSchema) + .and() + .nullType() + .endUnion(); + } else { + finalSchema = SchemaBuilder.builder().unionOf() + .nullType() + .and() + .type(baseSchema) + .endUnion(); + } + } else { + finalSchema = baseSchema; + } + + final Object finalDefaultValue; + if (defaultVal != null) { + finalDefaultValue = SchemaUtils.toAvroObject(defaultVal); + } else { + if (optional) { + finalDefaultValue = JsonProperties.NULL_VALUE; + } else { + finalDefaultValue = null; + } + } + + return new Field( + fieldName, + finalSchema, + doc, + finalDefaultValue + ); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..9abede808f04f79f56b675610b8d632f6abda6cd --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pulsar.client.api.schema.FieldSchemaBuilder; +import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +/** + * The default implementation of {@link RecordSchemaBuilder}. + */ +public class RecordSchemaBuilderImpl implements RecordSchemaBuilder { + + public static final String NAMESPACE = "org.apache.pulsar.schema.record"; + public static final String DEFAULT_SCHEMA_NAME = "PulsarDefault"; + + private final String name; + private final Map properties; + private final List fields = new ArrayList<>(); + private String doc; + + public RecordSchemaBuilderImpl(String name) { + this.name = name; + this.properties = new HashMap<>(); + } + + @Override + public RecordSchemaBuilder property(String name, String val) { + this.properties.put(name, val); + return this; + } + + @Override + public FieldSchemaBuilder field(String fieldName) { + FieldSchemaBuilderImpl field = new FieldSchemaBuilderImpl(fieldName); + fields.add(field); + return field; + } + + @Override + public RecordSchemaBuilder doc(String doc) { + this.doc = doc; + return this; + } + + @Override + public SchemaInfo build(SchemaType schemaType) { + switch (schemaType) { + case JSON: + case AVRO: + break; + default: + throw new RuntimeException("Currently only AVRO and JSON record schema is supported"); + } + + String schemaNs = NAMESPACE; + String schemaName = DEFAULT_SCHEMA_NAME; + if (name != null) { + String[] split = splitName(name); + schemaNs = split[0]; + schemaName = split[1]; + } + + org.apache.avro.Schema baseSchema = org.apache.avro.Schema.createRecord( + schemaName != null ? schemaName : DEFAULT_SCHEMA_NAME, + doc, + schemaNs, + false + ); + + List avroFields = new ArrayList<>(); + for (FieldSchemaBuilderImpl field : fields) { + avroFields.add(field.build()); + } + + baseSchema.setFields(avroFields); + return new SchemaInfo( + name, + baseSchema.toString().getBytes(UTF_8), + schemaType, + properties + ); + } + + /** + * Split a full dotted-syntax name into a namespace and a single-component name. + */ + private static String[] splitName(String fullName) { + String[] result = new String[2]; + int indexLastDot = fullName.lastIndexOf('.'); + if (indexLastDot >= 0) { + result[0] = fullName.substring(0, indexLastDot); + result[1] = fullName.substring(indexLastDot + 1); + } else { + result[0] = null; + result[1] = fullName; + } + return result; + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..dce719247a96c3512a6be9c6dd6df9af5a9643d5 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pulsar.common.schema.SchemaType; + +/** + * Utils for schemas. + */ +final class SchemaUtils { + + private SchemaUtils() {} + + /** + * Keeps a map between {@link SchemaType} to a list of java classes that can be used to represent them. + */ + private static final Map> SCHEMA_TYPE_CLASSES = new HashMap<>(); + + /** + * Maps the java classes to the corresponding {@link SchemaType}. + */ + private static final Map, SchemaType> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>(); + + static { + // int8 + SCHEMA_TYPE_CLASSES.put( + SchemaType.INT8, + Arrays.asList(Byte.class)); + // int16 + SCHEMA_TYPE_CLASSES.put( + SchemaType.INT16, + Arrays.asList(Short.class)); + // int32 + SCHEMA_TYPE_CLASSES.put( + SchemaType.INT32, + Arrays.asList(Integer.class)); + // int64 + SCHEMA_TYPE_CLASSES.put( + SchemaType.INT64, + Arrays.asList(Long.class)); + // float + SCHEMA_TYPE_CLASSES.put( + SchemaType.FLOAT, + Arrays.asList(Float.class)); + // double + SCHEMA_TYPE_CLASSES.put( + SchemaType.DOUBLE, + Arrays.asList(Double.class)); + // boolean + SCHEMA_TYPE_CLASSES.put( + SchemaType.BOOLEAN, + Arrays.asList(Boolean.class)); + // string + SCHEMA_TYPE_CLASSES.put( + SchemaType.STRING, + Arrays.asList(String.class)); + // bytes + SCHEMA_TYPE_CLASSES.put( + SchemaType.BYTES, + Arrays.asList(byte[].class, ByteBuffer.class, ByteBuf.class)); + // build the reverse mapping + SCHEMA_TYPE_CLASSES.forEach( + (type, classes) -> classes.forEach(clz -> JAVA_CLASS_SCHEMA_TYPES.put(clz, type))); + } + + public static void validateFieldSchema(String name, + SchemaType type, + Object val) { + if (null == val) { + return; + } + + List expectedClasses = SCHEMA_TYPE_CLASSES.get(type); + + if (null == expectedClasses) { + throw new RuntimeException("Invalid Java object for schema type " + type + + " : " + val.getClass() + + " for field : \"" + name + "\""); + } + + boolean foundMatch = false; + for (Class expectedCls : expectedClasses) { + if (expectedCls.isInstance(val)) { + foundMatch = true; + break; + } + } + + if (!foundMatch) { + throw new RuntimeException("Invalid Java object for schema type " + type + + " : " + val.getClass() + + " for field : \"" + name + "\""); + } + + switch (type) { + case INT8: + case INT16: + case PROTOBUF: + case AVRO: + case AUTO_CONSUME: + case AUTO_PUBLISH: + case AUTO: + case KEY_VALUE: + case JSON: + case NONE: + throw new RuntimeException("Currently " + type.name() + " is not supported"); + default: + break; + } + } + + public static Object toAvroObject(Object value) { + if (value != null) { + if (value instanceof ByteBuffer) { + ByteBuffer bb = (ByteBuffer) value; + byte[] bytes = new byte[bb.remaining()]; + bb.duplicate().get(bytes); + return bytes; + } else if (value instanceof ByteBuf) { + return ByteBufUtil.getBytes((ByteBuf) value); + } else { + return value; + } + } else { + return null; + } + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java new file mode 100644 index 0000000000000000000000000000000000000000..eb11bc8b63ab348c0d522bcd45fc9e650b744935 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; +import org.apache.pulsar.client.api.schema.SchemaBuilder; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; +import org.testng.annotations.Test; + +/** + * Schema Builder Test. + */ +public class SchemaBuilderTest { + + private static class AllOptionalFields { + private Integer intField; + private Long longField; + private String stringField; + private Boolean boolField; + private Float floatField; + private Double doubleField; + } + + private static class AllPrimitiveFields { + private int intField; + private long longField; + private boolean boolField; + private float floatField; + private double doubleField; + } + + @Test + public void testAllOptionalFieldsSchema() { + RecordSchemaBuilder recordSchemaBuilder = + SchemaBuilder.record("org.apache.pulsar.client.impl.schema.SchemaBuilderTest$.AllOptionalFields"); + recordSchemaBuilder.field("intField") + .type(SchemaType.INT32).optional(); + recordSchemaBuilder.field("longField") + .type(SchemaType.INT64).optional(); + recordSchemaBuilder.field("stringField") + .type(SchemaType.STRING).optional(); + recordSchemaBuilder.field("boolField") + .type(SchemaType.BOOLEAN).optional(); + recordSchemaBuilder.field("floatField") + .type(SchemaType.FLOAT).optional(); + recordSchemaBuilder.field("doubleField") + .type(SchemaType.DOUBLE).optional(); + SchemaInfo schemaInfo = recordSchemaBuilder.build( + SchemaType.AVRO + ); + + Schema pojoSchema = Schema.AVRO(AllOptionalFields.class); + SchemaInfo pojoSchemaInfo = pojoSchema.getSchemaInfo(); + + org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse( + new String(schemaInfo.getSchema(), UTF_8) + ); + org.apache.avro.Schema avroPojoSchema = new org.apache.avro.Schema.Parser().parse( + new String(pojoSchemaInfo.getSchema(), UTF_8) + ); + + assertEquals(avroPojoSchema, avroSchema); + } + + @Test + public void testAllPrimitiveFieldsSchema() { + RecordSchemaBuilder recordSchemaBuilder = + SchemaBuilder.record("org.apache.pulsar.client.impl.schema.SchemaBuilderTest$.AllPrimitiveFields"); + recordSchemaBuilder.field("intField") + .type(SchemaType.INT32); + recordSchemaBuilder.field("longField") + .type(SchemaType.INT64); + recordSchemaBuilder.field("boolField") + .type(SchemaType.BOOLEAN); + recordSchemaBuilder.field("floatField") + .type(SchemaType.FLOAT); + recordSchemaBuilder.field("doubleField") + .type(SchemaType.DOUBLE); + SchemaInfo schemaInfo = recordSchemaBuilder.build( + SchemaType.AVRO + ); + + Schema pojoSchema = Schema.AVRO(AllPrimitiveFields.class); + SchemaInfo pojoSchemaInfo = pojoSchema.getSchemaInfo(); + + org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse( + new String(schemaInfo.getSchema(), UTF_8) + ); + org.apache.avro.Schema avroPojoSchema = new org.apache.avro.Schema.Parser().parse( + new String(pojoSchemaInfo.getSchema(), UTF_8) + ); + + assertEquals(avroPojoSchema, avroSchema); + } + +}