Commit 1893323b authored by Boyang Jerry Peng, committed by Sijie Guo

change JSONSchema to generate and store an avro schema (#2071)

Change JSONSchema to generate an Avro schema from the POJO so we can standardize on using Avro schemas
Parent ec04dc07
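In short: JSONSchema now derives an Avro record schema from the POJO (via Avro's ReflectData.AllowNull) instead of a JSON-Schema-standard document, while the declared schema type stays JSON. A minimal sketch of the new behavior (the Foo POJO and wrapper class are hypothetical, modeled on the tests below):

import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

public class JsonSchemaSketch {

    // Hypothetical POJO, similar to the Foo class used in the tests below.
    public static class Foo {
        public String field1;
        public int field2;
    }

    public static void main(String[] args) {
        JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class);
        SchemaInfo info = jsonSchema.getSchemaInfo();

        // The type is still JSON, but the stored bytes are now an Avro record schema
        // generated from the POJO, e.g. {"type":"record","name":"Foo",...}
        System.out.println(info.getType() == SchemaType.JSON);
        System.out.println(new String(info.getSchema()));
    }
}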
......@@ -20,13 +20,29 @@ package org.apache.pulsar.broker.service.schema;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.util.Arrays;
@SuppressWarnings("unused")
public class JsonSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
private final ObjectMapper objectMapper = new ObjectMapper();
private final SchemaCompatibilityStrategy compatibilityStrategy;
public JsonSchemaCompatibilityCheck () {
this(SchemaCompatibilityStrategy.FULL);
}
public JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy compatibilityStrategy) {
this.compatibilityStrategy = compatibilityStrategy;
}
@Override
public SchemaType getSchemaType() {
......@@ -35,7 +51,66 @@ public class JsonSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
@Override
public boolean isCompatible(SchemaData from, SchemaData to) {
if (isAvroSchema(from)) {
if (isAvroSchema(to)) {
// if both producer and broker have the schema in avro format
return isCompatibleAvroSchema(from, to);
} else if (isJsonSchema(to)) {
// if the broker has the schema in avro format but the producer sent a schema in the old json format
// allow the old schema format for backwards compatibility
return true;
} else {
// unknown schema format
return false;
}
} else if (isJsonSchema(from)){
if (isAvroSchema(to)) {
// if the broker has the schema in the old json format but the producer sent a schema in the avro format
// return true and overwrite the old format
return true;
} else if (isJsonSchema(to)) {
// if both producer and broker have the schema in old json format
return isCompatibleJsonSchema(from, to);
} else {
// unknown schema format
return false;
}
} else {
// the broker's stored schema is in an unknown format
// maybe corrupted?
// return true to overwrite
return true;
}
}
private boolean isCompatibleAvroSchema(SchemaData from, SchemaData to) {
Schema.Parser fromParser = new Schema.Parser();
Schema fromSchema = fromParser.parse(new String(from.getData()));
Schema.Parser toParser = new Schema.Parser();
Schema toSchema = toParser.parse(new String(to.getData()));
SchemaValidator schemaValidator = createSchemaValidator(this.compatibilityStrategy, true);
try {
schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
} catch (SchemaValidationException e) {
return false;
}
return true;
}
private ObjectMapper objectMapper;
private ObjectMapper getObjectMapper() {
if (objectMapper == null) {
objectMapper = new ObjectMapper();
}
return objectMapper;
}
private boolean isCompatibleJsonSchema(SchemaData from, SchemaData to) {
try {
ObjectMapper objectMapper = getObjectMapper();
JsonSchema fromSchema = objectMapper.readValue(from.getData(), JsonSchema.class);
JsonSchema toSchema = objectMapper.readValue(to.getData(), JsonSchema.class);
return fromSchema.getId().equals(toSchema.getId());
......@@ -43,4 +118,42 @@ public class JsonSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
return false;
}
}
private boolean isAvroSchema(SchemaData schemaData) {
try {
Schema.Parser fromParser = new Schema.Parser();
Schema fromSchema = fromParser.parse(new String(schemaData.getData()));
return true;
} catch (SchemaParseException e) {
return false;
}
}
private boolean isJsonSchema(SchemaData schemaData) {
ObjectMapper objectMapper = getObjectMapper();
try {
JsonSchema fromSchema = objectMapper.readValue(schemaData.getData(), JsonSchema.class);
return true;
} catch (IOException e) {
return false;
}
}
private static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy,
boolean onlyLatestValidator) {
final SchemaValidatorBuilder validatorBuilder = new SchemaValidatorBuilder();
switch (compatibilityStrategy) {
case BACKWARD:
return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator);
case FORWARD:
return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
default:
return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
}
}
private static SchemaValidator createLatestOrAllValidator(SchemaValidatorBuilder validatorBuilder, boolean onlyLatest) {
return onlyLatest ? validatorBuilder.validateLatest() : validatorBuilder.validateAll();
}
}
......@@ -23,6 +23,13 @@ import org.apache.pulsar.common.schema.SchemaType;
public interface SchemaCompatibilityCheck {
SchemaType getSchemaType();
/**
*
* @param from the current schema, i.e. the schema that the broker already has
* @param to the future schema, i.e. the schema sent by the producer
* @return whether the schemas are compatible
*/
boolean isCompatible(SchemaData from, SchemaData to);
SchemaCompatibilityCheck DEFAULT = new SchemaCompatibilityCheck() {
......
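To illustrate the from/to contract described in the javadoc above, a hedged sketch (the schema strings and wrapper class are hypothetical; it assumes it lives in the same org.apache.pulsar.broker.service.schema package so the package-level types resolve):

package org.apache.pulsar.broker.service.schema;

import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;

public class CompatibilityCheckSketch {
    public static void main(String[] args) {
        // "from" is the schema the broker already has; "to" is the schema the producer is sending.
        SchemaData from = SchemaData.builder()
                .type(SchemaType.JSON)
                .data("{\"type\":\"record\",\"name\":\"Foo\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}".getBytes())
                .build();
        SchemaData to = SchemaData.builder()
                .type(SchemaType.JSON)
                .data(("{\"type\":\"record\",\"name\":\"Foo\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},"
                        + "{\"name\":\"f2\",\"type\":\"string\",\"default\":\"x\"}]}").getBytes())
                .build();

        SchemaCompatibilityCheck check = new JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
        // Adding a field with a default is backward compatible, so this prints true.
        System.out.println(check.isCompatible(from, to));
    }
}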
......@@ -23,127 +23,20 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
public class AvroSchemaCompatibilityCheckTest {
public class AvroSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilityTest{
private static final String schemaJson1 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
private static final SchemaData schemaData1 = getSchemaData(schemaJson1);
private static final String schemaJson2 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
private static final SchemaData schemaData2 = getSchemaData(schemaJson2);
private static final String schemaJson3 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org" +
".apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheckTest$\"," +
"\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"string\"}]}";
private static final SchemaData schemaData3 = getSchemaData(schemaJson3);
private static final String schemaJson4 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1_v2\",\"type\":\"string\"," +
"\"aliases\":[\"field1\"]}]}";
private static final SchemaData schemaData4 = getSchemaData(schemaJson4);
private static final String schemaJson5 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
"\"string\"]}]}";
private static final SchemaData schemaData5 = getSchemaData(schemaJson5);
private static final String schemaJson6 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
"\"string\",\"int\"]}]}";
private static final SchemaData schemaData6 = getSchemaData(schemaJson6);
private static final String schemaJson7 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"},{\"name\":\"field3\"," +
"\"type\":\"string\",\"default\":\"bar\"}]}";
private static final SchemaData schemaData7 = getSchemaData(schemaJson7);
/**
* Make sure the new schema is backwards compatible with the latest schema.
*/
@Test
public void testBackwardCompatibility() {
AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck(
SchemaCompatibilityStrategy.BACKWARD
);
// adding a field with default is backwards compatible
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
"adding a field with default is backwards compatible");
// adding a field without default is NOT backwards compatible
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
"adding a field without default is NOT backwards compatible");
// Modifying a field name is not backwards compatible
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData4),
"Modifying a field name is not backwards compatible");
// evolving field to a union is backwards compatible
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData5),
"evolving field to a union is backwards compatible");
// removing a field from a union is NOT backwards compatible
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData5, schemaData1),
"removing a field from a union is NOT backwards compatible");
// adding a field to a union is backwards compatible
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData5, schemaData6),
"adding a field to a union is backwards compatible");
// removing a field from a union is NOT backwards compatible
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData6, schemaData5),
"removing a field a union is NOT backwards compatible");
}
/**
* Check to make sure the last schema version is forward-compatible with new schemas
*/
@Test
public void testForwardCompatibility() {
AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck(
SchemaCompatibilityStrategy.FORWARD
);
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
"adding a field is forward compatible");
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
"adding a field is forward compatible");
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData3),
"adding a field is forward compatible");
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData3, schemaData2),
"adding a field is forward compatible");
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData3, schemaData2),
"adding a field is forward compatible");
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData7),
"removing fields is forward compatible");
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData1),
"removing fields with defaults forward compatible");
@Override
public SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck() {
return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
}
/**
* Make sure the new schema is both forward- and backward-compatible with the latest schema, i.e. compatible in both directions.
*/
@Test
public void testFullCompatibility() {
AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck(
SchemaCompatibilityStrategy.FULL
);
Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
"adding a field with default fully compatible");
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
"adding a field without default is not fully compatible");
Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData3, schemaData1),
"adding a field without default is not fully compatible");
@Override
public SchemaCompatibilityCheck getForwardCompatibleSchemaCheck() {
return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FORWARD);
}
private static SchemaData getSchemaData(String schemaJson) {
return SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).build();
@Override
public SchemaCompatibilityCheck getFullCompatibleSchemaCheck() {
return new AvroSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FULL);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
public abstract class BaseAvroSchemaCompatibilityTest {
SchemaCompatibilityCheck schemaCompatibilityCheck;
private static final String schemaJson1 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
private static final SchemaData schemaData1 = getSchemaData(schemaJson1);
private static final String schemaJson2 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
private static final SchemaData schemaData2 = getSchemaData(schemaJson2);
private static final String schemaJson3 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org" +
".apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheckTest$\"," +
"\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"string\"}]}";
private static final SchemaData schemaData3 = getSchemaData(schemaJson3);
private static final String schemaJson4 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1_v2\",\"type\":\"string\"," +
"\"aliases\":[\"field1\"]}]}";
private static final SchemaData schemaData4 = getSchemaData(schemaJson4);
private static final String schemaJson5 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
"\"string\"]}]}";
private static final SchemaData schemaData5 = getSchemaData(schemaJson5);
private static final String schemaJson6 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
"\"string\",\"int\"]}]}";
private static final SchemaData schemaData6 = getSchemaData(schemaJson6);
private static final String schemaJson7 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"},{\"name\":\"field3\"," +
"\"type\":\"string\",\"default\":\"bar\"}]}";
private static final SchemaData schemaData7 = getSchemaData(schemaJson7);
public abstract SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck();
public abstract SchemaCompatibilityCheck getForwardCompatibleSchemaCheck();
public abstract SchemaCompatibilityCheck getFullCompatibleSchemaCheck();
/**
* Make sure the new schema is backwards compatible with the latest schema.
*/
@Test
public void testBackwardCompatibility() {
SchemaCompatibilityCheck schemaCompatibilityCheck = getBackwardsCompatibleSchemaCheck();
// adding a field with default is backwards compatible
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
"adding a field with default is backwards compatible");
// adding a field without default is NOT backwards compatible
Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
"adding a field without default is NOT backwards compatible");
// Modifying a field name is not backwards compatible
Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData4),
"Modifying a field name is not backwards compatible");
// evolving field to a union is backwards compatible
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData5),
"evolving field to a union is backwards compatible");
// removing a field from a union is NOT backwards compatible
Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData5, schemaData1),
"removing a field from a union is NOT backwards compatible");
// adding a field to a union is backwards compatible
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData5, schemaData6),
"adding a field to a union is backwards compatible");
// removing a field from a union is NOT backwards compatible
Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData6, schemaData5),
"removing a field a union is NOT backwards compatible");
}
/**
* Check to make sure the last schema version is forward-compatible with new schemas
*/
@Test
public void testForwardCompatibility() {
SchemaCompatibilityCheck schemaCompatibilityCheck = getForwardCompatibleSchemaCheck();
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
"adding a field is forward compatible");
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
"adding a field is forward compatible");
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData3),
"adding a field is forward compatible");
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData2),
"adding a field is forward compatible");
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData2),
"adding a field is forward compatible");
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData7),
"removing fields is forward compatible");
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData2, schemaData1),
"removing fields with defaults forward compatible");
}
/**
* Make sure the new schema is both forward- and backward-compatible with the latest schema, i.e. compatible in both directions.
*/
@Test
public void testFullCompatibility() {
SchemaCompatibilityCheck schemaCompatibilityCheck = getFullCompatibleSchemaCheck();
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
"adding a field with default fully compatible");
Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
"adding a field without default is not fully compatible");
Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData3, schemaData1),
"adding a field without default is not fully compatible");
}
private static SchemaData getSchemaData(String schemaJson) {
return SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).build();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilityTest{
@Override
public SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck() {
return new JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
}
@Override
public SchemaCompatibilityCheck getForwardCompatibleSchemaCheck() {
return new JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FORWARD);
}
@Override
public SchemaCompatibilityCheck getFullCompatibleSchemaCheck() {
return new JsonSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FULL);
}
@Test
public void testJsonSchemaBackwardsCompatibility() throws JsonProcessingException {
SchemaData from = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
SchemaData to = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
JsonSchemaCompatibilityCheck jsonSchemaCompatibilityCheck = new JsonSchemaCompatibilityCheck();
Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to));
from = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
to = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to));
}
@Data
@ToString
@EqualsAndHashCode
private static class Foo {
private String field1;
private String field2;
private int field3;
private Bar field4;
}
@Data
@ToString
@EqualsAndHashCode
private static class Bar {
private boolean field1;
}
public static class OldJSONSchema<T> implements Schema<T> {
private final SchemaInfo info;
private final ObjectMapper objectMapper;
private final Class<T> pojo;
private OldJSONSchema(SchemaInfo info, Class<T> pojo, ObjectMapper objectMapper) {
this.info = info;
this.pojo = pojo;
this.objectMapper = objectMapper;
}
@Override
public byte[] encode(T message) throws SchemaSerializationException {
try {
return objectMapper.writeValueAsBytes(message);
} catch (JsonProcessingException e) {
throw new SchemaSerializationException(e);
}
}
@Override
public T decode(byte[] bytes) {
try {
return objectMapper.readValue(new String(bytes), pojo);
} catch (IOException e) {
throw new RuntimeException(new SchemaSerializationException(e));
}
}
@Override
public SchemaInfo getSchemaInfo() {
return info;
}
public static <T> OldJSONSchema<T> of(Class<T> pojo) throws JsonProcessingException {
return of(pojo, Collections.emptyMap());
}
public static <T> OldJSONSchema<T> of(Class<T> pojo, Map<String, String> properties) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper);
JsonSchema schema = schemaGen.generateSchema(pojo);
SchemaInfo info = new SchemaInfo();
info.setName("");
info.setProperties(properties);
info.setType(SchemaType.JSON);
info.setSchema(mapper.writeValueAsBytes(schema));
return new OldJSONSchema<>(info, pojo, mapper);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;
public class ProtobufSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilityTest{
@Override
public SchemaCompatibilityCheck getBackwardsCompatibleSchemaCheck() {
return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.BACKWARD);
}
@Override
public SchemaCompatibilityCheck getForwardCompatibleSchemaCheck() {
return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FORWARD);
}
@Override
public SchemaCompatibilityCheck getFullCompatibleSchemaCheck() {
return new ProtobufSchemaCompatibilityCheck(SchemaCompatibilityStrategy.FULL);
}
}
......@@ -141,8 +141,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
@Test(expectedExceptions = {PulsarClientException.class})
public void testJsonConsumerWithWrongPrestoredSchema() throws Exception {
@Test
public void testJsonConsumerWithWrongCorruptedSchema() throws Exception {
log.info("-- Starting {} test --", methodName);
byte[] randomSchemaBytes = "hello".getBytes();
......@@ -168,8 +168,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
@Test(expectedExceptions = {PulsarClientException.class})
public void testJsonProducerWithWrongPrestoredSchema() throws Exception {
@Test
public void testJsonProducerWithWrongCorruptedSchema() throws Exception {
log.info("-- Starting {} test --", methodName);
byte[] randomSchemaBytes = "hello".getBytes();
......
......@@ -57,6 +57,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.Commands.ChecksumType;
......@@ -66,6 +67,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.slf4j.Logger;
......@@ -845,9 +848,28 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
long requestId = client.newRequestId();
SchemaInfo schemaInfo = null;
if (schema != null) {
if (schema.getSchemaInfo() != null) {
if (schema.getSchemaInfo().getType() == SchemaType.JSON) {
// for backwards compatibility purposes
// JSONSchema originally generated a schema for the POJO based on the JSON Schema standard
// but now we have standardized on generating an Avro-based schema for every schema type
if (Commands.peerSupportJsonSchemaAvroFormat(cnx.getRemoteEndpointProtocolVersion())) {
schemaInfo = schema.getSchemaInfo();
} else {
JSONSchema jsonSchema = (JSONSchema) schema;
schemaInfo = jsonSchema.getBackwardsCompatibleJsonSchemaInfo();
}
} else {
schemaInfo = schema.getSchemaInfo();
}
}
}
cnx.sendRequestWithId(
Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata,
schema == null ? null : schema.getSchemaInfo()),
schemaInfo),
requestId).thenAccept(response -> {
String producerName = response.getProducerName();
long lastSequenceId = response.getLastSequenceId();
......
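The comments above capture the negotiation: a broker on protocol v13 or later receives the new Avro-based schema info, while older brokers get the legacy JSON-schema payload. A condensed, hypothetical helper restating that decision (the method names come from this patch; the wrapper class does not exist in the codebase):

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

final class SchemaNegotiationSketch {
    // Returns the SchemaInfo a producer would register, given the peer's protocol version.
    static SchemaInfo selectSchemaInfo(Schema<?> schema, int peerProtocolVersion) {
        if (schema == null || schema.getSchemaInfo() == null) {
            return null;
        }
        if (schema.getSchemaInfo().getType() == SchemaType.JSON
                && !Commands.peerSupportJsonSchemaAvroFormat(peerProtocolVersion)) {
            // Old broker (< v13): fall back to the legacy JSON-Schema-standard payload.
            return ((JSONSchema<?>) schema).getBackwardsCompatibleJsonSchemaInfo();
        }
        // Broker understands v13+, or the schema is not JSON: send the schema info as-is.
        return schema.getSchemaInfo();
    }
}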
......@@ -45,7 +45,7 @@ public class AvroSchema<T> implements Schema<T> {
private BinaryEncoder encoder;
private ByteArrayOutputStream byteArrayOutputStream;
public AvroSchema(Class<T> pojo, Map<String, String> properties) {
private AvroSchema(Class<T> pojo, Map<String, String> properties) {
this.schema = ReflectData.AllowNull.get().getSchema(pojo);
this.schemaInfo = new SchemaInfo();
......
......@@ -22,24 +22,35 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
public class JSONSchema<T> implements Schema<T> {
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
private final SchemaInfo info;
public class JSONSchema<T> implements Schema<T>{
private final org.apache.avro.Schema schema;
private final SchemaInfo schemaInfo;
private final ObjectMapper objectMapper;
private final Class<T> pojo;
private Map<String, String> properties;
private JSONSchema(SchemaInfo info, Class<T> pojo, ObjectMapper objectMapper) {
this.info = info;
private JSONSchema(Class<T> pojo, Map<String, String> properties) {
this.pojo = pojo;
this.objectMapper = objectMapper;
this.properties = properties;
this.objectMapper = new ObjectMapper();
this.schema = ReflectData.AllowNull.get().getSchema(pojo);
this.schemaInfo = new SchemaInfo();
this.schemaInfo.setName("");
this.schemaInfo.setProperties(properties);
this.schemaInfo.setType(SchemaType.JSON);
this.schemaInfo.setSchema(this.schema.toString().getBytes());
}
@Override
......@@ -62,23 +73,36 @@ public class JSONSchema<T> implements Schema<T> {
@Override
public SchemaInfo getSchemaInfo() {
return info;
return this.schemaInfo;
}
public static <T> JSONSchema<T> of(Class<T> pojo) throws JsonProcessingException {
return of(pojo, Collections.emptyMap());
/**
* Implemented for backwards compatibility: the schema originally generated by JSONSchema
* was based on the JSON Schema standard, but we have since standardized on Avro.
*
* @return the schema info in the legacy JSON Schema format
*/
public SchemaInfo getBackwardsCompatibleJsonSchemaInfo() {
SchemaInfo backwardsCompatibleSchemaInfo;
try {
JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(objectMapper);
JsonSchema jsonBackwardsCompatibileSchema = schemaGen.generateSchema(pojo);
backwardsCompatibleSchemaInfo = new SchemaInfo();
backwardsCompatibleSchemaInfo.setName("");
backwardsCompatibleSchemaInfo.setProperties(properties);
backwardsCompatibleSchemaInfo.setType(SchemaType.JSON);
backwardsCompatibleSchemaInfo.setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibileSchema));
} catch (JsonProcessingException ex) {
throw new RuntimeException(ex);
}
return backwardsCompatibleSchemaInfo;
}
public static <T> JSONSchema<T> of(Class<T> pojo, Map<String, String> properties) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper);
JsonSchema schema = schemaGen.generateSchema(pojo);
public static <T> JSONSchema<T> of(Class<T> pojo) {
return new JSONSchema<>(pojo, Collections.emptyMap());
}
SchemaInfo info = new SchemaInfo();
info.setName("");
info.setProperties(properties);
info.setType(SchemaType.JSON);
info.setSchema(mapper.writeValueAsBytes(schema));
return new JSONSchema<>(info, pojo, mapper);
public static <T> JSONSchema<T> of(Class<T> pojo, Map<String, String> properties) {
return new JSONSchema<>(pojo, properties);
}
}
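As the javadoc above notes, the same JSONSchema instance can now hand out either representation; a brief hypothetical sketch:

import org.apache.pulsar.client.impl.schema.JSONSchema;

public class JsonSchemaFormatsSketch {
    // Hypothetical POJO used only for illustration.
    public static class Foo {
        public String field1;
    }

    public static void main(String[] args) {
        JSONSchema<Foo> schema = JSONSchema.of(Foo.class);
        // New default: an Avro record schema derived from the POJO.
        byte[] avroBased = schema.getSchemaInfo().getSchema();
        // Kept for brokers older than protocol v13: the JSON-Schema-standard document.
        byte[] legacyJson = schema.getBackwardsCompatibleJsonSchemaInfo().getSchema();
        System.out.println(new String(avroBased));
        System.out.println(new String(legacyJson));
    }
}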
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.schemas;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.avro.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
public class JSONSchemaTest {
@Data
@ToString
@EqualsAndHashCode
private static class Foo {
private String field1;
private String field2;
private int field3;
private Bar field4;
}
@Data
@ToString
@EqualsAndHashCode
private static class Bar {
private boolean field1;
}
private static final String SCHEMA_JSON = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" +
".pulsar.client" +
".schemas.JSONSchemaTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"]," +
"\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null}," +
"{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\"," +
"\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}";
private static String[] FOO_FIELDS = {
"field1",
"field2",
"field3",
"field4"
};
@Test
public void testSchema() {
JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class);
Assert.assertEquals(jsonSchema.getSchemaInfo().getType(), SchemaType.JSON);
Schema.Parser parser = new Schema.Parser();
String schemaJson = new String(jsonSchema.getSchemaInfo().getSchema());
Assert.assertEquals(schemaJson, SCHEMA_JSON);
Schema schema = parser.parse(schemaJson);
for (String fieldName : FOO_FIELDS) {
Schema.Field field = schema.getField(fieldName);
Assert.assertNotNull(field);
if (field.name().equals("field4")) {
Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
}
}
}
@Test
public void testEncodeAndDecode() {
JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class, null);
Foo foo1 = new Foo();
foo1.setField1("foo1");
foo1.setField2("bar1");
foo1.setField4(new Bar());
Foo foo2 = new Foo();
foo2.setField1("foo2");
foo2.setField2("bar2");
byte[] bytes1 = jsonSchema.encode(foo1);
Assert.assertTrue(bytes1.length > 0);
byte[] bytes2 = jsonSchema.encode(foo2);
Assert.assertTrue(bytes2.length > 0);
Foo object1 = jsonSchema.decode(bytes1);
Foo object2 = jsonSchema.decode(bytes2);
Assert.assertEquals(object1, foo1);
Assert.assertEquals(object2, foo2);
}
}
......@@ -1074,4 +1074,7 @@ public class Commands {
return peerVersion >= ProtocolVersion.v12.getNumber();
}
public static boolean peerSupportJsonSchemaAvroFormat(int peerVersion) {
return peerVersion >= ProtocolVersion.v13.getNumber();
}
}
......@@ -200,6 +200,7 @@ public final class PulsarApi {
v10(10, 10),
v11(11, 11),
v12(12, 12),
v13(13, 13),
;
public static final int v0_VALUE = 0;
......@@ -215,6 +216,7 @@ public final class PulsarApi {
public static final int v10_VALUE = 10;
public static final int v11_VALUE = 11;
public static final int v12_VALUE = 12;
public static final int v13_VALUE = 13;
public final int getNumber() { return value; }
......@@ -234,6 +236,7 @@ public final class PulsarApi {
case 10: return v10;
case 11: return v11;
case 12: return v12;
case 13: return v13;
default: return null;
}
}
......@@ -160,6 +160,7 @@ enum ProtocolVersion {
v12 = 12;// Added get topic's last messageId from broker
// Added CommandActiveConsumerChange
// Added CommandGetTopicsOfNamespace
v13 = 13;
}
message CommandConnect {
......