提交 70ae6710 编写于 作者: R Rajan 提交者: Matteo Merli

Initialize lookup-request variables locally before request-instance recycle (#117)

上级 4563540c
......@@ -151,17 +151,17 @@ public class ServerCnx extends PulsarHandler {
if (log.isDebugEnabled()) {
log.debug("Received Lookup from {}", remoteAddress);
}
lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(lookup.getTopic()),
lookup.getAuthoritative(), getRole(), lookup.getRequestId()).thenAccept(lookupResponse -> {
final long requestId = lookup.getRequestId();
final String topic = lookup.getTopic();
lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
getRole(), lookup.getRequestId()).thenAccept(lookupResponse -> {
ctx.writeAndFlush(lookupResponse);
}).exceptionally(ex -> {
// it should never happen
log.warn("[{}] lookup failed with error {}", remoteAddress, ex.getMessage(), ex);
ctx.writeAndFlush(
newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), lookup.getRequestId()));
log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex);
ctx.writeAndFlush(newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
return null;
});
}
@Override
......@@ -169,21 +169,23 @@ public class ServerCnx extends PulsarHandler {
if (log.isDebugEnabled()) {
log.debug("Received PartitionMetadataLookup from {}", remoteAddress);
}
getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(),
DestinationName.get(partitionMetadata.getTopic())).thenAccept(metadata -> {
final long requestId = partitionMetadata.getRequestId();
final String topic = partitionMetadata.getTopic();
getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic))
.thenAccept(metadata -> {
int partitions = metadata.partitions;
ctx.writeAndFlush(
Commands.newPartitionMetadataResponse(partitions, partitionMetadata.getRequestId()));
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
}).exceptionally(ex -> {
if (ex instanceof PulsarClientException) {
log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress,
partitionMetadata.getTopic(), ex.getMessage());
log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress, topic,
ex.getMessage());
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(), partitionMetadata.getRequestId()));
ex.getMessage(), requestId));
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, ex.getMessage());
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic,
ex.getMessage(), ex);
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
ex.getMessage(), partitionMetadata.getRequestId()));
ex.getMessage(), requestId));
}
return null;
});
......@@ -238,17 +240,18 @@ public class ServerCnx extends PulsarHandler {
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
final String topicName = subscribe.getTopic();
final String subscriptionName = subscribe.getSubscription();
final long requestId = subscribe.getRequestId();
final long consumerId = subscribe.getConsumerId();
final SubType subType = subscribe.getSubType();
final String consumerName = subscribe.getConsumerName();
authorizationFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole);
}
final String topicName = subscribe.getTopic();
final String subscriptionName = subscribe.getSubscription();
final long requestId = subscribe.getRequestId();
final long consumerId = subscribe.getConsumerId();
final SubType subType = subscribe.getSubType();
final String consumerName = subscribe.getConsumerName();
log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
......@@ -299,8 +302,7 @@ public class ServerCnx extends PulsarHandler {
subscriptionName, exception.getCause().getMessage());
// If client timed out, the future would have been completed by subsequent close. Send error
// back to
// client, only if not completed already.
// back to client, only if not completed already.
if (consumerFuture.completeExceptionally(exception)) {
ctx.writeAndFlush(Commands.newError(requestId,
BrokerServiceException.getClientErrorCode(exception.getCause()),
......@@ -314,7 +316,7 @@ public class ServerCnx extends PulsarHandler {
} else {
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, msg));
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
});
......@@ -330,26 +332,19 @@ public class ServerCnx extends PulsarHandler {
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
// Use producer name provided by client if present
final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
: service.generateUniqueProducerName();
final String topicName = cmdProducer.getTopic();
final long producerId = cmdProducer.getProducerId();
final long requestId = cmdProducer.getRequestId();
authorizationFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole);
}
final String producerName;
if (cmdProducer.hasProducerName()) {
// Use producer name provided by client
producerName = cmdProducer.getProducerName();
} else {
// Need to generate a unique id
producerName = service.generateUniqueProducerName();
}
final String topicName = cmdProducer.getTopic();
final long producerId = cmdProducer.getProducerId();
final long requestId = cmdProducer.getRequestId();
CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);
if (existingProducerFuture != null) {
......@@ -359,9 +354,11 @@ public class ServerCnx extends PulsarHandler {
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName()));
return null;
} else {
// There was an early request to create a producer with same producerId. This can happen when
// There was an early request to create a producer with
// same producerId. This can happen when
// client
// timeout is lower the broker timeouts. We need to wait until the previous producer creation
// timeout is lower the broker timeouts. We need to wait
// until the previous producer creation
// request
// either complete or fails.
log.warn("[{}][{}] Producer is already present on the connection", remoteAddress, topicName);
......@@ -374,7 +371,8 @@ public class ServerCnx extends PulsarHandler {
log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId);
service.getTopic(topicName).thenAccept((Topic topic) -> {
// Before creating producer, check if backlog quota exceeded on topic
// Before creating producer, check if backlog quota exceeded
// on topic
if (topic.isBacklogQuotaExceeded(producerName)) {
IllegalStateException illegalStateException = new IllegalStateException(
"Cannot create producer on topic with backlog quota exceeded");
......@@ -405,7 +403,8 @@ public class ServerCnx extends PulsarHandler {
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName));
return;
} else {
// The producer's future was completed before by a close command
// The producer's future was completed before by
// a close command
producer.closeNow();
log.info("[{}] Cleared producer created after timeout on client side {}", remoteAddress,
producer);
......@@ -433,7 +432,8 @@ public class ServerCnx extends PulsarHandler {
log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception);
}
// If client timed out, the future would have been completed by subsequent close. Send error back to
// If client timed out, the future would have been completed
// by subsequent close. Send error back to
// client, only if not completed already.
if (producerFuture.completeExceptionally(exception)) {
ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(cause),
......@@ -446,7 +446,7 @@ public class ServerCnx extends PulsarHandler {
} else {
String msg = "Client is not authorized to Produce";
log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
ctx.writeAndFlush(Commands.newError(cmdProducer.getRequestId(), ServerError.AuthorizationError, msg));
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
});
......
......@@ -497,6 +497,7 @@ public class ServerCnxTest {
doReturn(authorizationManager).when(brokerService).getAuthorizationManager();
doReturn(true).when(brokerService).isAuthenticationEnabled();
doReturn(true).when(brokerService).isAuthorizationEnabled();
doReturn("prod1").when(brokerService).generateUniqueProducerName();
resetChannel();
setChannelConnected();
......
......@@ -15,7 +15,6 @@
*/
package com.yahoo.pulsar.client.impl;
import static com.yahoo.pulsar.client.impl.PulsarClientImpl.requestIdGenerator;
import static java.lang.String.format;
import java.net.InetSocketAddress;
......@@ -34,13 +33,13 @@ import io.netty.buffer.ByteBuf;
class BinaryProtoLookupService implements LookupService {
private final ConnectionPool cnxPool;
private final PulsarClientImpl client;
protected final InetSocketAddress serviceAddress;
private final boolean useTls;
public BinaryProtoLookupService(ConnectionPool cnxPool, String serviceUrl, boolean useTls)
public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls)
throws PulsarClientException {
this.cnxPool = cnxPool;
this.client = client;
this.useTls = useTls;
URI uri;
try {
......@@ -75,8 +74,8 @@ class BinaryProtoLookupService implements LookupService {
DestinationName destination) {
CompletableFuture<InetSocketAddress> addressFuture = new CompletableFuture<InetSocketAddress>();
cnxPool.getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = requestIdGenerator.getAndIncrement();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newLookup(destination.toString(), authoritative, requestId);
clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
URI uri = null;
......@@ -133,8 +132,8 @@ class BinaryProtoLookupService implements LookupService {
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<PartitionedTopicMetadata>();
cnxPool.getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = requestIdGenerator.getAndIncrement();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newPartitionMetadataRequest(destination.toString(), requestId);
clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
try {
......
......@@ -71,7 +71,7 @@ public class PulsarClientImpl implements PulsarClient {
private final AtomicLong producerIdGenerator = new AtomicLong();
private final AtomicLong consumerIdGenerator = new AtomicLong();
protected static final AtomicLong requestIdGenerator = new AtomicLong();
private final AtomicLong requestIdGenerator = new AtomicLong();
private final EventLoopGroup eventLoopGroup;
......@@ -93,7 +93,7 @@ public class PulsarClientImpl implements PulsarClient {
conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
lookup = new HttpLookupService(httpClient, conf.isUseTls());
} else {
lookup = new BinaryProtoLookupService(cnxPool, serviceUrl, conf.isUseTls());
lookup = new BinaryProtoLookupService(this, serviceUrl, conf.isUseTls());
}
timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
externalExecutorProvider = new ExecutorProvider(conf.getListenerThreads(), "pulsar-external-listener");
......@@ -365,6 +365,10 @@ public class PulsarClientImpl implements PulsarClient {
return requestIdGenerator.getAndIncrement();
}
ConnectionPool getCnxPool() {
return cnxPool;
}
EventLoopGroup eventLoopGroup() {
return eventLoopGroup;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册