From fb92a51a8af6cd226d72155c2e75dd15981261f6 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 17 Oct 2017 23:02:18 -0700 Subject: [PATCH] Correct Kafka wrapper producer blocking semantic and passing the producer config object (#833) --- .../clients/producer/PulsarKafkaProducer.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java index 1ad1dbbb365..793a6418ac0 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java @@ -128,8 +128,18 @@ public class PulsarKafkaProducer implements Producer { pulsarProducerConf.setCompressionType(CompressionType.LZ4); } - pulsarProducerConf.setBlockIfQueueFull( - Boolean.parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false"))); + pulsarProducerConf.setSendTimeout( + Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000")), + TimeUnit.MILLISECONDS); + + boolean blockOnBufferFull = Boolean + .parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")); + + // Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client + // Pulsar throws error immediately when the queue is full and blockIfQueueFull=false + // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error. + boolean shouldBlockPulsarProducer = pulsarProducerConf.getSendTimeoutMs() > 0 || blockOnBufferFull; + pulsarProducerConf.setBlockIfQueueFull(shouldBlockPulsarProducer); } @Override @@ -217,7 +227,7 @@ public class PulsarKafkaProducer implements Producer { private org.apache.pulsar.client.api.Producer createNewProducer(String topic) { try { - return client.createProducer(topic); + return client.createProducer(topic, pulsarProducerConf); } catch (PulsarClientException e) { throw new RuntimeException(e); } -- GitLab