From 2dae33d6816fd304913bc31352d086e2fde2b38c Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Mon, 11 Jun 2018 10:46:09 -0700 Subject: [PATCH] adding avro schema (#1917) * adding avro schema * improving implementation * finishing implementation * remove unnecessary newlines * fixing poms * adding avro schema check * add missing license header * Add types to proto definitions * adding compatibiliy unit tests * shade avro dependencies * add shading to pulsar client kafka --- pom.xml | 1 + .../pulsar/broker/ServiceConfiguration.java | 3 +- pulsar-broker-shaded/pom.xml | 29 ++++ pulsar-broker/pom.xml | 6 + .../pulsar/broker/service/ServerCnx.java | 4 + .../schema/AvroSchemaCompatibilityCheck.java | 87 ++++++++++ .../schema/SchemaRegistryServiceImpl.java | 9 ++ .../src/main/proto/SchemaRegistryFormat.proto | 2 + .../AvroSchemaCompatibilityCheckTest.java | 149 ++++++++++++++++++ .../api/SimpleTypedProducerConsumerTest.java | 125 +++++++++++++++ pulsar-client-admin-shaded/pom.xml | 29 ++++ .../pulsar-client-kafka-shaded/pom.xml | 29 ++++ pulsar-client-shaded/pom.xml | 29 ++++ pulsar-client/pom.xml | 6 + .../pulsar/client/impl/schema/AvroSchema.java | 98 ++++++++++++ .../pulsar/client/schemas/AvroSchemaTest.java | 109 +++++++++++++ .../apache/pulsar/common/api/Commands.java | 4 + .../pulsar/common/api/proto/PulsarApi.java | 6 + .../pulsar/common/schema/SchemaType.java | 10 +- pulsar-common/src/main/proto/PulsarApi.proto | 2 + 20 files changed, 735 insertions(+), 2 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/schemas/AvroSchemaTest.java diff --git a/pom.xml b/pom.xml index c99eb803098..734c9081efb 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,7 @@ flexible messaging model and an intuitive client API. 0.10.2.1 5.1.1 1.11.297 + 1.8.2 3.4.0 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 77e19f5fbfe..212c5c42b63 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -455,7 +455,8 @@ public class ServiceConfiguration implements PulsarConfiguration { private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory"; private Set schemaRegistryCompatibilityCheckers = Sets.newHashSet( - "org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck" + "org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck", + "org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck" ); /**** --- WebSocket --- ****/ diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml index 181d822084e..645ab96a3e1 100644 --- a/pulsar-broker-shaded/pom.xml +++ b/pulsar-broker-shaded/pom.xml @@ -107,6 +107,14 @@ org.apache.httpcomponents:httpclient commons-logging:commons-logging org.apache.httpcomponents:httpcore + org.apache.avro:avro + + org.codehaus.jackson:jackson-core-asl + org.codehaus.jackson:jackson-mapper-asl + com.thoughtworks.paranamer:paranamer + org.xerial.snappy:snappy-java + org.apache.commons:commons-compress + org.tukaani:xz @@ -311,6 +319,27 @@ org.apache.http org.apache.pulsar.shade.org.apache.http + + org.apache.avro + org.apache.pulsar.shade.org.apache.avro + + + + org.codehaus.jackson + org.apache.pulsar.shade.org.codehaus.jackson + + + com.thoughtworks.paranamer + org.apache.pulsar.shade.com.thoughtworks.paranamer + + + org.xerial.snappy + org.apache.pulsar.shade.org.xerial.snappy + + + org.tukaani + org.apache.pulsar.shade.org.tukaani + diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index dd4118d43ce..0a118a8d912 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -247,6 +247,12 @@ java-semver + + org.apache.avro + avro + ${avro.version} + + org.aspectj diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 490f9a47048..fcf40453f08 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -682,6 +682,10 @@ public class ServerCnx extends PulsarHandler { return SchemaType.STRING; case Json: return SchemaType.JSON; + case Protobuf: + return SchemaType.PROTOBUF; + case Avro: + return SchemaType.AVRO; default: return SchemaType.NONE; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java new file mode 100644 index 00000000000..5d5a77e18d0 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheck.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaValidationException; +import org.apache.avro.SchemaValidator; +import org.apache.avro.SchemaValidatorBuilder; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; + + +import java.util.Arrays; + +public class AvroSchemaCompatibilityCheck implements SchemaCompatibilityCheck { + + private final CompatibilityStrategy compatibilityStrategy; + + public AvroSchemaCompatibilityCheck () { + this(CompatibilityStrategy.FULL); + } + + public AvroSchemaCompatibilityCheck(CompatibilityStrategy compatibilityStrategy) { + this.compatibilityStrategy = compatibilityStrategy; + } + + @Override + public SchemaType getSchemaType() { + return SchemaType.AVRO; + } + + @Override + public boolean isCompatible(SchemaData from, SchemaData to) { + + Schema.Parser fromParser = new Schema.Parser(); + Schema fromSchema = fromParser.parse(new String(from.getData())); + Schema.Parser toParser = new Schema.Parser(); + Schema toSchema = toParser.parse(new String(to.getData())); + + SchemaValidator schemaValidator = createSchemaValidator(this.compatibilityStrategy, true); + try { + schemaValidator.validate(toSchema, Arrays.asList(fromSchema)); + } catch (SchemaValidationException e) { + return false; + } + return true; + } + + public enum CompatibilityStrategy { + BACKWARD, + FORWARD, + FULL + } + + private static SchemaValidator createSchemaValidator(CompatibilityStrategy compatibilityStrategy, + boolean onlyLatestValidator) { + final SchemaValidatorBuilder validatorBuilder = new SchemaValidatorBuilder(); + switch (compatibilityStrategy) { + case BACKWARD: + return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator); + case FORWARD: + return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator); + default: + return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator); + } + } + + private static SchemaValidator createLatestOrAllValidator(SchemaValidatorBuilder validatorBuilder, boolean onlyLatest) { + return onlyLatest ? validatorBuilder.validateLatest() : validatorBuilder.validateAll(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index b1a7d2c0f40..8cecad4b22e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -136,6 +136,7 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { } private CompletableFuture checkCompatibilityWithLatest(String schemaId, SchemaData schema) { + return getSchema(schemaId).thenApply(storedSchema -> (storedSchema == null) || compatibilityChecks.getOrDefault( @@ -154,6 +155,10 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { return SchemaType.STRING; case JSON: return SchemaType.JSON; + case PROTOBUF: + return SchemaType.PROTOBUF; + case AVRO: + return SchemaType.AVRO; default: return SchemaType.NONE; } @@ -167,6 +172,10 @@ public class SchemaRegistryServiceImpl implements SchemaRegistryService { return SchemaRegistryFormat.SchemaInfo.SchemaType.STRING; case JSON: return SchemaRegistryFormat.SchemaInfo.SchemaType.JSON; + case PROTOBUF: + return SchemaRegistryFormat.SchemaInfo.SchemaType.PROTOBUF; + case AVRO: + return SchemaRegistryFormat.SchemaInfo.SchemaType.AVRO; default: return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE; } diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto index 8776ddf45fe..90fa14512c7 100644 --- a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto +++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto @@ -27,6 +27,8 @@ message SchemaInfo { NONE = 1; STRING = 2; JSON = 3; + PROTOBUF = 4; + AVRO = 5; } message KeyValuePair { required string key = 1; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java new file mode 100644 index 00000000000..de63f54cbfc --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/AvroSchemaCompatibilityCheckTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class AvroSchemaCompatibilityCheckTest { + + private static final String schemaJson1 = + "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" + + ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}"; + private static final SchemaData schemaData1 = getSchemaData(schemaJson1); + + private static final String schemaJson2 = + "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" + + ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," + + "{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}"; + private static final SchemaData schemaData2 = getSchemaData(schemaJson2); + + private static final String schemaJson3 = + "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org" + + ".apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheckTest$\"," + + "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"string\"}]}"; + private static final SchemaData schemaData3 = getSchemaData(schemaJson3); + + private static final String schemaJson4 = + "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" + + ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1_v2\",\"type\":\"string\"," + + "\"aliases\":[\"field1\"]}]}"; + private static final SchemaData schemaData4 = getSchemaData(schemaJson4); + + private static final String schemaJson5 = + "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" + + ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," + + "\"string\"]}]}"; + private static final SchemaData schemaData5 = getSchemaData(schemaJson5); + + private static final String schemaJson6 = + "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" + + ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," + + "\"string\",\"int\"]}]}"; + private static final SchemaData schemaData6 = getSchemaData(schemaJson6); + + private static final String schemaJson7 = + "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" + + ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," + + "{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"},{\"name\":\"field3\"," + + "\"type\":\"string\",\"default\":\"bar\"}]}"; + private static final SchemaData schemaData7 = getSchemaData(schemaJson7); + + /** + * make sure new schema is backwards compatible with latest + */ + @Test + public void testBackwardCompatibility() { + + AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck( + AvroSchemaCompatibilityCheck.CompatibilityStrategy.BACKWARD + ); + + // adding a field with default is backwards compatible + Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2), + "adding a field with default is backwards compatible"); + // adding a field without default is NOT backwards compatible + Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3), + "adding a field without default is NOT backwards compatible"); + // Modifying a field name is not backwards compatible + Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData4), + "Modifying a field name is not backwards compatible"); + // evolving field to a union is backwards compatible + Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData5), + "evolving field to a union is backwards compatible"); + // removing a field from a union is NOT backwards compatible + Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData5, schemaData1), + "removing a field from a union is NOT backwards compatible"); + // adding a field to a union is backwards compatible + Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData5, schemaData6), + "adding a field to a union is backwards compatible"); + // removing a field a union is NOT backwards compatible + Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData6, schemaData5), + "removing a field a union is NOT backwards compatible"); + } + + /** + * Check to make sure the last schema version is forward-compatible with new schemas + */ + @Test + public void testForwardCompatibility() { + + AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck( + AvroSchemaCompatibilityCheck.CompatibilityStrategy.FORWARD + ); + + Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2), + "adding a field is forward compatible"); + Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3), + "adding a field is forward compatible"); + Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData3), + "adding a field is forward compatible"); + Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData3, schemaData2), + "adding a field is forward compatible"); + Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData3, schemaData2), + "adding a field is forward compatible"); + Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData7), + "removing fields is forward compatible"); + Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData1), + "removing fields with defaults forward compatible"); + } + + /** + * Make sure the new schema is forward- and backward-compatible from the latest to newest and from the newest to latest. + */ + @Test + public void testFullCompatibility() { + AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck( + AvroSchemaCompatibilityCheck.CompatibilityStrategy.FULL + ); + Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2), + "adding a field with default fully compatible"); + Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3), + "adding a field without default is not fully compatible"); + Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData3, schemaData1), + "adding a field without default is not fully compatible"); + + } + + private static SchemaData getSchemaData(String schemaJson) { + return SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).build(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index fd76393aa65..873080f0ff6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -26,6 +26,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.schema.SchemaRegistry; +import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.common.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; @@ -193,6 +194,130 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } + @Test + public void testAvroProducerAndConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + + AvroSchema avroSchema = + AvroSchema.of(AvroEncodedPojo.class); + + Consumer consumer = pulsarClient + .newConsumer(avroSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name") + .subscribe(); + + Producer producer = pulsarClient + .newProducer(avroSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .create(); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(new AvroEncodedPojo(message)); + } + + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + AvroEncodedPojo receivedMessage = msg.getValue(); + log.debug("Received message: [{}]", receivedMessage); + AvroEncodedPojo expectedMessage = new AvroEncodedPojo("my-message-" + i); + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + + SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() + .getSchema("my-property/my-ns/my-topic1") + .get(); + + Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema()); + + log.info("-- Exiting {} test --", methodName); + + } + + @Test(expectedExceptions = {PulsarClientException.class}) + public void testAvroConsumerWithWrongPrestoredSchema() throws Exception { + log.info("-- Starting {} test --", methodName); + + byte[] randomSchemaBytes = ("{\n" + + " \"type\": \"record\",\n" + + " \"namespace\": \"com.example\",\n" + + " \"name\": \"FullName\",\n" + + " \"fields\": [\n" + + " { \"name\": \"first\", \"type\": \"string\" },\n" + + " { \"name\": \"last\", \"type\": \"string\" }\n" + + " ]\n" + + "} ").getBytes(); + + pulsar.getSchemaRegistryService() + .putSchemaIfAbsent("my-property/my-ns/my-topic1", + SchemaData.builder() + .type(SchemaType.AVRO) + .isDeleted(false) + .timestamp(Clock.systemUTC().millis()) + .user("me") + .data(randomSchemaBytes) + .props(Collections.emptyMap()) + .build() + ).get(); + + Consumer consumer = pulsarClient + .newConsumer(AvroSchema.of(AvroEncodedPojo.class)) + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name") + .subscribe(); + + log.info("-- Exiting {} test --", methodName); + } + + public static class AvroEncodedPojo { + private String message; + + public AvroEncodedPojo() { + } + + public AvroEncodedPojo(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AvroEncodedPojo that = (AvroEncodedPojo) o; + return Objects.equals(message, that.message); + } + + @Override + public int hashCode() { + return Objects.hash(message); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("message", message) + .toString(); + } + } + public static class JsonEncodedPojo { private String message; diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index 18e1ba914a6..839091f7eb3 100644 --- a/pulsar-client-admin-shaded/pom.xml +++ b/pulsar-client-admin-shaded/pom.xml @@ -84,6 +84,14 @@ javax.annotation:* org.glassfish.hk2*:* com.fasterxml.jackson.*:* + org.apache.avro:avro + + org.codehaus.jackson:jackson-core-asl + org.codehaus.jackson:jackson-mapper-asl + com.thoughtworks.paranamer:paranamer + org.xerial.snappy:snappy-java + org.apache.commons:commons-compress + org.tukaani:xz @@ -187,6 +195,27 @@ org.reactivestreams org.apache.pulsar.admin.shade.org.reactivestreams + + org.apache.avro + org.apache.pulsar.shade.org.apache.avro + + + + org.codehaus.jackson + org.apache.pulsar.shade.org.codehaus.jackson + + + com.thoughtworks.paranamer + org.apache.pulsar.shade.com.thoughtworks.paranamer + + + org.xerial.snappy + org.apache.pulsar.shade.org.xerial.snappy + + + org.tukaani + org.apache.pulsar.shade.org.tukaani + diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml index e3c6ddb3a41..0bd9453da33 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml +++ b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml @@ -91,6 +91,14 @@ org.apache.httpcomponents:httpclient commons-logging:commons-logging org.apache.httpcomponents:httpcore + org.apache.avro:avro + + org.codehaus.jackson:jackson-core-asl + org.codehaus.jackson:jackson-mapper-asl + com.thoughtworks.paranamer:paranamer + org.xerial.snappy:snappy-java + org.apache.commons:commons-compress + org.tukaani:xz @@ -173,6 +181,27 @@ org.apache.http org.apache.pulsar.shade.org.apache.http + + org.apache.avro + org.apache.pulsar.shade.org.apache.avro + + + + org.codehaus.jackson + org.apache.pulsar.shade.org.codehaus.jackson + + + com.thoughtworks.paranamer + org.apache.pulsar.shade.com.thoughtworks.paranamer + + + org.xerial.snappy + org.apache.pulsar.shade.org.xerial.snappy + + + org.tukaani + org.apache.pulsar.shade.org.tukaani + diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index e1e6abf4008..3df4105b6a5 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -86,6 +86,14 @@ org.apache.httpcomponents:httpclient commons-logging:commons-logging org.apache.httpcomponents:httpcore + org.apache.avro:avro + + org.codehaus.jackson:jackson-core-asl + org.codehaus.jackson:jackson-mapper-asl + com.thoughtworks.paranamer:paranamer + org.xerial.snappy:snappy-java + org.apache.commons:commons-compress + org.tukaani:xz @@ -167,6 +175,27 @@ org.apache.http org.apache.pulsar.shade.org.apache.http + + org.apache.avro + org.apache.pulsar.shade.org.apache.avro + + + + org.codehaus.jackson + org.apache.pulsar.shade.org.codehaus.jackson + + + com.thoughtworks.paranamer + org.apache.pulsar.shade.com.thoughtworks.paranamer + + + org.xerial.snappy + org.apache.pulsar.shade.org.xerial.snappy + + + org.tukaani + org.apache.pulsar.shade.org.tukaani + diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index 16ff2863413..16a7224ce09 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -91,6 +91,12 @@ + + org.apache.avro + avro + ${avro.version} + + com.google.protobuf protobuf-java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java new file mode 100644 index 00000000000..4bca9998620 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +@Slf4j +public class AvroSchema implements Schema { + + private SchemaInfo schemaInfo; + private org.apache.avro.Schema schema; + private ReflectDatumWriter datumWriter; + private ReflectDatumReader reader; + private BinaryEncoder encoder; + private ByteArrayOutputStream byteArrayOutputStream; + + public AvroSchema(Class pojo, Map properties) { + this.schema = ReflectData.AllowNull.get().getSchema(pojo); + + this.schemaInfo = new SchemaInfo(); + this.schemaInfo.setName(""); + this.schemaInfo.setProperties(properties); + this.schemaInfo.setType(SchemaType.AVRO); + this.schemaInfo.setSchema(this.schema.toString().getBytes()); + + this.byteArrayOutputStream = new ByteArrayOutputStream(); + this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder); + this.datumWriter = new ReflectDatumWriter<>(this.schema); + this.reader = new ReflectDatumReader<>(this.schema); + } + + @Override + public byte[] encode(T message) { + + try { + datumWriter.write(message, this.encoder); + this.encoder.flush(); + return this.byteArrayOutputStream.toByteArray(); + } catch (Exception e) { + throw new SchemaSerializationException(e); + } finally { + this.byteArrayOutputStream.reset(); + } + } + + @Override + public T decode(byte[] bytes) { + try { + return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null)); + } catch (IOException e) { + throw new SchemaSerializationException(e); + } + } + + @Override + public SchemaInfo getSchemaInfo() { + return this.schemaInfo; + } + + public static AvroSchema of(Class pojo) { + return new AvroSchema<>(pojo, Collections.emptyMap()); + } + + public static AvroSchema of(Class pojo, Map properties) { + return new AvroSchema<>(pojo, properties); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/AvroSchemaTest.java new file mode 100644 index 00000000000..a8b94deca1e --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/schemas/AvroSchemaTest.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.schemas; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.common.schema.SchemaType; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Slf4j +public class AvroSchemaTest { + + @Data + @ToString + @EqualsAndHashCode + private static class Foo { + private String field1; + private String field2; + private int field3; + private Bar field4; + } + + @Data + @ToString + @EqualsAndHashCode + private static class Bar { + private boolean field1; + } + + private static final String SCHEMA_JSON = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" + + ".pulsar.client" + + ".schemas.AvroSchemaTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"]," + + "\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\"," + + "\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}"; + + private static String[] FOO_FIELDS = { + "field1", + "field2", + "field3", + "field4" + }; + + @Test + public void testSchema() { + AvroSchema avroSchema = AvroSchema.of(Foo.class); + Assert.assertEquals(avroSchema.getSchemaInfo().getType(), SchemaType.AVRO); + Schema.Parser parser = new Schema.Parser(); + String schemaJson = new String(avroSchema.getSchemaInfo().getSchema()); + Assert.assertEquals(schemaJson, SCHEMA_JSON); + Schema schema = parser.parse(schemaJson); + + for (String fieldName : FOO_FIELDS) { + Schema.Field field = schema.getField(fieldName); + Assert.assertNotNull(field); + + if (field.name().equals("field4")) { + Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1")); + } + } + } + + @Test + public void testEncodeAndDecode() { + AvroSchema avroSchema = AvroSchema.of(Foo.class, null); + + Foo foo1 = new Foo(); + foo1.setField1("foo1"); + foo1.setField2("bar1"); + foo1.setField4(new Bar()); + + Foo foo2 = new Foo(); + foo2.setField1("foo2"); + foo2.setField2("bar2"); + + byte[] bytes1 = avroSchema.encode(foo1); + Assert.assertTrue(bytes1.length > 0); + + byte[] bytes2 = avroSchema.encode(foo2); + Assert.assertTrue(bytes2.length > 0); + + Foo object1 = avroSchema.decode(bytes1); + Foo object2 = avroSchema.decode(bytes2); + + Assert.assertEquals(object1, foo1); + Assert.assertEquals(object2, foo2); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 84d021881ba..1fd38a0c1d0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -453,6 +453,10 @@ public class Commands { return PulsarApi.Schema.Type.String; case JSON: return PulsarApi.Schema.Type.Json; + case PROTOBUF: + return PulsarApi.Schema.Type.Protobuf; + case AVRO: + return PulsarApi.Schema.Type.Avro; default: return PulsarApi.Schema.Type.None; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index ce0124b7847..5a6329be7b1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -319,11 +319,15 @@ public final class PulsarApi { None(0, 0), String(1, 1), Json(2, 2), + Protobuf(3, 3), + Avro(4, 4), ; public static final int None_VALUE = 0; public static final int String_VALUE = 1; public static final int Json_VALUE = 2; + public static final int Protobuf_VALUE = 3; + public static final int Avro_VALUE = 4; public final int getNumber() { return value; } @@ -333,6 +337,8 @@ public final class PulsarApi { case 0: return None; case 1: return String; case 2: return Json; + case 3: return Protobuf; + case 4: return Avro; default: return null; } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java index 44d78c9bc97..cbf7c91768e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java @@ -37,5 +37,13 @@ public enum SchemaType { */ JSON, - PROTOBUF + /** + * Protobuf message encoding and decoding + */ + PROTOBUF, + + /** + * Serialize and deserialize via avro + */ + AVRO } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 3f9de1a26d9..d2ac4fd5124 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -27,6 +27,8 @@ message Schema { None = 0; String = 1; Json = 2; + Protobuf = 3; + Avro = 4; } required string name = 1; -- GitLab