提交 fb92a51a 编写于 作者: M Matteo Merli 提交者: GitHub

Correct Kafka wrapper producer blocking semantic and passing the producer config object (#833)

上级 18aec9a3
......@@ -128,8 +128,18 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
pulsarProducerConf.setCompressionType(CompressionType.LZ4);
}
pulsarProducerConf.setBlockIfQueueFull(
Boolean.parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")));
pulsarProducerConf.setSendTimeout(
Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000")),
TimeUnit.MILLISECONDS);
boolean blockOnBufferFull = Boolean
.parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false"));
// Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client
// Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
// Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
boolean shouldBlockPulsarProducer = pulsarProducerConf.getSendTimeoutMs() > 0 || blockOnBufferFull;
pulsarProducerConf.setBlockIfQueueFull(shouldBlockPulsarProducer);
}
@Override
......@@ -217,7 +227,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
private org.apache.pulsar.client.api.Producer createNewProducer(String topic) {
try {
return client.createProducer(topic);
return client.createProducer(topic, pulsarProducerConf);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册