提交 6e76af2f 编写于 作者: C congbo 提交者: Matteo Merli

revise the schema default type not null (#3752)

Fix #3741

Support define not not allow null field in schema

Add not allow null field schema verify

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes

Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (yes)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)
上级 41d09796
......@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
......@@ -43,6 +44,8 @@ public class ClientGetSchemaTest extends ProducerConsumerBase {
private static final String topicString = "my-property/my-ns/topic-string";
private static final String topicJson = "my-property/my-ns/topic-json";
private static final String topicAvro = "my-property/my-ns/topic-avro";
private static final String topicJsonNotNull = "my-property/my-ns/topic-json-not-null";
private static final String topicAvroNotNull = "my-property/my-ns/topic-avro-not-null";
List<Producer<?>> producers = new ArrayList<>();
......@@ -62,6 +65,11 @@ public class ClientGetSchemaTest extends ProducerConsumerBase {
producers.add(pulsarClient.newProducer(Schema.STRING).topic(topicString).create());
producers.add(pulsarClient.newProducer(Schema.AVRO(MyClass.class)).topic(topicAvro).create());
producers.add(pulsarClient.newProducer(Schema.JSON(MyClass.class)).topic(topicJson).create());
producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).build())).topic(topicAvro).create());
producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).build())).topic(topicJson).create());
producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicAvroNotNull).create());
producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicJsonNotNull).create());
}
@AfterClass
......
......@@ -33,6 +33,7 @@ import lombok.ToString;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
......@@ -51,11 +52,11 @@ public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilit
public void testJsonSchemaBackwardsCompatibility() throws JsonProcessingException {
SchemaData from = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
SchemaData to = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
SchemaData to = SchemaData.builder().data(JSONSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
JsonSchemaCompatibilityCheck jsonSchemaCompatibilityCheck = new JsonSchemaCompatibilityCheck();
Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));
from = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
from = SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
to = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));
}
......
......@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
......@@ -63,7 +64,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
JSONSchema<JsonEncodedPojo> jsonSchema =
JSONSchema.of(JsonEncodedPojo.class);
JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());
Consumer<JsonEncodedPojo> consumer = pulsarClient
.newConsumer(jsonSchema)
......@@ -108,7 +109,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
JSONSchema<JsonEncodedPojo> jsonSchema =
JSONSchema.of(JsonEncodedPojo.class);
JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());
pulsar.getSchemaRegistryService()
.putSchemaIfAbsent("my-property/my-ns/my-topic1",
......@@ -166,7 +167,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
).get();
Consumer<JsonEncodedPojo> consumer = pulsarClient
.newConsumer(JSONSchema.of(JsonEncodedPojo.class))
.newConsumer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscribe();
......@@ -194,7 +195,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
).get();
Producer<JsonEncodedPojo> producer = pulsarClient
.newProducer(JSONSchema.of(JsonEncodedPojo.class))
.newProducer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.create();
......@@ -273,7 +274,9 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
).get();
Consumer<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong> consumer = pulsarClient
.newConsumer(AvroSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class))
.newConsumer(AvroSchema.of
(SchemaDefinition.<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong>builder().
withPojo(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscribe();
......@@ -286,7 +289,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(AvroEncodedPojo.class);
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
withPojo(AvroEncodedPojo.class).build());
Consumer<AvroEncodedPojo> consumer = pulsarClient
.newConsumer(avroSchema)
......@@ -355,7 +359,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
).get();
Consumer<AvroEncodedPojo> consumer = pulsarClient
.newConsumer(AvroSchema.of(AvroEncodedPojo.class))
.newConsumer(AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
withPojo(AvroEncodedPojo.class).build()))
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("my-subscriber-name")
.subscribe();
......@@ -454,7 +459,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(AvroEncodedPojo.class);
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
withPojo(AvroEncodedPojo.class).build());
Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
......@@ -502,7 +508,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(AvroEncodedPojo.class);
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
withPojo(AvroEncodedPojo.class).build());
Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
......@@ -548,7 +555,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
AvroSchema<AvroEncodedPojo> avroSchema =
AvroSchema.of(AvroEncodedPojo.class);
AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
withPojo(AvroEncodedPojo.class).build());
try (Producer<AvroEncodedPojo> producer = pulsarClient
.newProducer(avroSchema)
......
......@@ -19,10 +19,10 @@
package org.apache.pulsar.client.api;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
......@@ -169,58 +169,43 @@ public interface Schema<T> {
}
/**
* Create a Avro schema type by extracting the fields of the specified class.
*
* @param clazz the POJO class to be used to extract the Avro schema
* @return a Schema instance
*/
static <T> Schema<T> AVRO(Class<T> clazz) {
return DefaultImplementation.newAvroSchema(clazz);
}
/**
* Create a Avro schema type using the provided avro schema definition.
* Create a Avro schema type by default configuration of the class
*
* @param schemaDefinition avro schema definition
* @param pojo the POJO class to be used to extract the Avro schema
* @return a Schema instance
*/
static <T> Schema<T> AVRO(String schemaDefinition) {
return AVRO(schemaDefinition, Collections.emptyMap());
static <T> Schema<T> AVRO(Class<T> pojo) {
return DefaultImplementation.newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build());
}
/**
* Create a Avro schema type using the provided avro schema definition.
* Create a Avro schema type with schema definition
*
* @param schemaDefinition avro schema definition
* @param properties pulsar schema properties
* @param schemaDefinition the definition of the schema
* @return a Schema instance
*/
static <T> Schema<T> AVRO(String schemaDefinition, Map<String, String> properties) {
return DefaultImplementation.newAvroSchema(schemaDefinition, properties);
static <T> Schema<T> AVRO(SchemaDefinition<T> schemaDefinition) {
return DefaultImplementation.newAvroSchema(schemaDefinition);
}
/**
* Create a JSON schema type by extracting the fields of the specified class.
*
* @param clazz the POJO class to be used to extract the JSON schema
* @param pojo the POJO class to be used to extract the JSON schema
* @return a Schema instance
*/
static <T> Schema<T> JSON(Class<T> clazz) {
return DefaultImplementation.newJSONSchema(clazz);
static <T> Schema<T> JSON(Class<T> pojo) {
return DefaultImplementation.newJSONSchema(SchemaDefinition.builder().withPojo(pojo).build());
}
/**
* Create a JSON schema type by extracting the fields of the specified class.
* Create a JSON schema type with schema definition
*
* @param clazz the POJO class to be used to extract the JSON schema
* @param schemaDefinition schema definition json string (using avro schema syntax)
* @param properties pulsar schema properties
* @param schemaDefinition the definition of the schema
* @return a Schema instance
*/
static <T> Schema<T> JSON(Class<T> clazz,
String schemaDefinition,
Map<String, String> properties) {
return DefaultImplementation.newJSONSchema(clazz, schemaDefinition, properties);
static <T> Schema<T> JSON(SchemaDefinition schemaDefinition) {
return DefaultImplementation.newJSONSchema(schemaDefinition);
}
/**
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;
import org.apache.pulsar.client.internal.DefaultImplementation;
import java.util.Map;
public interface SchemaDefinition<T> {
/**
* Get a new builder instance that can used to configure and build a {@link SchemaDefinition} instance.
*
* @return the {@link SchemaDefinition}
*/
static <T> SchemaDefinitionBuilder<T> builder() {
return DefaultImplementation.newSchemaDefinitionBuilder();
}
/**
* get schema whether always allow null or not
*
* @return schema always null or not
*/
public boolean getAlwaysAllowNull();
/**
* Get schema class
*
* @return schema class
*/
public Map<String, String> getProperties();
/**
* Get json schema definition
*
* @return schema class
*/
public String getJsonDef();
/**
* Get pojo schema definition
*
* @return pojo schema
*/
public Class<T> getPojo();
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;
import java.util.Map;
/**
* Builder to build schema definition {@link SchemaDefinition}.
*/
public interface SchemaDefinitionBuilder<T> {
/**
* Set schema whether always allow null or not
*
* @param alwaysAllowNull definition null or not
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);
/**
* Set schema info properties
*
* @param properties schema info properties
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withProperties(Map<String, String> properties);
/**
* Set schema info properties
*
* @param key property key
* @param value property value
*
* @return record schema definition
*/
SchemaDefinitionBuilder<T> addProperty(String key, String value);
/**
* Set schema of pojo definition
*
* @param pojo pojo schema definition
*
* @return record schema definition
*/
SchemaDefinitionBuilder<T> withPojo(Class pojo);
/**
* Set schema of json definition
*
* @param jsonDefinition json schema definition
*
* @return record schema definition
*/
SchemaDefinitionBuilder<T> withJsonDef(String jsonDefinition);
/**
* Build the schema definition.
*
* @return the schema definition.
*/
SchemaDefinition<T> build();
}
......@@ -37,9 +37,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.*;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
......@@ -71,6 +69,13 @@ public class DefaultImplementation {
private static final Constructor<Authentication> AUTHENTICATION_TLS_String_String = getConstructor(
"org.apache.pulsar.client.impl.auth.AuthenticationTls", String.class, String.class);
private static final Constructor<SchemaDefinitionBuilder> SCHEMA_DEFINITION_BUILDER_CONSTRUCTOR = getConstructor(
"org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl");
public static <T> SchemaDefinitionBuilder<T> newSchemaDefinitionBuilder() {
return catchExceptions(() -> (SchemaDefinitionBuilder<T>)SCHEMA_DEFINITION_BUILDER_CONSTRUCTOR.newInstance());
}
public static ClientBuilder newClientBuilder() {
return catchExceptions(() -> CLIENT_BUILDER_IMPL.newInstance());
}
......@@ -182,16 +187,10 @@ public class DefaultImplementation {
.newInstance());
}
public static <T> Schema<T> newAvroSchema(Class<T> clazz) {
public static <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", Class.class)
.invoke(null, clazz));
}
public static <T> Schema<T> newAvroSchema(String schemaDefinition, Map<String, String> properties) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", String.class, Map.class)
.invoke(null, schemaDefinition, properties));
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", SchemaDefinition.class)
.invoke(null,schemaDefinition));
}
public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(Class<T> clazz) {
......@@ -200,18 +199,10 @@ public class DefaultImplementation {
.invoke(null, clazz));
}
public static <T> Schema<T> newJSONSchema(Class<T> clazz) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.JSONSchema", "of", Class.class)
.invoke(null, clazz));
}
public static <T> Schema<T> newJSONSchema(Class<T> clazz,
String schemaDefinition,
Map<String, String> properties) {
public static <T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.JSONSchema", "of", Class.class, String.class, Map.class)
.invoke(null, clazz, schemaDefinition, properties));
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.JSONSchema", "of", SchemaDefinition.class)
.invoke(null, schemaDefinition));
}
public static Schema<GenericRecord> newAutoConsumeSchema() {
......
......@@ -26,11 +26,12 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
/**
......@@ -48,12 +49,11 @@ public class AvroSchema<T> extends StructSchema<T> {
new ThreadLocal<>();
private AvroSchema(org.apache.avro.Schema schema,
Map<String, String> properties) {
SchemaDefinition schemaDefinition) {
super(
SchemaType.AVRO,
schema,
properties);
schemaDefinition.getProperties());
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
this.datumWriter = new ReflectDatumWriter<>(this.schema);
......@@ -87,23 +87,23 @@ public class AvroSchema<T> extends StructSchema<T> {
}
}
public static <T> AvroSchema<T> of(Class<T> pojo) {
return new AvroSchema<>(createAvroSchema(pojo), Collections.emptyMap());
@Override
public SchemaInfo getSchemaInfo() {
return this.schemaInfo;
}
public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) {
return new AvroSchema<>(createAvroSchema(pojo), properties);
public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
return schemaDefinition.getJsonDef() == null ?
new AvroSchema<>(createAvroSchema(schemaDefinition), schemaDefinition) : new AvroSchema<>(parseAvroSchema(schemaDefinition.getJsonDef()), schemaDefinition);
}
/**
* Create an Avro schema based on provided schema definition.
*
* @param schemaDefinition avro schema definition
* @param properties schema properties
* @return avro schema instance
*/
public static <T> AvroSchema<T> of(String schemaDefinition, Map<String, String> properties) {
return new AvroSchema<>(parseAvroSchema(schemaDefinition), properties);
public static <T> AvroSchema<T> of(Class<T> pojo) {
return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(pojo).build());
}
public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) {
SchemaDefinition<T> schemaDefinition = SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build();
return new AvroSchema<>(createAvroSchema(schemaDefinition), schemaDefinition);
}
}
......@@ -26,11 +26,11 @@ import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
/**
......@@ -38,7 +38,6 @@ import java.util.Map;
*/
@Slf4j
public class JSONSchema<T> extends StructSchema<T> {
// Cannot use org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal() because it does not
// return shaded version of object mapper
private static final ThreadLocal<ObjectMapper> JSON_MAPPER = ThreadLocal.withInitial(() -> {
......@@ -51,14 +50,13 @@ public class JSONSchema<T> extends StructSchema<T> {
private final Class<T> pojo;
private final ObjectMapper objectMapper;
private JSONSchema(Class<T> pojo,
org.apache.avro.Schema schema,
Map<String, String> properties) {
private JSONSchema(org.apache.avro.Schema schema,
SchemaDefinition<T> schemaDefinition) {
super(
SchemaType.JSON,
schema,
properties);
this.pojo = pojo;
schemaDefinition.getProperties());
this.pojo = schemaDefinition.getPojo();
this.objectMapper = JSON_MAPPER.get();
}
......@@ -89,6 +87,7 @@ public class JSONSchema<T> extends StructSchema<T> {
* Implemented for backwards compatibility reasons
* since the original schema generated by JSONSchema was based off the json schema standard
* since then we have standardized on Avro
*
* @return
*/
public SchemaInfo getBackwardsCompatibleJsonSchemaInfo() {
......@@ -108,25 +107,18 @@ public class JSONSchema<T> extends StructSchema<T> {
return backwardsCompatibleSchemaInfo;
}
public static <T> JSONSchema<T> of(SchemaDefinition<T> schemaDefinition) {
String jsonDef = schemaDefinition.getJsonDef();
return jsonDef == null ? new JSONSchema<>(createAvroSchema(schemaDefinition), schemaDefinition) :
new JSONSchema<>(parseAvroSchema(jsonDef), schemaDefinition);
}
public static <T> JSONSchema<T> of(Class<T> pojo) {
return new JSONSchema<>(pojo, createAvroSchema(pojo), Collections.emptyMap());
return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(pojo).build());
}
public static <T> JSONSchema<T> of(Class<T> pojo, Map<String, String> properties) {
return new JSONSchema<>(pojo, createAvroSchema(pojo), properties);
return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build());
}
/**
* Create an json schema based on provided schema definition.
*
* @param pojo pojo class
* @param schemaDefinition avro schema definition
* @param properties schema properties
* @return avro schema instance
*/
public static <T> JSONSchema<T> of(Class<T> pojo,
String schemaDefinition,
Map<String, String> properties) {
return new JSONSchema<>(pojo, parseAvroSchema(schemaDefinition), properties);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder;
import java.util.HashMap;
import java.util.Map;
/**
* Builder to build {@link org.apache.pulsar.client.api.schema.GenericRecord}.
*/
public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T> {
public static final String ALWAYS_ALLOW_NULL = "__alwaysAllowNull";
/**
* the schema definition class
*/
private Class<T> clazz;
/**
* The flag of schema type always allow null
*
* If it's true, will make all of the pojo field generate schema
* define default can be null,false default can't be null, but it's
* false you can define the field by yourself by the annotation@Nullable
*
*/
private boolean alwaysAllowNull = true;
/**
* The schema info properties
*/
private Map<String, String> properties = new HashMap<>();
/**
* The json schema definition
*/
private String jsonDef;
@Override
public SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull) {
this.alwaysAllowNull = alwaysAllowNull;
return this;
}
@Override
public SchemaDefinitionBuilder<T> addProperty(String key, String value) {
this.properties.put(key, value);
return this;
}
@Override
public SchemaDefinitionBuilder<T> withPojo(Class clazz) {
this.clazz = clazz;
return this;
}
@Override
public SchemaDefinitionBuilder<T> withJsonDef(String jsonDef) {
this.jsonDef = jsonDef;
return this;
}
@Override
public SchemaDefinitionBuilder<T> withProperties(Map<String,String> properties) {
this.properties = properties;
return this;
}
@Override
public SchemaDefinition<T> build() {
properties.put(ALWAYS_ALLOW_NULL, this.alwaysAllowNull ? "true" : "false");
return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import java.util.HashMap;
import java.util.Map;
/**
* A json schema definition
* {@link org.apache.pulsar.client.api.schema.SchemaDefinition} for the json schema definition.
*/
public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
/**
* the schema definition class
*/
private Class<T> pojo;
/**
* The flag of schema type always allow null
*
* If it's true, will make all of the pojo field generate schema
* define default can be null,false default can't be null, but it's
* false you can define the field by yourself by the annotation@Nullable
*
*/
private boolean alwaysAllowNull;
private Map<String, String> properties;
private String jsonDef;
public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties) {
this.alwaysAllowNull = alwaysAllowNull;
this.properties = properties;
this.jsonDef = jsonDef;
this.pojo = pojo;
}
/**
* get schema whether always allow null or not
*
* @return schema always null or not
*/
public boolean getAlwaysAllowNull() {
return alwaysAllowNull;
}
/**
* Get json schema definition
*
* @return schema class
*/
public String getJsonDef() {
return jsonDef;
}
/**
* Get pojo schema definition
*
* @return pojo class
*/
@Override
public Class<T> getPojo() {
return pojo;
}
/**
* Get schema class
*
* @return schema class
*/
public Map<String, String> getProperties() {
return properties;
}
}
......@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.avro.Schema.Parser;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
......@@ -62,13 +63,14 @@ abstract class StructSchema<T> implements Schema<T> {
return this.schemaInfo;
}
protected static <T> org.apache.avro.Schema createAvroSchema(Class<T> pojo) {
return ReflectData.AllowNull.get().getSchema(pojo);
protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition schemaDefinition) {
Class pojo = schemaDefinition.getPojo();
return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo);
}
protected static org.apache.avro.Schema parseAvroSchema(String definition) {
protected static org.apache.avro.Schema parseAvroSchema(String jsonDef) {
Parser parser = new Parser();
return parser.parse(definition);
return parser.parse(jsonDef);
}
}
......@@ -20,29 +20,34 @@ package org.apache.pulsar.client.impl.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_AVRO_NOT_ALLOW_NULL;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_AVRO_ALLOW_NULL;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;
import java.util.Arrays;
import java.util.Collections;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.avro.reflect.AvroDefault;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
@Slf4j
public class AvroSchemaTest {
......@@ -88,7 +93,7 @@ public class AvroSchemaTest {
// expected
}
AvroSchema<StructWithAnnotations> schema3 = AvroSchema.of(schemaDef1, Collections.emptyMap());
AvroSchema<StructWithAnnotations> schema3 = AvroSchema.of(SchemaDefinition.<StructWithAnnotations>builder().withJsonDef(schemaDef1).build());
String schemaDef3 = new String(schema3.getSchemaInfo().getSchema(), UTF_8);
assertEquals(schemaDef1, schemaDef3);
assertNotEquals(schemaDef2, schemaDef3);
......@@ -108,12 +113,12 @@ public class AvroSchemaTest {
}
@Test
public void testSchema() {
AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class);
public void testNotAllowNullSchema() {
AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
assertEquals(avroSchema.getSchemaInfo().getType(), SchemaType.AVRO);
Schema.Parser parser = new Schema.Parser();
String schemaJson = new String(avroSchema.getSchemaInfo().getSchema());
assertEquals(schemaJson, SCHEMA_JSON);
assertEquals(schemaJson, SCHEMA_AVRO_NOT_ALLOW_NULL);
Schema schema = parser.parse(schemaJson);
for (String fieldName : FOO_FIELDS) {
......@@ -123,12 +128,66 @@ public class AvroSchemaTest {
if (field.name().equals("field4")) {
Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
}
if (field.name().equals("fieldUnableNull")) {
Assert.assertNotNull(field.schema().getType());
}
}
}
@Test
public void testEncodeAndDecode() {
AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
public void testAllowNullSchema() {
AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
assertEquals(avroSchema.getSchemaInfo().getType(), SchemaType.AVRO);
Schema.Parser parser = new Schema.Parser();
String schemaJson = new String(avroSchema.getSchemaInfo().getSchema());
assertEquals(schemaJson, SCHEMA_AVRO_ALLOW_NULL);
Schema schema = parser.parse(schemaJson);
for (String fieldName : FOO_FIELDS) {
Schema.Field field = schema.getField(fieldName);
Assert.assertNotNull(field);
if (field.name().equals("field4")) {
Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
}
if (field.name().equals("fieldUnableNull")) {
Assert.assertNotNull(field.schema().getType());
}
}
}
@Test
public void testNotAllowNullEncodeAndDecode() {
AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
Foo foo1 = new Foo();
foo1.setField1("foo1");
foo1.setField2("bar1");
foo1.setField4(new Bar());
foo1.setFieldUnableNull("notNull");
Foo foo2 = new Foo();
foo2.setField1("foo2");
foo2.setField2("bar2");
byte[] bytes1 = avroSchema.encode(foo1);
Foo object1 = avroSchema.decode(bytes1);
Assert.assertTrue(bytes1.length > 0);
assertEquals(object1, foo1);
try {
avroSchema.encode(foo2);
} catch (Exception e) {
Assert.assertTrue(e instanceof SchemaSerializationException);
}
}
@Test
public void testAllowNullEncodeAndDecode() {
AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Foo foo1 = new Foo();
foo1.setField1("foo1");
......@@ -150,6 +209,8 @@ public class AvroSchemaTest {
assertEquals(object1, foo1);
assertEquals(object2, foo2);
}
}
......@@ -24,7 +24,7 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.DerivedFoo;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
......@@ -35,18 +35,20 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_NOT_ALLOW_NULL;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_ALLOW_NULL;
import static org.testng.Assert.assertEquals;
@Slf4j
public class JSONSchemaTest {
@Test
public void testSchema() {
JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class);
public void testNotAllowNullSchema() {
JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
Assert.assertEquals(jsonSchema.getSchemaInfo().getType(), SchemaType.JSON);
Schema.Parser parser = new Schema.Parser();
String schemaJson = new String(jsonSchema.getSchemaInfo().getSchema());
Assert.assertEquals(schemaJson, SCHEMA_JSON);
Assert.assertEquals(schemaJson, SCHEMA_JSON_NOT_ALLOW_NULL);
Schema schema = parser.parse(schemaJson);
for (String fieldName : FOO_FIELDS) {
......@@ -56,12 +58,37 @@ public class JSONSchemaTest {
if (field.name().equals("field4")) {
Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
}
if (field.name().equals("fieldUnableNull")) {
Assert.assertNotNull(field.schema().getType());
}
}
}
@Test
public void testEncodeAndDecode() {
JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class, null);
public void testAllowNullSchema() {
JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Assert.assertEquals(jsonSchema.getSchemaInfo().getType(), SchemaType.JSON);
Schema.Parser parser = new Schema.Parser();
String schemaJson = new String(jsonSchema.getSchemaInfo().getSchema());
Assert.assertEquals(schemaJson, SCHEMA_JSON_ALLOW_NULL);
Schema schema = parser.parse(schemaJson);
for (String fieldName : FOO_FIELDS) {
Schema.Field field = schema.getField(fieldName);
Assert.assertNotNull(field);
if (field.name().equals("field4")) {
Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
}
if (field.name().equals("fieldUnableNull")) {
Assert.assertNotNull(field.schema().getType());
}
}
}
@Test
public void testAllowNullEncodeAndDecode() {
JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Bar bar = new Bar();
bar.setField1(true);
......@@ -90,9 +117,65 @@ public class JSONSchemaTest {
}
@Test
public void testNestedClasses() {
JSONSchema<NestedBar> jsonSchema = JSONSchema.of(NestedBar.class, null);
JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(NestedBarList.class, null);
public void testNotAllowNullEncodeAndDecode() {
JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
Foo foo1 = new Foo();
foo1.setField1("foo1");
foo1.setField2("bar1");
foo1.setField4(new Bar());
foo1.setFieldUnableNull("notNull");
Foo foo2 = new Foo();
foo2.setField1("foo2");
foo2.setField2("bar2");
byte[] bytes1 = jsonSchema.encode(foo1);
Foo object1 = jsonSchema.decode(bytes1);
Assert.assertTrue(bytes1.length > 0);
assertEquals(object1, foo1);
try {
jsonSchema.encode(foo2);
} catch (Exception e) {
Assert.assertTrue(e instanceof SchemaSerializationException);
}
}
@Test
public void testAllowNullNestedClasses() {
JSONSchema<NestedBar> jsonSchema = JSONSchema.of(SchemaDefinition.<NestedBar>builder().withPojo(NestedBar.class).build());
JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(SchemaDefinition.<NestedBarList>builder().withPojo(NestedBarList.class).build());
Bar bar = new Bar();
bar.setField1(true);
NestedBar nested = new NestedBar();
nested.setField1(true);
nested.setNested(bar);
byte[] bytes = jsonSchema.encode(nested);
Assert.assertTrue(bytes.length > 0);
Assert.assertEquals(jsonSchema.decode(bytes), nested);
List<Bar> list = Collections.singletonList(bar);
NestedBarList nestedList = new NestedBarList();
nestedList.setField1(true);
nestedList.setList(list);
bytes = listJsonSchema.encode(nestedList);
Assert.assertTrue(bytes.length > 0);
Assert.assertEquals(listJsonSchema.decode(bytes), nestedList);
}
@Test
public void testNotAllowNullNestedClasses() {
JSONSchema<NestedBar> jsonSchema = JSONSchema.of(SchemaDefinition.<NestedBar>builder().withPojo(NestedBar.class).withAlwaysAllowNull(false).build());
JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(SchemaDefinition.<NestedBarList>builder().withPojo(NestedBarList.class).withAlwaysAllowNull(false).build());
Bar bar = new Bar();
bar.setField1(true);
......@@ -117,7 +200,59 @@ public class JSONSchemaTest {
}
@Test
public void testCorrectPolymorphism() {
public void testNotAllowNullCorrectPolymorphism() {
Bar bar = new Bar();
bar.setField1(true);
DerivedFoo derivedFoo = new DerivedFoo();
derivedFoo.setField1("foo1");
derivedFoo.setField2("bar2");
derivedFoo.setField3(4);
derivedFoo.setField4(bar);
derivedFoo.setField5("derived1");
derivedFoo.setField6(2);
Foo foo = new Foo();
foo.setField1("foo1");
foo.setField2("bar2");
foo.setField3(4);
foo.setField4(bar);
SchemaTestUtils.DerivedDerivedFoo derivedDerivedFoo = new SchemaTestUtils.DerivedDerivedFoo();
derivedDerivedFoo.setField1("foo1");
derivedDerivedFoo.setField2("bar2");
derivedDerivedFoo.setField3(4);
derivedDerivedFoo.setField4(bar);
derivedDerivedFoo.setField5("derived1");
derivedDerivedFoo.setField6(2);
derivedDerivedFoo.setFoo2(foo);
derivedDerivedFoo.setDerivedFoo(derivedFoo);
// schema for base class
JSONSchema<Foo> baseJsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(foo)), foo);
Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedFoo)), foo);
Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedDerivedFoo)), foo);
// schema for derived class
JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(SchemaDefinition.<DerivedFoo>builder().withPojo(DerivedFoo.class).build());
Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedFoo)), derivedFoo);
Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)), derivedFoo);
//schema for derived derived class
JSONSchema<SchemaTestUtils.DerivedDerivedFoo> derivedDerivedJsonSchema
= JSONSchema.of(SchemaDefinition.<SchemaTestUtils.DerivedDerivedFoo>builder().withPojo(SchemaTestUtils.DerivedDerivedFoo.class).build());
Assert.assertEquals(derivedDerivedJsonSchema.decode(derivedDerivedJsonSchema.encode(derivedDerivedFoo)), derivedDerivedFoo);
}
@Test(expectedExceptions = SchemaSerializationException.class)
public void testAllowNullDecodeWithInvalidContent() {
JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
jsonSchema.decode(new byte[0]);
}
@Test
public void testAllowNullCorrectPolymorphism() {
Bar bar = new Bar();
bar.setField1(true);
......@@ -146,25 +281,25 @@ public class JSONSchemaTest {
derivedDerivedFoo.setDerivedFoo(derivedFoo);
// schema for base class
JSONSchema<Foo> baseJsonSchema = JSONSchema.of(Foo.class);
JSONSchema<Foo> baseJsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(foo)), foo);
Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedFoo)), foo);
Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedDerivedFoo)), foo);
// schema for derived class
JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(DerivedFoo.class);
JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(SchemaDefinition.<DerivedFoo>builder().withPojo(DerivedFoo.class).withAlwaysAllowNull(false).build());
Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedFoo)), derivedFoo);
Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)), derivedFoo);
//schema for derived derived class
JSONSchema<SchemaTestUtils.DerivedDerivedFoo> derivedDerivedJsonSchema
= JSONSchema.of(SchemaTestUtils.DerivedDerivedFoo.class);
= JSONSchema.of(SchemaDefinition.<SchemaTestUtils.DerivedDerivedFoo>builder().withPojo(SchemaTestUtils.DerivedDerivedFoo.class).withAlwaysAllowNull(false).build());
Assert.assertEquals(derivedDerivedJsonSchema.decode(derivedDerivedJsonSchema.encode(derivedDerivedFoo)), derivedDerivedFoo);
}
@Test(expectedExceptions = SchemaSerializationException.class)
public void testDecodeWithInvalidContent() {
JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class);
public void testNotAllowNullDecodeWithInvalidContent() {
JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
jsonSchema.decode(new byte[0]);
}
}
......@@ -22,9 +22,7 @@ import static org.testng.Assert.assertEquals;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Color;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
......@@ -37,9 +35,9 @@ import org.testng.annotations.Test;
public class KeyValueSchemaTest {
@Test
public void testAvroSchemaCreate() {
AvroSchema<Foo> fooSchema = AvroSchema.of(Foo.class);
AvroSchema<Bar> barSchema = AvroSchema.of(Bar.class);
public void testAllowNullAvroSchemaCreate() {
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(Foo.class, Bar.class, SchemaType.AVRO);
......@@ -47,13 +45,39 @@ public class KeyValueSchemaTest {
assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
SchemaType.AVRO);
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
SchemaType.AVRO);
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
SchemaType.AVRO);
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
SchemaType.AVRO);
String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
String schemaInfo2 = new String(keyValueSchema2.getSchemaInfo().getSchema());
assertEquals(schemaInfo1, schemaInfo2);
}
@Test
public void testNotAllowNullAvroSchemaCreate() {
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
SchemaType.AVRO);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
SchemaType.AVRO);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
SchemaType.AVRO);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
SchemaType.AVRO);
String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
......@@ -62,9 +86,9 @@ public class KeyValueSchemaTest {
}
@Test
public void testJsonSchemaCreate() {
JSONSchema<Foo> fooSchema = JSONSchema.of(Foo.class);
JSONSchema<Bar> barSchema = JSONSchema.of(Bar.class);
public void testAllowNullJsonSchemaCreate() {
JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(Foo.class, Bar.class, SchemaType.JSON);
......@@ -74,17 +98,53 @@ public class KeyValueSchemaTest {
assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(keyValueSchema3.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);
String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
String schemaInfo2 = new String(keyValueSchema2.getSchemaInfo().getSchema());
String schemaInfo3 = new String(keyValueSchema3.getSchemaInfo().getSchema());
assertEquals(schemaInfo1, schemaInfo2);
assertEquals(schemaInfo1, schemaInfo3);
}
@Test
public void testNotAllowNullJsonSchemaCreate() {
JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
Schema<KeyValue<Foo, Bar>> keyValueSchema3 = Schema.KeyValue(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(keyValueSchema3.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);
String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
......@@ -95,7 +155,7 @@ public class KeyValueSchemaTest {
}
@Test
public void testSchemaEncodeAndDecode() {
public void testAllowNullSchemaEncodeAndDecode() {
Schema keyValueSchema = Schema.KeyValue(Foo.class, Bar.class);
Bar bar = new Bar();
......@@ -111,7 +171,7 @@ public class KeyValueSchemaTest {
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
Assert.assertTrue(encodeBytes.length > 0);
KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>)keyValueSchema.decode(encodeBytes);
KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>) keyValueSchema.decode(encodeBytes);
Foo fooBack = keyValue.getKey();
Bar barBack = keyValue.getValue();
......@@ -120,9 +180,64 @@ public class KeyValueSchemaTest {
}
@Test
public void testBytesSchemaEncodeAndDecode() {
AvroSchema<Foo> fooAvroSchema = AvroSchema.of(Foo.class);
AvroSchema<Bar> barAvroSchema = AvroSchema.of(Bar.class);
public void testNotAllowNullSchemaEncodeAndDecode() {
Schema keyValueSchema = Schema.KeyValue(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
foo.setField4(bar);
foo.setColor(Color.RED);
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
Assert.assertTrue(encodeBytes.length > 0);
KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>) keyValueSchema.decode(encodeBytes);
Foo fooBack = keyValue.getKey();
Bar barBack = keyValue.getValue();
assertEquals(foo, fooBack);
assertEquals(bar, barBack);
}
@Test
public void testAllowNullBytesSchemaEncodeAndDecode() {
AvroSchema<Foo> fooAvroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
AvroSchema<Bar> barAvroSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
foo.setField4(bar);
foo.setColor(Color.RED);
foo.setFieldUnableNull("notNull");
byte[] fooBytes = fooAvroSchema.encode(foo);
byte[] barBytes = barAvroSchema.encode(bar);
byte[] encodeBytes = Schema.KV_BYTES().encode(new KeyValue<>(fooBytes, barBytes));
KeyValue<byte[], byte[]> decodeKV = Schema.KV_BYTES().decode(encodeBytes);
Foo fooBack = fooAvroSchema.decode(decodeKV.getKey());
Bar barBack = barAvroSchema.decode(decodeKV.getValue());
assertEquals(foo, fooBack);
assertEquals(bar, barBack);
}
@Test
public void testNotAllowNullBytesSchemaEncodeAndDecode() {
AvroSchema<Foo> fooAvroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
AvroSchema<Bar> barAvroSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
Bar bar = new Bar();
bar.setField1(true);
......@@ -133,6 +248,7 @@ public class KeyValueSchemaTest {
foo.setField3(3);
foo.setField4(bar);
foo.setColor(Color.RED);
foo.setFieldUnableNull("notNull");
byte[] fooBytes = fooAvroSchema.encode(foo);
byte[] barBytes = barAvroSchema.encode(bar);
......
......@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
......@@ -34,11 +35,17 @@ import org.testng.annotations.Test;
public class SchemaBuilderTest {
private static class AllOptionalFields {
@Nullable
private Integer intField;
@Nullable
private Long longField;
@Nullable
private String stringField;
@Nullable
private Boolean boolField;
@Nullable
private Float floatField;
@Nullable
private Double doubleField;
}
......
......@@ -23,6 +23,8 @@ import java.util.List;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.avro.reflect.AvroDefault;
import org.apache.avro.reflect.Nullable;
/**
* Utils for testing avro.
......@@ -33,11 +35,17 @@ public class SchemaTestUtils {
@ToString
@EqualsAndHashCode
public static class Foo {
@Nullable
private String field1;
@Nullable
private String field2;
private int field3;
@Nullable
private Bar field4;
@Nullable
private Color color;
@AvroDefault("\"defaultValue\"")
private String fieldUnableNull;
}
@Data
......@@ -87,21 +95,35 @@ public class SchemaTestUtils {
private Foo foo2;
}
public static final String SCHEMA_JSON
= "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema" +
".SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],\"default\":null}," +
"{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\"," +
"\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bar\"," +
"\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\"," +
"\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}]," +
"\"default\":null}]}";
public static final String SCHEMA_AVRO_NOT_ALLOW_NULL
= "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"]," +
"\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":" +
"\"record\",\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\"," +
"\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":\"fieldUnableNull\",\"type\":\"string\",\"default\":\"defaultValue\"}]}";
public static final String SCHEMA_AVRO_ALLOW_NULL = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\"," +
"\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"" +
"null\",{\"type\":\"record\",\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\"" +
",\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":\"fieldUnableNull\",\"type\":[\"null\",\"string\"],\"default\":\"defaultValue\"}]}";
public static final String SCHEMA_JSON_NOT_ALLOW_NULL
= "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\"" +
":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bar\",\"fields\":[{\"name\":\"" +
"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":\"fieldUnableNull\"," +
"\"type\":\"string\",\"default\":\"defaultValue\"}]}";
public static final String SCHEMA_JSON_ALLOW_NULL
= "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],\"default\":null}," +
"{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bar\",\"fields\":" +
"[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":" +
"\"fieldUnableNull\",\"type\":[\"null\",\"string\"],\"default\":\"defaultValue\"}]}";
public static String[] FOO_FIELDS = {
"field1",
"field2",
"field3",
"field4",
"color"
"color",
"fieldUnableNull"
};
}
......@@ -77,7 +77,7 @@ public class GenericSchemaImplTest {
Bar bar = new Bar();
bar.setField1(i % 2 == 0);
foo.setField4(bar);
foo.setFieldUnableNull("fieldUnableNull-1-" + i);
byte[] data = encodeSchema.encode(foo);
log.info("Decoding : {}", new String(data, UTF_8));
......@@ -93,6 +93,8 @@ public class GenericSchemaImplTest {
assertTrue(field4 instanceof GenericRecord);
GenericRecord field4Record = (GenericRecord) field4;
assertEquals(i % 2 == 0, field4Record.getField("field1"));
Object fieldUnableNull = record.getField("fieldUnableNull");
assertEquals("fieldUnableNull-1-" + i, fieldUnableNull, "fieldUnableNull 1 is " + fieldUnableNull.getClass());
}
}
......
......@@ -27,15 +27,15 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
@Slf4j
public class SampleAsyncProducerWithSchema {
public static void main(String[] args) throws IOException {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://localhost:8080").build();
Producer<JsonPojo> producer = pulsarClient.newProducer(JSONSchema.of(JsonPojo.class)).topic("persistent://my-property/use/my-ns/my-topic")
Producer<JsonPojo> producer = pulsarClient.newProducer(JSONSchema.of(SchemaDefinition.<JsonPojo>builder().withPojo(JsonPojo.class).build())).topic("persistent://my-property/use/my-ns/my-topic")
.sendTimeout(3, TimeUnit.SECONDS).create();
List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
......
......@@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
public class SampleConsumerWithSchema {
......@@ -30,7 +31,8 @@ public class SampleConsumerWithSchema {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://localhost:8080").build();
Consumer<JsonPojo> consumer = pulsarClient.newConsumer(JSONSchema.of(JsonPojo.class)) //
Consumer<JsonPojo> consumer = pulsarClient.newConsumer(JSONSchema.of
(SchemaDefinition.<JsonPojo>builder().withPojo(JsonPojo.class).build())) //
.topic("persistent://my-property/use/my-ns/my-topic") //
.subscriptionName("my-subscription-name").subscribe();
......
......@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
......@@ -124,10 +125,10 @@ public class TopicSchema {
return (Schema<T>) Schema.STRING;
case AVRO:
return AvroSchema.of(clazz);
return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());
case JSON:
return JSONSchema.of(clazz);
return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());
case KEY_VALUE:
return (Schema<T>)Schema.KV_BYTES();
......
......@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
......@@ -108,7 +109,7 @@ public class HbaseGenericRecordSinkTest {
obj.setAddress("address_value");
obj.setAge(30);
obj.setFlag(true);
AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
byte[] bytes = schema.encode(obj);
ByteBuf payload = Unpooled.copiedBuffer(bytes);
......
......@@ -29,6 +29,7 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
......@@ -95,7 +96,7 @@ public class JdbcSinkTest {
obj.setField1("ValueOfField1");
obj.setField2("ValueOfField1");
obj.setField3(3);
AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
byte[] bytes = schema.encode(obj);
ByteBuf payload = Unpooled.copiedBuffer(bytes);
......
......@@ -41,6 +41,7 @@ import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.Commands;
......@@ -219,21 +220,21 @@ public abstract class TestPulsarConnector {
partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 7);
topicsToSchemas = new HashMap<>();
topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
fooTypes = new HashMap<>();
fooTypes.put("field1", IntegerType.INTEGER);
......@@ -622,7 +623,7 @@ public abstract class TestPulsarConnector {
.setProducerName("test-producer").setSequenceId(i)
.setPublishTime(currentTimeMs + i).build();
Schema schema = topicsToSchemas.get(topicSchemaName).getType() == SchemaType.AVRO ? AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
Schema schema = topicsToSchemas.get(topicSchemaName).getType() == SchemaType.AVRO ? AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()) : JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload = org.apache.pulsar.shade.io.netty.buffer.Unpooled
.copiedBuffer(schema.encode(foo));
......
......@@ -33,6 +33,7 @@ import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.containers.MySQLContainer;
......@@ -60,7 +61,7 @@ public class JdbcSinkTester extends SinkTester<MySQLContainer> {
private static final String NAME = "jdbc";
private static final String MYSQL = "mysql";
private AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
private AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
private String tableName = "test";
private Connection connection;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册