未验证 提交 88fc4541 编写于 作者: J Jia Zhai 提交者: GitHub

fix reader builder clone error (#5923)

Motivation
In reader builder, clone() method does not clone the config, and will cause error once we call a builder.clone() concurrently to create readers. This PR mainly try to fix this issue.

Modifications
use conf.clone(), to make sure conf is cloned.
add test to verify it.
fix other small issues, like get() methods, and access level, make it align with consumer/producer builder.
Verifying this change
ut passed.
上级 c7094c9e
......@@ -23,16 +23,19 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -730,4 +733,32 @@ public class TopicReaderTest extends ProducerConsumerBase {
reader.close();
producer.close();
}
@Test
public void testReaderBuilderConcurrentCreate() throws Exception {
String topicName = "persistent://my-property/my-ns/testReaderBuilderConcurrentCreate_";
int numTopic = 30;
ReaderBuilder<byte[]> builder = pulsarClient.newReader().startMessageId(MessageId.earliest);
List<CompletableFuture<Reader<byte[]>>> readers = Lists.newArrayListWithExpectedSize(numTopic);
List<Producer<byte[]>> producers = Lists.newArrayListWithExpectedSize(numTopic);
// create producer firstly
for (int i = 0; i < numTopic; i++) {
producers.add(pulsarClient.newProducer()
.topic(topicName + i)
.create());
}
// create reader concurrently
for (int i = 0; i < numTopic; i++) {
readers.add(builder.clone().topic(topicName + i).createAsync());
}
// verify readers config are different for topic name.
for (int i = 0; i < numTopic; i++) {
assertEquals(readers.get(i).get().getTopic(), topicName + i);
readers.get(i).get().close();
producers.get(i).close();
}
}
}
......@@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
......@@ -53,6 +55,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import com.google.common.collect.Lists;
import lombok.NonNull;
@Getter(AccessLevel.PUBLIC)
public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
private final PulsarClientImpl client;
......@@ -338,10 +341,6 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
return this;
}
public ConsumerConfigurationData<T> getConf() {
return conf;
}
@Override
public String toString() {
return conf != null ? conf.toString() : null;
......
......@@ -105,7 +105,7 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
try {
log.info("Starting Pulsar consumer perf with config: {}", w.writeValueAsString(conf));
log.info("Starting Pulsar consumer status recorder with config: {}", w.writeValueAsString(conf));
log.info("Pulsar client config: {}", w.withoutAttribute("authentication").writeValueAsString(pulsarClient.getConfiguration()));
} catch (IOException e) {
log.error("Failed to dump config info: {}", e);
......@@ -291,4 +291,4 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
}
private static final Logger log = LoggerFactory.getLogger(ConsumerStatsRecorderImpl.class);
}
\ No newline at end of file
}
......@@ -28,6 +28,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
......@@ -50,6 +52,7 @@ import lombok.NonNull;
import static com.google.common.base.Preconditions.checkArgument;
@Getter(AccessLevel.PUBLIC)
public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
private final PulsarClientImpl client;
......
......@@ -22,6 +22,8 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
......@@ -35,6 +37,7 @@ import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
@Getter(AccessLevel.PUBLIC)
public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
private final PulsarClientImpl client;
......@@ -43,7 +46,7 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
private final Schema<T> schema;
ReaderBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
public ReaderBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
this(client, new ReaderConfigurationData<T>(), schema);
}
......@@ -56,11 +59,7 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
@Override
@SuppressWarnings("unchecked")
public ReaderBuilder<T> clone() {
try {
return (ReaderBuilder<T>) super.clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeException("Failed to clone ReaderBuilderImpl");
}
return new ReaderBuilderImpl<>(client, conf.clone(), schema);
}
@Override
......@@ -112,7 +111,7 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
conf.setStartMessageFromRollbackDurationInSec(timeunit.toSeconds(rollbackDuration));
return this;
}
@Override
public ReaderBuilder<T> startMessageIdInclusive() {
conf.setResetIncludeHead(true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册