diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java index 1ad1dbbb36540d7ea91c341c03bfd11e00fdc2a7..793a6418ac055754b3c6bcd237d0ee30f823c4b3 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java @@ -128,8 +128,18 @@ public class PulsarKafkaProducer implements Producer { pulsarProducerConf.setCompressionType(CompressionType.LZ4); } - pulsarProducerConf.setBlockIfQueueFull( - Boolean.parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false"))); + pulsarProducerConf.setSendTimeout( + Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000")), + TimeUnit.MILLISECONDS); + + boolean blockOnBufferFull = Boolean + .parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")); + + // Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client + // Pulsar throws error immediately when the queue is full and blockIfQueueFull=false + // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error. + boolean shouldBlockPulsarProducer = pulsarProducerConf.getSendTimeoutMs() > 0 || blockOnBufferFull; + pulsarProducerConf.setBlockIfQueueFull(shouldBlockPulsarProducer); } @Override @@ -217,7 +227,7 @@ public class PulsarKafkaProducer implements Producer { private org.apache.pulsar.client.api.Producer createNewProducer(String topic) { try { - return client.createProducer(topic); + return client.createProducer(topic, pulsarProducerConf); } catch (PulsarClientException e) { throw new RuntimeException(e); }