提交 b8aa2b0d 编写于 作者: S Sijie Guo 提交者: Jia Zhai

Fix the getSchema logic in pulsar proxy (#4975)

*Motivation*

The getSchema logic in Pulsar proxy handler doesn't handle the case that
the request doesn't have schema version.

*Modification*

- Fix the logic to handle the case that a GetSchema request doesn't have schema version.
- Forward the GetSchemaResponse back to the client

*Tests*

The GetSchema tests in ProxyTest was disabled by mistake. Turn it on.
(cherry picked from commit 676fd71a)
上级 6f553daf
...@@ -83,6 +83,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; ...@@ -83,6 +83,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil; import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -103,7 +104,7 @@ public class ClientCnx extends PulsarHandler { ...@@ -103,7 +104,7 @@ public class ClientCnx extends PulsarHandler {
private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests = private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests =
new ConcurrentLongHashMap<>(16, 1); new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<CompletableFuture<Optional<SchemaInfo>>> pendingGetSchemaRequests = new ConcurrentLongHashMap<>( private final ConcurrentLongHashMap<CompletableFuture<CommandGetSchemaResponse>> pendingGetSchemaRequests = new ConcurrentLongHashMap<>(
16, 1); 16, 1);
private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1); private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
...@@ -704,23 +705,12 @@ public class ClientCnx extends PulsarHandler { ...@@ -704,23 +705,12 @@ public class ClientCnx extends PulsarHandler {
long requestId = commandGetSchemaResponse.getRequestId(); long requestId = commandGetSchemaResponse.getRequestId();
CompletableFuture<Optional<SchemaInfo>> future = pendingGetSchemaRequests.remove(requestId); CompletableFuture<CommandGetSchemaResponse> future = pendingGetSchemaRequests.remove(requestId);
if (future == null) { if (future == null) {
log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId); log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId);
return; return;
} }
future.complete(commandGetSchemaResponse);
if (commandGetSchemaResponse.hasErrorCode()) {
// Request has failed
ServerError rc = commandGetSchemaResponse.getErrorCode();
if (rc == ServerError.TopicNotFound) {
future.complete(Optional.empty());
} else {
future.completeExceptionally(getPulsarClientException(rc, commandGetSchemaResponse.getErrorMessage()));
}
} else {
future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(commandGetSchemaResponse.getSchema())));
}
} }
Promise<Void> newPromise() { Promise<Void> newPromise() {
...@@ -774,7 +764,25 @@ public class ClientCnx extends PulsarHandler { ...@@ -774,7 +764,25 @@ public class ClientCnx extends PulsarHandler {
} }
public CompletableFuture<Optional<SchemaInfo>> sendGetSchema(ByteBuf request, long requestId) { public CompletableFuture<Optional<SchemaInfo>> sendGetSchema(ByteBuf request, long requestId) {
CompletableFuture<Optional<SchemaInfo>> future = new CompletableFuture<>(); return sendGetRawSchema(request, requestId).thenCompose(commandGetSchemaResponse -> {
if (commandGetSchemaResponse.hasErrorCode()) {
// Request has failed
ServerError rc = commandGetSchemaResponse.getErrorCode();
if (rc == ServerError.TopicNotFound) {
return CompletableFuture.completedFuture(Optional.empty());
} else {
return FutureUtil.failedFuture(
getPulsarClientException(rc, commandGetSchemaResponse.getErrorMessage()));
}
} else {
return CompletableFuture.completedFuture(
Optional.of(SchemaInfoUtil.newSchemaInfo(commandGetSchemaResponse.getSchema())));
}
});
}
public CompletableFuture<CommandGetSchemaResponse> sendGetRawSchema(ByteBuf request, long requestId) {
CompletableFuture<CommandGetSchemaResponse> future = new CompletableFuture<>();
pendingGetSchemaRequests.put(requestId, future); pendingGetSchemaRequests.put(requestId, future);
......
...@@ -938,13 +938,26 @@ public class Commands { ...@@ -938,13 +938,26 @@ public class Commands {
schema.setSchemaVersion(ByteString.copyFrom(version.get().bytes())); schema.setSchemaVersion(ByteString.copyFrom(version.get().bytes()));
} }
CommandGetSchema getSchema = schema.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder() ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
.setType(Type.GET_SCHEMA) .setType(Type.GET_SCHEMA)
.setGetSchema(schema.build())); .setGetSchema(getSchema));
schema.recycle(); schema.recycle();
return res; return res;
} }
public static ByteBuf newGetSchemaResponse(long requestId, CommandGetSchemaResponse response) {
CommandGetSchemaResponse.Builder schemaResponseBuilder = CommandGetSchemaResponse.newBuilder(response)
.setRequestId(requestId);
ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
.setType(Type.GET_SCHEMA_RESPONSE)
.setGetSchemaResponse(schemaResponseBuilder.build()));
schemaResponseBuilder.recycle();
return res;
}
public static ByteBuf newGetSchemaResponse(long requestId, SchemaInfo schema, SchemaVersion version) { public static ByteBuf newGetSchemaResponse(long requestId, SchemaInfo schema, SchemaVersion version) {
PulsarApi.CommandGetSchemaResponse.Builder schemaResponse = PulsarApi.CommandGetSchemaResponse.newBuilder() PulsarApi.CommandGetSchemaResponse.Builder schemaResponse = PulsarApi.CommandGetSchemaResponse.newBuilder()
.setRequestId(requestId) .setRequestId(requestId)
......
...@@ -27,6 +27,7 @@ import java.net.URISyntaxException; ...@@ -27,6 +27,7 @@ import java.net.URISyntaxException;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse;
import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
...@@ -347,11 +348,12 @@ public class LookupProxyHandler { ...@@ -347,11 +348,12 @@ public class LookupProxyHandler {
public void handleGetSchema(CommandGetSchema commandGetSchema) { public void handleGetSchema(CommandGetSchema commandGetSchema) {
getSchemaRequests.inc(); getSchemaRequests.inc();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] Received GetSchema", clientAddress); log.debug("[{}] Received GetSchema", clientAddress, commandGetSchema);
} }
final long clientRequestId = commandGetSchema.getRequestId(); final long clientRequestId = commandGetSchema.getRequestId();
String serviceUrl = getServiceUrl(clientRequestId); String serviceUrl = getServiceUrl(clientRequestId);
String topic = commandGetSchema.getTopic();
if(!StringUtils.isNotBlank(serviceUrl)) { if(!StringUtils.isNotBlank(serviceUrl)) {
return; return;
...@@ -363,24 +365,24 @@ public class LookupProxyHandler { ...@@ -363,24 +365,24 @@ public class LookupProxyHandler {
} }
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Getting connections to '{}' for getting schema of topic '{}' with clientReq Id '{}'", log.debug("Getting connections to '{}' for getting schema of topic '{}' with clientReq Id '{}'",
addr, commandGetSchema.getTopic(), clientRequestId); addr, topic, clientRequestId);
} }
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> { proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
// Connected to backend broker // Connected to backend broker
long requestId = proxyConnection.newRequestId(); long requestId = proxyConnection.newRequestId();
ByteBuf command; ByteBuf command;
byte[] schemaVersion = commandGetSchema.getSchemaVersion().toByteArray(); byte[] schemaVersion = null;
command = Commands.newGetSchema(requestId, commandGetSchema.getTopic(), if (commandGetSchema.hasSchemaVersion()) {
Optional.ofNullable(BytesSchemaVersion.of(schemaVersion))); schemaVersion = commandGetSchema.getSchemaVersion().toByteArray();
clientCnx.sendGetSchema(command, requestId).thenAccept(optionalSchemaInfo -> { }
SchemaInfo schemaInfo = optionalSchemaInfo.get(); command = Commands.newGetSchema(requestId, topic,
proxyConnection.ctx().writeAndFlush( Optional.ofNullable(schemaVersion).map(BytesSchemaVersion::of));
Commands.newGetSchemaResponse(clientRequestId, clientCnx.sendGetRawSchema(command, requestId).thenAccept(response -> {
schemaInfo, proxyConnection.ctx().writeAndFlush(
BytesSchemaVersion.of(schemaVersion))); Commands.newGetSchemaResponse(clientRequestId, response));
}).exceptionally(ex -> { }).exceptionally(ex -> {
log.warn("[{}] Failed to get schema {}: {}", clientAddress, commandGetSchema.getTopic(), ex.getMessage()); log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, ex);
proxyConnection.ctx().writeAndFlush( proxyConnection.ctx().writeAndFlush(
Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage())); Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage()));
return null; return null;
......
...@@ -227,7 +227,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { ...@@ -227,7 +227,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
} }
@Test @Test
private void testGetSchema() throws Exception { public void testGetSchema() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get()) PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get())
.build(); .build();
Producer<Foo> producer; Producer<Foo> producer;
...@@ -250,7 +250,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { ...@@ -250,7 +250,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
} }
@Test @Test
private void testProtocolVersionAdvertisement() throws Exception { public void testProtocolVersionAdvertisement() throws Exception {
final String url = "pulsar://localhost:" + proxyConfig.getServicePort().get(); final String url = "pulsar://localhost:" + proxyConfig.getServicePort().get();
final String topic = "persistent://sample/test/local/protocol-version-advertisement"; final String topic = "persistent://sample/test/local/protocol-version-advertisement";
final String sub = "my-sub"; final String sub = "my-sub";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册