1. 30 Mar, 2019 40 commits
    • S
      [schema] Batch message should propagate the schema version from single message (#3870) · e8002734
      Sijie Guo committed
      *Motivation*
      
      Schema version is set on single message's header. But it isn't propagated to
      the batch message's header. So when the consumer receives the message batch,
      it doesn't contain the schema version.
      
      *Modifications*
      
      When initializing a message batch, propagate the schema version from the single message to the batch message header.
      
      *Verify this change*
      
      Verified with an integration test. Need to write some unit tests for it.
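
The propagation described above can be sketched roughly as follows. This is a minimal illustration, not the Pulsar implementation: `Metadata` and `Batch` are hypothetical stand-ins for the real message and batch header types, and the sketch assumes the first message added to a batch is the one that initializes it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for message metadata; these are not Pulsar classes.
final class BatchSchemaVersionSketch {
    static final class Metadata {
        byte[] schemaVersion; // null when no schema version is set
    }

    static final class Batch {
        final Metadata header = new Metadata();
        final List<Metadata> messages = new ArrayList<>();

        void add(Metadata single) {
            // Assumption: the first message initializes the batch, so its
            // schema version is copied into the batch header at that point.
            if (messages.isEmpty() && single.schemaVersion != null) {
                header.schemaVersion = single.schemaVersion.clone();
            }
            messages.add(single);
        }
    }

    public static void main(String[] args) {
        Metadata m = new Metadata();
        m.schemaVersion = new byte[] {0, 0, 0, 1};
        Batch b = new Batch();
        b.add(m);
        System.out.println(Arrays.toString(b.header.schemaVersion));
    }
}
```

With this shape, a consumer that only inspects the batch header still sees the schema version that was set on the single message.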
      e8002734
    • F
      [Issue 3896] [pulsar-io] Fix NPE in ElasticSearchSink (#3899) · 3bfc264e
      Fangbin Sun committed
      * Fix NPE in ElasticSearchSink.
      
      * Init by an empty string
      3bfc264e
    • S
      Added sensitive annotations (#3866) · a9c47380
      Sanjeev Kulkarni committed
      a9c47380
    • S
      Enable generation of swagger definitions for functions/sources/sinks (#3871) · fa3bc7e4
      Sanjeev Kulkarni committed
      * Enable generation of swagger definitions for functions/sources/sinks
      
      * Added more changes to make it backwards compatible
      
      * Add logic to serve functions rest api
      
      * Deleted swagger file since it's supposed to be generated.
      Also added some missing python packages for building
      
      * Added back deleted file
      
      * Fix comments
      
      * Separated out sources and sinks
      
      * More changes to collapse all rest apis into one dropdown
      fa3bc7e4
    • M
      Fixed reading from file with newlines (#3864) · c216dc0b
      Matteo Merli committed
      c216dc0b
    • P
      Fix read batching message by pulsar reader (#3830) · 8e3f0140
      penghui committed
      ### Motivation
      
      Fix a bug in reader.hasMessageAvailable().
      
      ```java
      while (reader.hasMessageAvailable()) {
          Message msg = reader.readNext();
          // Do something
      }
      ```
      
      If lastDequeuedMessage holds a batch message ID, reader.hasMessageAvailable() returns false after the first message of the batch has been read, because the ID it is compared against is a plain MessageIdImpl (lastMessageIdInBroker).
      
      ### Modifications
      
      Add batching message check.
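
One way to picture such a batching check is sketched below. It is an illustration under stated assumptions, not the code from this PR: positions are reduced to `(ledgerId, entryId)` pairs, and `queuedFromBatch` is a hypothetical count of messages from the current batch that the reader has received but not yet handed to the caller.

```java
// Hypothetical sketch; names and parameters are illustrative only.
final class ReaderAvailabilitySketch {
    // Compare two entry-level positions. All messages of one batch share
    // the same (ledgerId, entryId) pair.
    static int compareEntry(long ledgerA, long entryA, long ledgerB, long entryB) {
        int c = Long.compare(ledgerA, ledgerB);
        return c != 0 ? c : Long.compare(entryA, entryB);
    }

    static boolean hasMessageAvailable(long brokerLedger, long brokerEntry,
                                       long lastLedger, long lastEntry,
                                       int queuedFromBatch) {
        int c = compareEntry(brokerLedger, brokerEntry, lastLedger, lastEntry);
        if (c > 0) {
            return true; // the broker holds entries beyond the last one read
        }
        // Same entry: messages are still available only if part of the
        // batch has not been consumed yet.
        return c == 0 && queuedFromBatch > 0;
    }
}
```

Without the second condition, a reader positioned on the last entry would report no available messages as soon as the first message of that batch was read.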
      
      ### Verifying this change
      
      - [x] This change added tests and can be verified as follows:
      - *Added unit tests for reading batch and non-batch messages*
      8e3f0140
    • R
      [pulsar-broker] Fix deadlock: add zk-operation timeout for blocking call on zk-cache (#3633) · a76222f9
      Rajan Dhabalia committed
      fix: compilation
      
      fix test
      
      fix test
      a76222f9
    • F
      [Issue 3806] Fix NPE while call PartitionedProducerImpl.getStats() (#3829) · ede6f6e7
      Fangbin Sun committed
      * Fix NPE when calling PartitionedProducerImpl.getStats().
      
      * Add unit tests and fix NPE when calling MultiTopicsConsumerImpl.getStats()
      ede6f6e7
    • S
      Fix the loop of consumer poll, so the consumer can cache more than one record... · 95b44148
      se7enkings committed
      Fix the loop of consumer poll, so the consumer can cache more than one record in a single poll. (#3852)
      
      95b44148
    • L
      [Issue #3712][python-client] exposing InitialPosition management in the... · c078e71a
      Le Labourier Marc committed
      [Issue #3712][python-client] exposing InitialPosition management in the ConsumerConfiguration. (#3714)
      
      ### Motivation
      PR #3567 introduced the SubscriptionInitialPosition option in ConsumerConfiguration in the CPP client but did not expose any way to set it from the Python client.
      
      This PR exposes the code introduced in the previous PR so that Python users can choose the initial position of the consumer.
      
      ### Modifications
      
      - Implemented a boost object to expose InitialPosition Enum.
      - Added to boost ConsumerConfiguration object the getter/setter of the InitialPosition attribute.
      - Added an initial_position parameter to Client.subscribe in order to modify the ConsumerConfiguration instance created.
      
      ### Verifying this change
      
      This change is a trivial rework / code cleanup without any test coverage.
      c078e71a
    • C
      revise the schema default type not null (#3752) · 6e76af2f
      congbo committed
      Fix #3741
      
      Support defining fields that do not allow null in a schema
      
      Add schema verification for fields that do not allow null
      
      Does this pull request potentially affect one of the following parts:
      If yes was chosen, please highlight the changes
      
      Dependencies (does it add or upgrade a dependency): (no)
      The public API: (no)
      The schema: (yes)
      The default values of configurations: (no)
      The wire protocol: (no)
      The rest endpoints: (no)
      The admin cli options: (no)
      Anything that affects deployment: (no)
      6e76af2f
    • S
      Support passing schema definition for JSON and AVRO schemas (#3766) · 41d09796
      Sijie Guo committed
      * Support passing schema definition for JSON and AVRO schemas
      
      *Motivation*
      
      Currently AVRO and JSON schemas are generated directly from POJOs.
      Sometimes people would like to use pre-generated or pre-defined schemas,
      so allowing schema definitions to be passed in clears up the confusion
      around parsing schemas from POJOs.
      
      *Modifications*
      
      - Abstract a common base class `StructSchema` for AVRO/PROTOBUF/JSON
      - Standardize on using avro schema for defining schema (we already did that; this change only makes it clearer)
      - Add methods to pass schema definition for JSON and AVRO schemas
      
      *NOTES*
      
      We don't support passing a schema definition for PROTOBUF. We only support generated messages as the POJO
      class for a protobuf schema, and we generate the schema definition from the generated messages, so it doesn't make sense
      to pass in a different schema definition.
      
      * Add missing license header
      41d09796
    • S
      [schema] use UTF_8 for storing schema information (#3666) · f1f781be
      Sijie Guo committed
      *Motivation*
      
      Currently we don't specify a charset. This causes trouble when the client
      and broker use different system charsets.
      
      *Modifications*
      
      Use UTF_8 for the schema information for AVRO/PROTOBUF/JSON
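
The effect of pinning the charset can be sketched with plain JDK calls. The class and method names below are hypothetical and are not taken from the Pulsar code base; only `String.getBytes(Charset)`, the `String(byte[], Charset)` constructor, and `StandardCharsets.UTF_8` are real JDK APIs.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing explicit UTF-8 encoding of schema text.
final class SchemaCharsetSketch {
    static byte[] encode(String schemaDefinition) {
        // An explicit charset avoids depending on the platform default,
        // which may differ between client and broker machines.
        return schemaDefinition.getBytes(StandardCharsets.UTF_8);
    }

    static String decode(byte[] schemaBytes) {
        return new String(schemaBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String definition = "{\"type\":\"record\",\"doc\":\"caf\u00e9\"}";
        System.out.println(decode(encode(definition)).equals(definition));
    }
}
```

By contrast, the no-argument `getBytes()` uses the JVM's default charset, so the same schema text can produce different bytes on differently configured machines.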
      f1f781be
    • S
      [schema] Introduce `GenericSchema` interface (#3683) · ad888f13
      Sijie Guo committed
      *Motivation*
      
      In order to introduce `GenericRecordBuilder`, we need to know the fields in a `GenericSchema`.
      Otherwise, there is no way for us to build a GenericRecord.
      
      *Modifications*
      
      This proposal refactors the current generic schema by introducing a `GenericSchema`. This generic schema
      provides interfaces to retrieve the fields of a `GenericRecordSchema`.
      
      *Additionally*
      
      This proposal also adds the primitive schemas to the `Schema` class, so people can program against primitive schemas
      using the Schema interface rather than specific implementations.
      ad888f13
    • S
      [schema] Introduce schema builder to build schema. (#3682) · f458ec34
      Sijie Guo committed
      *Motivation*
      
      Currently we support POJO-based schemas in the Java client.
      A POJO schema is only useful when the POJO is predefined. However,
      in applications like a CDC pipeline, the POJO is not predefined, and there
      is no other way to define a schema.
      
      Since we are using avro schema for schema management, this PR
      is proposing a simple schema builder wrapper on avro schema builder.
      
      *Modifications*
      
      Introduce schema builder to build a record schema.
      
      *NOTES*
      
      Currently we only support primitives in defining fields in a record schema in this PR.
      We will add nested types in future PRs.
      f458ec34
    • S
      [schema] Introduce multi version generic record schema (#3670) · 89e3410b
      Sijie Guo committed
      *Motivation*
      
      Currently AUTO_CONSUME only supports decoding records with the latest schema.
      All the schema versions are lost. This makes AUTO_CONSUME less useful in some use cases,
      such as CDC, because there is no way for applications to know which schema version
      a message is using.
      
      In order to support multi-version schema, we need to propagate schema version from
      message header through schema#decode method to the decoded record.
      
      *Modifications*
      
      - Introduce a new decode method `decode(byte[] data, byte[] schemaVersion)`. This allows the implementation
        to leverage the schema version.
      - Introduce a method `supportSchemaVersioning` to tell which decode method to use, because for most schema
        implementations, such as primitive schemas and POJO-based schemas, using a schema version doesn't make any sense.
      - Introduce a SchemaProvider which returns a specific schema instance for a given schema version
      - Implement a MultiVersionGenericRecordSchema which decodes the messages based on schema version. All the records
        decoded by this schema will have schema version and its corresponding schema definitions.
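
How these pieces could fit together is sketched below. Only the names `decode(byte[] data, byte[] schemaVersion)`, `supportSchemaVersioning` and `SchemaProvider` come from the description above; the interface shapes, the `getSchema` method, the `String` record type and the 8-byte version encoding are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; these interfaces are simplified stand-ins.
final class MultiVersionSchemaSketch {
    interface Schema<T> {
        T decode(byte[] data);

        // Most schemas ignore versions, so both defaults fall back.
        default boolean supportSchemaVersioning() { return false; }
        default T decode(byte[] data, byte[] schemaVersion) { return decode(data); }
    }

    interface SchemaProvider<T> {
        Schema<T> getSchema(byte[] schemaVersion); // assumed method name
    }

    static final class MultiVersionSchema implements Schema<String> {
        private final SchemaProvider<String> provider;

        MultiVersionSchema(SchemaProvider<String> provider) { this.provider = provider; }

        @Override public String decode(byte[] data) {
            throw new UnsupportedOperationException("a schema version is required");
        }
        @Override public boolean supportSchemaVersioning() { return true; }
        @Override public String decode(byte[] data, byte[] schemaVersion) {
            // Pick the schema registered for this version, then decode with it.
            return provider.getSchema(schemaVersion).decode(data);
        }
    }

    static String demo() {
        Map<Long, Schema<String>> byVersion = new HashMap<>();
        byVersion.put(1L, data -> "v1:" + new String(data, StandardCharsets.UTF_8));
        byVersion.put(2L, data -> "v2:" + new String(data, StandardCharsets.UTF_8));
        SchemaProvider<String> provider = v -> byVersion.get(ByteBuffer.wrap(v).getLong());
        Schema<String> schema = new MultiVersionSchema(provider);
        byte[] version = ByteBuffer.allocate(8).putLong(2L).array();
        return schema.decode("x".getBytes(StandardCharsets.UTF_8), version);
    }
}
```

A caller would check `supportSchemaVersioning()` first and pass the version from the message header only when it returns true.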
      
      *NOTES*
      
      This implementation only introduces the mechanism. It doesn't wire the multi-version schema
      with the auto_consume schema. There will be a subsequent pull request implementing a schema provider
      that fetches and caches schemas from brokers.
      89e3410b
    • J
      fix s3 spport for s3 api (#3845) · eb96f937
      Jia Zhai committed
      eb96f937
    • R
      709d49be
    • S
      Fix set-project-version.sh (#3847) · 9160c149
      Sijie Guo committed
      *Motivation*
      
      The script doesn't update the parent version for protobuf-shaded module
      
      *Modifications*
      
      use `versions:update-parent` to update parent version
      9160c149
    • S
      [stats] Expose namespace topics count when exposing topic level metrics (#3849) · 5e79dd48
      Sijie Guo committed
      *Motivation*
      
      Topics count is a namespace level metric. Currently if `exposeTopicLevelMetricsInPrometheus=true`
      we don't expose `pulsar_topics_count`. However `pulsar_topics_count` is a very useful metric for
      monitoring the health of the cluster.
      
      *Modifications*
      
      if `exposeTopicLevelMetricsInPrometheus=true`, we expose namespace level metrics that are not
      exposed at topic level. e.g. `pulsar_topics_count`.
      5e79dd48
    • S
      Use `PULSAR_PREFIX_` for appending new keys (#3858) · 780ba31a
      Sijie Guo committed
      *Motivation*
      
      Currently integration tests are broken due to #3827
      
      `PULSAR_` is used almost everywhere in our scripts. If we are using
      `PULSAR_` for appending new keys, then we are potentially appending
      a wrong key into the config file.
      
      Currently this happens on presto. We are appending `PULSAR_ROOT_LOGGER`
      to presto's config file, causing presto to fail to load the config.
      
      *Modifications*
      
      Use `PULSAR_PREFIX_` instead of `PULSAR_`
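
The difference between the two prefixes can be illustrated with a small sketch. It is written in Java for illustration only and is not the project's actual script. It assumes that an environment variable whose name matches an existing key overrides that key, and that a new key is appended only when the variable carries the explicit `PULSAR_PREFIX_` marker.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the key-appending rule; not the real script.
final class EnvConfigSketch {
    static final String PREFIX = "PULSAR_PREFIX_";

    static Map<String, String> apply(Map<String, String> config, Map<String, String> env) {
        Map<String, String> out = new LinkedHashMap<>(config);
        for (Map.Entry<String, String> e : env.entrySet()) {
            String key = e.getKey();
            if (out.containsKey(key)) {
                out.put(key, e.getValue()); // override an existing setting
            } else if (key.startsWith(PREFIX)) {
                // Append a new key only on explicit request, prefix stripped.
                out.put(key.substring(PREFIX.length()), e.getValue());
            }
            // Anything else, e.g. PULSAR_ROOT_LOGGER, is ignored.
        }
        return out;
    }
}
```

Under this rule a general-purpose variable such as `PULSAR_ROOT_LOGGER` no longer ends up in a config file that does not declare it.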
      780ba31a
    • S
      Expand add env functionality to add variables if not present (#3827) · b6c767ef
      Sanjeev Kulkarni committed
      * Add config variables if absent
      
      * Took feedback into account
      b6c767ef
    • E
      [cpp client] Bugfix prevent dup consumer for same topic subscription (#3748) · c6aa56fe
      Ezequiel Lovelle committed
      Same fix as #3746 but for cpp client.
      
        - Filter consumers for the same topic name.
        - Add test to verify that the same subscription name with different topics
          yields different consumer subscriptions instead of a reused one.
      c6aa56fe
    • T
      fix message_id_serialize to empty slice (#3801) · d131a2b5
      Tevic committed
      * fix message_id_serialize to empty slice
      
      * test for serialize and deserialize
      d131a2b5
    • K
      Implement configurable token auth claim (#3826) · 9355a19f
      Kai committed
      9355a19f
    • M
      In Java allow messages that compress to <5mb to be sent with batching enabled (#3718) · b725d87a
      Matteo Merli committed
      * In Java allow messages that compress to <5mb to be sent with batching enabled
      
      * Test is no longer accurate since behavior changed and we're already testing in ProducerConsumerTest
      b725d87a
    • S
      Issue #3803: Make ManagedLedger read batch size configurable (#3808) · e091a9b4
      Sijie Guo committed
      *Motivation*
      
      Fixes #3803
      
      Hardcoding is a very bad practice. It means we have no way to alter system behavior
      when production issues occur.
      
      *Modifications*
      
      introduce a few read batch related settings to make them configurable
      e091a9b4
    • E
      Feature - implement reference count for ConsumerImpl (#3795) · e190534c
      Ezequiel Lovelle committed
      * Feature - implement reference count for ConsumerImpl
      
      Add reference count for ConsumerImpl in order to track reused instances of a
      consumer instance returned by `subscribe()` method call.
      Having the reference of subscribed consumer instances offers the ability to not
      close a consumer until the last corresponding `close()` is called.
      
      Modifications:
      
        - Add field on ConsumerBase to track references of consumer instances
          subscribed by the user.
        - Add checks on ConsumerImpl to decide whether the close() action should be
          performed, depending on whether the reference count is zero.
        - Increment reference count when a previously built consumer instance is being
          used by caller.
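
The reference counting described above can be sketched as follows. This is a minimal illustration rather than the ConsumerImpl code: the class and method names are hypothetical, and the sketch assumes the count starts at zero for a freshly built consumer and is incremented each time the same instance is handed out again.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a reference-counted close(); not Pulsar's class.
final class RefCountedConsumerSketch {
    private final AtomicInteger refCount = new AtomicInteger(0);
    private volatile boolean closed = false;

    // Called when subscribe() returns this already-built instance again.
    void incrRefCount() {
        refCount.incrementAndGet();
    }

    // Returns true only when the consumer was actually closed.
    boolean close() {
        while (true) {
            int current = refCount.get();
            if (current == 0) {
                closed = true; // last holder: perform the real close
                return true;
            }
            // Other holders remain: drop one reference and keep running.
            if (refCount.compareAndSet(current, current - 1)) {
                return false;
            }
        }
    }

    boolean isClosed() {
        return closed;
    }
}
```

With two holders of the same instance, the first `close()` only drops a reference and the second one performs the real close.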
      
      Future considerations:
      
      When optimization #3312 is made for consumer implementations
      other than ConsumerImpl, refCount checks should be added to their close() method.
      
      * Add tests for reference count on ConsumerImpl
      
        - Add test to verify ConsumerImpl reference count on close() method.
        - Fix test from dup consumers feature with refcount.
      e190534c
    • R
      [pulsar-webscoket] avoid creating temp list of metrics on every-metrics generator (#3792) · 449f2f5b
      Rajan Dhabalia committed
      * [pulsar-webscoket] avoid creating temp list of metrics on every-metrics generator
      
      * fix non-sync getMetrics
      449f2f5b
    • S
      [python client] set protobuf version to be larger than 3.6.0 (#3737) · 18aa4302
      Sijie Guo committed
      *Motivation*
      
      Fixes #3686
      18aa4302
    • S
      NullPointerException at using BytesSchema.of() (#3754) · 01f868c9
      Sijie Guo committed
      Fixes #3734
      
      *Motivation*
      
      Exception occurred when using `BytesSchema.of()`
      
      ```
      Exception in thread "main" java.lang.ExceptionInInitializerError
      	at org.apache.pulsar.examples.simple.ProducerExample.main(ProducerExample.java:32)
      Caused by: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
      	at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:36)
      	at org.apache.pulsar.client.internal.DefaultImplementation.newKeyValueSchema(DefaultImplementation.java:158)
      	at org.apache.pulsar.client.api.Schema.<clinit>(Schema.java:123)
      	... 1 more
      Caused by: java.lang.reflect.InvocationTargetException
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      	at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newKeyValueSchema$16(DefaultImplementation.java:160)
      	at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:34)
      	... 3 more
      Caused by: java.lang.NullPointerException
      	at org.apache.pulsar.client.impl.schema.KeyValueSchema.<init>(KeyValueSchema.java:68)
      	... 9 more
      ```
      
      The problem is caused by an unusual class loading and reflection sequence.
      
      When accessing `BytesSchema`, `BytesSchema` will try to initialize `Schema`. When initializing `Schema`, it attempts
      to initialize `KV_BYTES` using reflection, and initializing `KV_BYTES` requires `BytesSchema`. Hence `KV_BYTES` is not
      initialized correctly.
      
      The change is to avoid this recursive class loading.
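
The initialization cycle can be reproduced in a few lines of plain Java. The sketch below is a simplified analogue, not the Pulsar code: it uses a superclass instead of an interface with reflection, and the nested `Schema` and `BytesSchema` classes and the `describe` helper are hypothetical stand-ins.

```java
// Simplified analogue of the cycle; these are not the Pulsar classes.
final class InitCycleSketch {
    static class Schema {
        // Evaluated while Schema is being initialized. If that was
        // triggered by BytesSchema, INSTANCE has not been assigned yet.
        static final Object KV_BYTES = describe(BytesSchema.INSTANCE);

        static String describe(Object valueSchema) {
            return "KV(" + valueSchema + ")";
        }
    }

    static class BytesSchema extends Schema {
        static final BytesSchema INSTANCE = new BytesSchema();

        @Override public String toString() {
            return "BytesSchema";
        }
    }

    public static void main(String[] args) {
        // Touching BytesSchema first initializes its superclass Schema,
        // which reads the still-null BytesSchema.INSTANCE.
        Object first = BytesSchema.INSTANCE;
        System.out.println(first + " " + Schema.KV_BYTES);
    }
}
```

When `BytesSchema` is touched first, `KV_BYTES` is built from a null reference, which corresponds to the NullPointerException in the trace above once a constructor dereferences it. The outcome depends on which class is initialized first, which is why the failure only shows up on some access paths.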
      01f868c9
    • M
      Make Python consumers acks async (#3782) · 15d2c3cb
      Matteo Merli committed
      ### Motivation
      
      Fixes #3779 
      
      The behavior of acknowledgment has always been to throw an exception if the consumer is not connected.
      
      In reality it doesn't make much sense to bubble the exception back to the user in this case because:
       * There's no ack on the ack anyway
       * The only reason for ack to fail is if consumer is disconnected
       * If consumer is disconnected, messages are going to be replayed in any case
       * Therefore, there's no action to do when acking while disconnected
      
      This is already the case in the Java and Go clients (additionally Java client performs client side grouping of acks based on time). 
      
      Changing the Python client behavior to be the same.
      15d2c3cb
    • M
      2d924ab1
    • M
      399b6dad
    • B
      fix: function config cleanupSubscription update bug (#3771) · 84dba37e
      Boyang Jerry Peng committed
      * fix: function config cleanupSubscription update bug
      
      * add and fix unit tests
      84dba37e
    • B
      propagate default resource values in Pulsar Functions (#3636) · f46bed75
      Boyang Jerry Peng committed
      * propagate default resource values in Pulsar Functions
      
      * fix unit tests
      f46bed75
    • M
      Fixed Reader.HasNext() in Go client (#3764) · 36362ad0
      Matteo Merli committed
      * Fixed Reader.HasNext() in Go client
      
      * Fixed formatting
      
      * Removed commented code
      36362ad0