提交 60475dd1 编写于 作者: S Sijie Guo 提交者: Jia Zhai

[client] Provide a clock for generating publish timestamp for producers (#4562)

*Motivation*

Currently producers uses `System.currentTimeMillis()` as publish timestamp by default.
However at some use cases, producers would like to a different way for generating publish timestamp.
E.g. in a database use case, a producer might be use HLC (Hybrid Logic Clock) as publish timestamp;
in integration tests, it might require the producer to use a deterministic way to generate publish timestamp.

*Changes*

This PR introduces a `clock` in building the client. This allows applications to override the system clock
with its own implementation.

*Verify the change*

Add unit test to test customized clock in both batch and non-batch cases.

(cherry picked from commit 7397b960)
上级 2c97da5e
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
......@@ -41,6 +42,9 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
......@@ -57,9 +61,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
......@@ -106,6 +112,140 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
super.internalCleanup();
}
@Test
public void testPublishTimestampBatchDisabled() throws Exception {
log.info("-- Starting {} test --", methodName);
AtomicLong ticker = new AtomicLong(0);
Clock clock = new Clock() {
@Override
public ZoneId getZone() {
return ZoneId.systemDefault();
}
@Override
public Clock withZone(ZoneId zone) {
return this;
}
@Override
public Instant instant() {
return Instant.ofEpochMilli(millis());
}
@Override
public long millis() {
return ticker.incrementAndGet();
}
};
@Cleanup
PulsarClient newPulsarClient = PulsarClient.builder()
.serviceUrl(lookupUrl.toString())
.clock(clock)
.build();
final String topic = "persistent://my-property/my-ns/test-publish-timestamp";
@Cleanup
Consumer<byte[]> consumer = newPulsarClient.newConsumer()
.topic(topic)
.subscriptionName("my-sub")
.subscribe();
@Cleanup
Producer<byte[]> producer = newPulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
final int numMessages = 5;
for (int i = 0; i < numMessages; i++) {
producer.newMessage()
.value(("value-" + i).getBytes(UTF_8))
.eventTime((i + 1) * 100L)
.sendAsync();
}
producer.flush();
for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive();
log.info("Received message '{}'.", new String(msg.getValue(), UTF_8));
assertEquals(1L + i, msg.getPublishTime());
assertEquals(100L * (i + 1), msg.getEventTime());
}
}
@Test
public void testPublishTimestampBatchEnabled() throws Exception {
log.info("-- Starting {} test --", methodName);
AtomicLong ticker = new AtomicLong(0);
Clock clock = new Clock() {
@Override
public ZoneId getZone() {
return ZoneId.systemDefault();
}
@Override
public Clock withZone(ZoneId zone) {
return this;
}
@Override
public Instant instant() {
return Instant.ofEpochMilli(millis());
}
@Override
public long millis() {
return ticker.incrementAndGet();
}
};
@Cleanup
PulsarClient newPulsarClient = PulsarClient.builder()
.serviceUrl(lookupUrl.toString())
.clock(clock)
.build();
final String topic = "persistent://my-property/my-ns/test-publish-timestamp";
@Cleanup
Consumer<byte[]> consumer = newPulsarClient.newConsumer()
.topic(topic)
.subscriptionName("my-sub")
.subscribe();
final int numMessages = 5;
@Cleanup
Producer<byte[]> producer = newPulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batchingMaxMessages(10 * numMessages)
.create();
for (int i = 0; i < numMessages; i++) {
producer.newMessage()
.value(("value-" + i).getBytes(UTF_8))
.eventTime((i + 1) * 100L)
.sendAsync();
}
producer.flush();
for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive();
log.info("Received message '{}'.", new String(msg.getValue(), UTF_8));
assertEquals(1L, msg.getPublishTime());
assertEquals(100L * (i + 1), msg.getEventTime());
}
}
@DataProvider(name = "batch")
public Object[][] codecProvider() {
return new Object[][] { { 0 }, { 1000 } };
......
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
import java.time.Clock;
import java.util.Map;
import java.util.concurrent.TimeUnit;
......@@ -376,4 +377,20 @@ public interface ClientBuilder extends Cloneable {
* @return the client builder instance
*/
ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);
/**
* The clock used by the pulsar client.
*
* <p>The clock is currently used by producer for setting publish timestamps.
* {@link Clock#millis()} is called to retrieve current timestamp as the publish
* timestamp when producers produce messages. The default clock is a system default zone
* clock. So the publish timestamp is same as calling {@link System#currentTimeMillis()}.
*
* <p>Warning: the clock is used for TTL enforcement and timestamp based seeks.
* so be aware of the impacts if you are going to use a different clock.
*
* @param clock the clock used by the pulsar client to retrieve time information
* @return the client builder instance
*/
ClientBuilder clock(Clock clock);
}
......@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
import java.time.Clock;
import java.util.Map;
import java.util.concurrent.TimeUnit;
......@@ -223,4 +224,10 @@ public class ClientBuilderImpl implements ClientBuilder {
public ClientConfigurationData getClientConfigurationData() {
return conf;
}
@Override
public ClientBuilder clock(Clock clock) {
conf.setClock(clock);
return this;
}
}
......@@ -352,7 +352,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
sequenceId = msgMetadataBuilder.getSequenceId();
}
if (!msgMetadataBuilder.hasPublishTime()) {
msgMetadataBuilder.setPublishTime(System.currentTimeMillis());
msgMetadataBuilder.setPublishTime(client.getClientClock().millis());
checkArgument(!msgMetadataBuilder.hasProducerName());
......
......@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.StringUtils.isBlank;
import com.google.common.annotations.VisibleForTesting;
......@@ -33,7 +32,12 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.*;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
......@@ -69,7 +73,6 @@ import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
......@@ -115,6 +118,8 @@ public class PulsarClientImpl implements PulsarClient {
}
});
private final Clock clientClock;
public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
this(conf, getEventLoopGroup(conf));
}
......@@ -131,6 +136,7 @@ public class PulsarClientImpl implements PulsarClient {
this.eventLoopGroup = eventLoopGroup;
setAuth(conf);
this.conf = conf;
this.clientClock = conf.getClock();
conf.getAuthentication().start();
this.cnxPool = cnxPool;
externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener"));
......@@ -157,6 +163,11 @@ public class PulsarClientImpl implements PulsarClient {
return conf;
}
@VisibleForTesting
public Clock getClientClock() {
return clientClock;
}
@Override
public ProducerBuilder<byte[]> newProducer() {
return new ProducerBuilderImpl<>(this, Schema.BYTES);
......
......@@ -19,8 +19,8 @@
package org.apache.pulsar.client.impl.conf;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.time.Clock;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.Authentication;
......@@ -70,6 +70,9 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);
@JsonIgnore
private Clock clock = Clock.systemDefaultZone();
public Authentication getAuthentication() {
if (authentication == null) {
this.authentication = new AuthenticationDisabled();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册