Schema.java 11.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.client.api;

21
import java.nio.ByteBuffer;
22 23 24
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
25

26
import org.apache.pulsar.client.api.schema.GenericRecord;
27
import org.apache.pulsar.client.api.schema.GenericSchema;
28
import org.apache.pulsar.client.api.schema.SchemaDefinition;
29
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
30
import org.apache.pulsar.client.internal.DefaultImplementation;
31
import org.apache.pulsar.common.schema.KeyValue;
32
import org.apache.pulsar.common.schema.KeyValueEncodingType;
D
Dave Rusek 已提交
33
import org.apache.pulsar.common.schema.SchemaInfo;
34
import org.apache.pulsar.common.schema.SchemaType;
D
Dave Rusek 已提交
35

36 37 38
/**
 * Message schema definition.
 *
 * @param <T> the type of the message objects encoded and decoded by this schema
 */
39
public interface Schema<T> {
40

41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
    /**
     * Check if the message is a valid object for this schema.
     *
     * <p>An implementation can choose its most efficient approach to validate the message.
     * If an implementation doesn't override this method, the default calls {@link #decode(byte[])}
     * and discards the result, using a successful decode as the validation mechanism for
     * the bytes.
     *
     * <p>This method returns nothing; an invalid message is reported by throwing.
     *
     * @param message the message bytes to verify
     * @throws SchemaSerializationException if it is not a valid message
     */
    default void validate(byte[] message) {
        decode(message);
    }

57 58 59 60 61 62 63 64 65
    /**
     * Encode an object representing the message content into a byte array.
     *
     * @param message
     *            the message object
     * @return a byte array with the serialized content
     * @throws SchemaSerializationException
     *             if the serialization fails
     */
66
    byte[] encode(T message);
67

68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
    /**
     * Returns whether this schema supports versioning.
     *
     * <p>Most of the schema implementations don't really support schema versioning, or it just doesn't
     * make any sense to support schema versioning (e.g. primitive schemas). Only schemas that return
     * {@link GenericRecord} should support schema versioning.
     *
     * <p>If a schema implementation returns {@code false}, it should implement {@link #decode(byte[])};
     * while a schema implementation that returns {@code true} should implement
     * {@link #decode(byte[], byte[])} instead.
     *
     * <p>The default implementation returns {@code false}.
     *
     * @return true if this schema implementation supports schema versioning; otherwise returns false.
     */
    default boolean supportSchemaVersioning() {
        return false;
    }

85 86 87
    /**
     * Hand a {@link SchemaInfoProvider} to this schema.
     *
     * <p>The default implementation ignores the provider and does nothing.
     *
     * @param schemaInfoProvider the schema info provider
     */
    default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
    }

88 89 90 91 92 93 94
    /**
     * Decode a byte array into an object using the schema definition and deserializer implementation
     *
     * @param bytes
     *            the byte array to decode
     * @return the deserialized object
     */
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
    default T decode(byte[] bytes) {
        // use `null` to indicate ignoring schema version
        return decode(bytes, null);
    }

    /**
     * Decode a byte array into an object using a given version.
     *
     * <p>The default implementation drops {@code schemaVersion} and forwards to
     * {@link #decode(byte[])}. The default of that method forwards back to this overload with a
     * {@code null} version, so an implementation must override at least one of the two;
     * otherwise the two defaults keep calling each other and never terminate.
     *
     * @param bytes
     *            the byte array to decode
     * @param schemaVersion
     *            the schema version to decode the object. null indicates using latest version.
     * @return the deserialized object
     */
    default T decode(byte[] bytes, byte[] schemaVersion) {
        // ignore version by default (most of the primitive schema implementations ignore schema version)
        return decode(bytes);
    }
113

114 115 116
    /**
     * @return an object that represents the Schema associated metadata
     */
D
Dave Rusek 已提交
117 118
    SchemaInfo getSchemaInfo();

119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
    /**
     * Check if this schema requires fetching schema info to configure the schema.
     *
     * <p>The default implementation returns {@code false}.
     *
     * @return true if the schema requires fetching schema info to configure the schema,
     *         otherwise false.
     */
    default boolean requireFetchingSchemaInfo() {
        return false;
    }

    /**
     * Configure the schema to use the provided schema info.
     *
     * <p>The default implementation does nothing with the supplied arguments.
     *
     * @param topic topic name
     * @param componentName component name
     * @param schemaInfo schema info
     */
    default void configureSchemaInfo(String topic, String componentName,
                                     SchemaInfo schemaInfo) {
        // no-op
    }

141 142 143
    /**
     * Schema that doesn't perform any encoding on the message payloads. Accepts a byte array and it passes it through.
     */
144
    Schema<byte[]> BYTES = DefaultImplementation.newBytesSchema();
145

146 147 148 149 150
    /**
     * Schema for messages whose values are {@link ByteBuffer} instances.
     */
    Schema<ByteBuffer> BYTEBUFFER = DefaultImplementation.newByteBufferSchema();

151 152 153
    /**
     * Schema that can be used to encode/decode messages whose values are String. The payload is encoded with UTF-8.
     */
154
    Schema<String> STRING = DefaultImplementation.newStringSchema();
155

156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
    /**
     * INT8 Schema, for messages whose values are {@link Byte}.
     */
    Schema<Byte> INT8 = DefaultImplementation.newByteSchema();

    /**
     * INT16 Schema, for messages whose values are {@link Short}.
     */
    Schema<Short> INT16 = DefaultImplementation.newShortSchema();

    /**
     * INT32 Schema, for messages whose values are {@link Integer}.
     */
    Schema<Integer> INT32 = DefaultImplementation.newIntSchema();

    /**
     * INT64 Schema, for messages whose values are {@link Long}.
     */
    Schema<Long> INT64 = DefaultImplementation.newLongSchema();

    /**
     * Boolean Schema, for messages whose values are {@link Boolean}.
     */
    Schema<Boolean> BOOL = DefaultImplementation.newBooleanSchema();

    /**
     * Float Schema, for messages whose values are {@link Float}.
     */
    Schema<Float> FLOAT = DefaultImplementation.newFloatSchema();

    /**
     * Double Schema, for messages whose values are {@link Double}.
     */
    Schema<Double> DOUBLE = DefaultImplementation.newDoubleSchema();

191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
    /**
     * Date Schema, for messages whose values are {@link java.util.Date}.
     */
    Schema<Date> DATE = DefaultImplementation.newDateSchema();

    /**
     * Time Schema, for messages whose values are {@link java.sql.Time}.
     */
    Schema<Time> TIME = DefaultImplementation.newTimeSchema();

    /**
     * Timestamp Schema, for messages whose values are {@link java.sql.Timestamp}.
     */
    Schema<Timestamp> TIMESTAMP = DefaultImplementation.newTimestampSchema();

206 207 208 209 210 211
    /**
     * Create a Protobuf schema type by extracting the fields of the specified class.
     *
     * @param clazz the Protobuf generated class to be used to extract the schema
     * @return a Schema instance
     */
212
    static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> clazz) {
213 214 215 216 217 218 219 220 221 222 223
        return DefaultImplementation.newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build());
    }

    /**
     * Create a Protobuf schema type with schema definition.
     *
     * @param schemaDefinition schemaDefinition the definition of the schema
     * @return a Schema instance
     */
    static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(SchemaDefinition<T> schemaDefinition) {
        return DefaultImplementation.newProtobufSchema(schemaDefinition);
224 225
    }

226
    /**
227
     * Create a  Avro schema type by default configuration of the class
228
     *
229
     * @param pojo the POJO class to be used to extract the Avro schema
230 231
     * @return a Schema instance
     */
232 233
    static <T> Schema<T> AVRO(Class<T> pojo) {
        return DefaultImplementation.newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build());
234 235 236
    }

    /**
237
     * Create a Avro schema type with schema definition
238
     *
239
     * @param schemaDefinition the definition of the schema
240 241
     * @return a Schema instance
     */
242 243
    static <T> Schema<T> AVRO(SchemaDefinition<T> schemaDefinition) {
        return DefaultImplementation.newAvroSchema(schemaDefinition);
244 245
    }

246 247 248
    /**
     * Create a JSON schema type by extracting the fields of the specified class.
     *
249
     * @param pojo the POJO class to be used to extract the JSON schema
250 251
     * @return a Schema instance
     */
252 253
    static <T> Schema<T> JSON(Class<T> pojo) {
        return DefaultImplementation.newJSONSchema(SchemaDefinition.builder().withPojo(pojo).build());
254
    }
255

256
    /**
257
     * Create a JSON schema type with schema definition
258
     *
259
     * @param schemaDefinition the definition of the schema
260 261
     * @return a Schema instance
     */
262 263
    static <T> Schema<T> JSON(SchemaDefinition schemaDefinition) {
        return DefaultImplementation.newJSONSchema(schemaDefinition);
264 265
    }

266 267 268 269
    /**
     * Key Value Schema using passed in schema type, support JSON and AVRO currently.
     */
    static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> value, SchemaType type) {
270
        return DefaultImplementation.newKeyValueSchema(key, value, type);
271 272
    }

273 274 275
    /**
     * Schema that can be used to encode/decode KeyValue.
     */
276 277 278
    static Schema<KeyValue<byte[], byte[]>> KV_BYTES() {
        return DefaultImplementation.newKeyValueBytesSchema();
    }
279

280 281 282 283
    /**
     * Key Value Schema whose underneath key and value schemas are JSONSchema.
     */
    static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> value) {
284
        return DefaultImplementation.newKeyValueSchema(key, value, SchemaType.JSON);
285 286 287 288 289 290
    }

    /**
     * Key Value Schema using passed in key and value schemas.
     */
    static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> value) {
291
        return DefaultImplementation.newKeyValueSchema(key, value);
292 293
    }

294 295 296 297 298 299 300
    /**
     * Key Value Schema using the passed-in key schema, value schema and key/value encoding type.
     *
     * @param key the schema of the key
     * @param value the schema of the value
     * @param keyValueEncodingType the encoding type to use for the key/value pair
     * @return a key/value Schema instance
     */
    static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> value, KeyValueEncodingType keyValueEncodingType) {
        return DefaultImplementation.newKeyValueSchema(key, value, keyValueEncodingType);
    }

301
    @Deprecated
302
    static Schema<GenericRecord> AUTO() {
303 304 305
        return AUTO_CONSUME();
    }

306 307 308 309 310 311 312 313 314 315
    /**
     * Create a schema instance that automatically deserialize messages
     * based on the current topic schema.
     * <p>
     * The messages values are deserialized into a {@link GenericRecord} object.
     * <p>
     * Currently this is only supported with Avro and JSON schema types.
     *
     * @return the auto schema instance
     */
316
    static Schema<GenericRecord> AUTO_CONSUME() {
317
        return DefaultImplementation.newAutoConsumeSchema();
318 319
    }

320 321 322 323 324 325 326 327 328 329 330
    /**
     * Create a schema instance that accepts a serialized payload
     * and validates it against the topic schema.
     * <p>
     * Currently this is only supported with Avro and JSON schema types.
     * <p>
     * This method can be used when publishing a raw JSON payload,
     * for which the format is known and a POJO class is not avaialable.
     *
     * @return the auto schema instance
     */
331
    static Schema<byte[]> AUTO_PRODUCE_BYTES() {
332
        return DefaultImplementation.newAutoProduceSchema();
333 334 335
    }

    static Schema<?> getSchema(SchemaInfo schemaInfo) {
336
        return DefaultImplementation.getSchema(schemaInfo);
337
    }
338

339 340 341 342 343 344 345 346
    /**
     * Returns a generic schema of existing schema info.
     *
     * <p>Only supports AVRO and JSON.
     *
     * @param schemaInfo schema info
     * @return a generic schema instance
     */
347
    static GenericSchema<GenericRecord> generic(SchemaInfo schemaInfo) {
348 349
        return DefaultImplementation.getGenericSchema(schemaInfo);
    }
350
}