- 30 3月, 2019 40 次提交
-
-
由 Ezequiel Lovelle 提交于
Add reference count feature to keep track of reused instances of a consumer instance, for more details please see commit ff4db8db. *Modifications* - Add refCount instance variable on ConsumerImpl. - Use new safeDecrRefCount() on consumer close() in order to know whether effective close call should occur or not. - Increment reference count when a previous built consumer instance is being used by caller. *Future considerations* Thereafter when feature preventing duplicated consumer is made for PartitionedConsumer, MultiTopicsConsumer and PatternMultiTopicsConsumer, incrRefCount() member could be turned into a pure virtual method.
-
由 congbo 提交于
## Motivation To compromise avro‘s bug for avro filed logical type https://issues.apache.org/jira/browse/AVRO-1891 ## Modifications add some initialize class ## Verifying this change Add logical type test in AvroSchemaTest
-
由 Sijie Guo 提交于
*Motivation* Schema version is set on single message's header. But it isn't propagated to the batch message's header. So when the consumer receives the message batch, it doesn't contain the schema version. *Modifications* When intializing a message batch, propagate the schema version from single message to batch message header. *Verify this change* Verified with an integration test. Need to write some unit tests for it.
-
由 Fangbin Sun 提交于
* Fix NPE in ElasticSearchSink. * Init by an empty string
-
由 Sanjeev Kulkarni 提交于
-
由 Sanjeev Kulkarni 提交于
-
由 Sanjeev Kulkarni 提交于
* Enable generation of swagger definitions for functions/sources/sinks * Added more changes to make it backwards compatible * Add logic to server functions rest api * Deleted swagger file since its supposed to be generated. Also added some missing python packages for building * Added back deleted file * Fix comments * Seperated out sources and sinks * More changes to collapse all rest apis into one dropdown
-
由 Matteo Merli 提交于
-
由 penghui 提交于
### Motivation Fix bug of reader.hasMessageAvailable(). ```java while (reader.hasMessageAvailable()) { Message msg = reader.readNext(); // Do something } ``` If lastDequeuedMessage with a batchMessageId, reader.hasMessageAvailable() will return false after read first message of batchMessage, because compared message id is a MessageIdImpl(lastMessageIdInBroker) ### Modifications Add batching message check. ### Verifying this change - [x] This change added tests and can be verified as follows: - *Added UT tests for read batch message and non-batch message*
-
由 Rajan Dhabalia 提交于
fix: compilation fix test fix test
-
由 Fangbin Sun 提交于
* Fix NPE while call PartitionedProducerImpl.getStats(). * Add unit tests and fix NPE while call MultiTopicsConsumerImpl.getStats()
-
由 se7enkings 提交于
Fix the loop of consumer poll, so the consumer can cache more than one record in signal poll. (#3852)
-
由 Le Labourier Marc 提交于
[Issue #3712][python-client] exposing InitialPosition management in the ConsumerConfiguration. (#3714) ### Motivation PR #3567 introduced the SubscriptionInitialPosition option in ConsumerConfiguration in the CPP client but did not exposed any methods to be able to do it with the Python client. This PR aims to expose the code introduced in the previous PR to allows Python user to choose the initial position of the consumer in python. ### Modifications - Implemented a boost object to expose InitialPosition Enum. - Added to boost ConsumerConfiguration object the getter/setter of the InitialPosition attribute. - Added a initial_position parameter to Client.subscribe in order to modify the ConsumerConfiguration instance created. ### Verifying this change This change is a trivial rework / code cleanup without any test coverage.
-
由 congbo 提交于
Fix #3741 Support define not not allow null field in schema Add not allow null field schema verify Does this pull request potentially affect one of the following parts: If yes was chosen, please highlight the changes Dependencies (does it add or upgrade a dependency): (no) The public API: (no) The schema: (yes) The default values of configurations: (no) The wire protocol: (no) The rest endpoints: (no) The admin cli options: (no) Anything that affects deployment: (no)
-
由 Sijie Guo 提交于
* Support passing schema definition for JSON and AVRO schemas *Motivation* Currently AVRO and Schema generated schemas from POJO directly. Sometime people would like to use pre-generated/defined schemas, so allow passing in schema definitions would clear the confusions on parsing schemas from POJO. *Modifications* - Abstract a common base class `StructSchema` for AVRO/PROTOBUF/JSON - Standarize on using avro schema for defining schema (we already did that. this change only makes it clearer) - Add methods to pass schema definition for JSON and AVRO schemas *NOTES* We don't support passing schema definition for PROTOBUF. since we only supported generated messages as POJO class for protobuf schema, and we generate schema definition from the generated messages. it doesn't make sense to pass in a different schema definition. * Add missing license header
-
由 Sijie Guo 提交于
*Motivation* Currently we don't specify charset. It will be causing troubles when client and broker are using different system charsets. *Modifications* Use UTF_8 for the schema information for AVRO/PROTOBUF/JSON
-
由 Sijie Guo 提交于
*Motivation* In order to introduce `GenericRecordBuilder`, we need to know the fields in a `GenericSchema`. Otherwise, there is no way for us to build a GenericRecord. *Modifications* This proposal refactors current generic schema by introducing a `GenericSchema`. This generic schema provides interfaces to retrieve the fields of a `GenericRecordSchema`. *Additionally* This proposal adding the primitive schemas into `Schema` class. So people can program primitive schemas using Schema interface rather than specific implementations.
-
由 Sijie Guo 提交于
*Motivation* Currently we are supporting POJO based schema in java clients. POJO schema is only useful when the POJO is predefined. However in applications like a CDC pipeline, POJO is no predefined, there is no other way to define a schema. Since we are using avro schema for schema management, this PR is proposing a simple schema builder wrapper on avro schema builder. *Modifications* Introduce schema builder to build a record schema. *NOTES* Currently we only support primitives in defining fields in a record schema in this PR. We will add nested types in future PRs.
-
由 Sijie Guo 提交于
*Motivation* Currently AUTO_CONSUME only supports decoding records from latest schema. All the schema versions are lost. It makes AUTO_CONSUME less useful in some use cases, such as CDC. Because there is no way for the applications to know which version of schema that a message is using. In order to support multi-version schema, we need to propagate schema version from message header through schema#decode method to the decoded record. *Modifications* - Introduce a new decode method `decode(byte[] data, byte[] schemaVersion)`. This allows the implementation to leverage the schema version. - Introduce a method `supportSchemaVersioning` to tell which decode methods to use. Because most of the schema implementations such as primitive schemas and POJO based schema doesn't make any sense to use schema version. - Introduce a SchemaProvider which returns a specific schema instance for a given schema version - Implement a MultiVersionGenericRecordSchema which decode the messages based on schema version. All the records decoded by this schema will have schema version and its corresponding schema definitions. *NOTES This implementation only introduce the mechanism. But it doesn't wire the multi-version schema with auto_consume schema. There will be a subsequent pull request on implementing a schema provider that fetches and caches schemas from brokers.
-
由 Jia Zhai 提交于
-
由 Rajan Dhabalia 提交于
-
由 Sijie Guo 提交于
*Motivation* The script doesn't update the parent version for protobuf-shaded module *Modifications* use `versions:update-parent` to update parent version
-
由 Sijie Guo 提交于
*Motivation* Topics count is a namespace level metric. Currently if `exposeTopicLevelMetricsInPrometheus=true` we don't expose `pulsar_topics_count`. However `pulsar_topics_count` is a very useful metric for monitoring the healthy of the cluster. *Modifications* if `exposeTopicLevelMetricsInPrometheus=true`, we expose namespace level metrics that are not exposed at topic level. e.g. `pulsar_topics_count`.
-
由 Sijie Guo 提交于
*Motivation* Currently integration tests are broken due to #3827 `PULSAR_` is used almost everywhere in our scripts. If we are using `PULSAR_` for appending new keys, then we are potentially appending a wrong key into the config file. Currently this happens on presto. We are appending `PULSAR_ROOT_LOGGER` into presto's config file and cause presto fail to load the config. *Modifications* Use `PULSAR_PREFIX_` instead of `PULSAR_`
-
由 Sanjeev Kulkarni 提交于
* Add config variables if absent * Took feedback into account
-
由 Matteo Merli 提交于
-
由 Ezequiel Lovelle 提交于
Same fix as #3746 but for cpp client. - Filter consumers for the same topic name. - Add test to verify that same subscription names with different topics are allowed to be different consumers subscription instead of reused.
-
由 Tevic 提交于
* fix message_id_serialize to empty slice * test for serialize and deserialize
-
由 Kai 提交于
-
由 Matteo Merli 提交于
* In Java allow messages that compress to <5mb to be sent with batching enabled * Test is not anymore accurate since behavior changed and we're already testing in ProducerConsumerTest
-
由 Sijie Guo 提交于
*Motivation* Fixes #3803 Hardcoding is a very bad practice. It means we have no way to alter system behavior when production issues occur. *Modifications* introduce a few read batch related settings to make them configurable
-
由 Ezequiel Lovelle 提交于
* Feature - implement reference count for ConsumerImpl Add reference count for ConsumerImpl in order to track reused instances of a consumer instance returned by `subscribe()` method call. Having the reference of subscribed consumer instances offers the ability to not close a consumer until the last corresponding `close()` is being called. Modifications: - Add field on ConsumerBase to track references of consumer instances subscribed by the user. - Add checks on ConsumerImpl to know whether close() action should be performed regarding of reference count being zero value. - Increment reference count when a previous built consumer instance is being used by caller. Future considerations: When optimization #3312 is going to be made for other consumers implementation aside from ConsumerImpl it should add refCount checks on close() method call. * Add tests for reference count on ConsumerImpl - Add test to verify ConsumerImpl reference count on close() method. - Fix test from dup consumers feature with refcount.
-
由 Rajan Dhabalia 提交于
* [pulsar-webscoket] avoid creating temp list of metrics on every-metrics generator * fix non-sync getMetrics
-
由 Sijie Guo 提交于
*Motivation* Fixes #3686
-
由 Sijie Guo 提交于
Fixes #3734 *Motivation* Exception occurred when using `BytesSchema.of()` ``` Exception in thread "main" java.lang.ExceptionInInitializerError at org.apache.pulsar.examples.simple.ProducerExample.main(ProducerExample.java:32) Caused by: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:36) at org.apache.pulsar.client.internal.DefaultImplementation.newKeyValueSchema(DefaultImplementation.java:158) at org.apache.pulsar.client.api.Schema.<clinit>(Schema.java:123) ... 1 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.pulsar.client.internal.DefaultImplementation.lambda$newKeyValueSchema$16(DefaultImplementation.java:160) at org.apache.pulsar.client.internal.ReflectionUtils.catchExceptions(ReflectionUtils.java:34) ... 3 more Caused by: java.lang.NullPointerException at org.apache.pulsar.client.impl.schema.KeyValueSchema.<init>(KeyValueSchema.java:68) ... 9 more ``` The problem introduced because the weird class loading and reflection sequence. When accessing `BytesSchema`, `BytesSchema` will try to initialize `Schema`. When initializing Schema, it will attempts to initialize `KV_BYTES` using reflection, and initializing KV_BYTES requires `BytesSchema`. Hence it causes KV_BYTES not being initialized correctly. The change is to avoid this recursive class loading.
-
由 Matteo Merli 提交于
### Motivation Fixes #3779 The behavior of acknowledgment has always been to throw exception if consumer is not connected. In reality it doesn't make much sense to bubble back exception to user in this case because: * There's no ack on the ack anyway * The only reason for ack to fail is if consumer is disconnected * If consumer is disconnected, messages are going to be replayed in any case * Therefore, there's no action to do when acking while disconnected This is already the case in the Java and Go clients (additionally Java client performs client side grouping of acks based on time). Changing the Python client behavior to be the same.
-
由 Matteo Merli 提交于
-
由 Matteo Merli 提交于
-
由 Matteo Merli 提交于
-
由 Boyang Jerry Peng 提交于
* fix: function config cleanupSubscription update bug * add and fix unit tests
-