提交 536d353e 编写于 作者: R Rajan 提交者: Matteo Merli

Fix: Discovery-lookup returns correct PartitionMetadataResponse on partition-lookup (#112)

上级 f9993942
......@@ -377,10 +377,28 @@ public class Commands {
return res;
}
public static ByteBuf newPartitionMetadataResponse(String brokerServiceUrl, String brokerServiceUrlTls,
long requestId) {
CommandPartitionedTopicMetadataResponse.Builder partitionMetadataResponseBuilder = CommandPartitionedTopicMetadataResponse
.newBuilder();
partitionMetadataResponseBuilder.setBrokerServiceUrl(brokerServiceUrl);
partitionMetadataResponseBuilder.setBrokerServiceUrlTls(brokerServiceUrlTls);
partitionMetadataResponseBuilder.setResponse(CommandPartitionedTopicMetadataResponse.LookupType.Redirect);
partitionMetadataResponseBuilder.setRequestId(requestId);
CommandPartitionedTopicMetadataResponse partitionMetadataResponse = partitionMetadataResponseBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA_RESPONSE)
.setPartitionMetadataResponse(partitionMetadataResponse));
partitionMetadataResponseBuilder.recycle();
partitionMetadataResponse.recycle();
return res;
}
public static ByteBuf newPartitionMetadataResponse(int partitions, long requestId) {
CommandPartitionedTopicMetadataResponse.Builder partitionMetadataResponseBuilder = CommandPartitionedTopicMetadataResponse
.newBuilder();
partitionMetadataResponseBuilder.setPartitions(partitions);
partitionMetadataResponseBuilder.setResponse(CommandPartitionedTopicMetadataResponse.LookupType.Success);
partitionMetadataResponseBuilder.setRequestId(requestId);
CommandPartitionedTopicMetadataResponse partitionMetadataResponse = partitionMetadataResponseBuilder.build();
......
......@@ -70,7 +70,7 @@ public class ServerConnection extends PulsarHandler {
if (LOG.isDebugEnabled()) {
LOG.debug("Received PartitionMetadataLookup from {}", remoteAddress);
}
sendLookupResponse(partitionMetadata.getRequestId());
sendPartitionMetadataResponse(partitionMetadata.getRequestId());
}
/**
......@@ -97,6 +97,17 @@ public class ServerConnection extends PulsarHandler {
}
}
private void sendPartitionMetadataResponse(long requestId) {
try {
LoadReport availableBroker = discoveryProvider.nextBroker();
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(availableBroker.getPulsarServiceUrl(),
availableBroker.getPulsarServieUrlTls(), requestId));
} catch (PulsarServerException e) {
LOG.warn("[{}] Failed to get next active broker {}", remoteAddress, e.getMessage(), e);
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, e.getMessage(), requestId));
}
}
@Override
protected boolean isHandshakeCompleted() {
return state == State.Connected;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册