diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 8943ed5752d2770f13659f0db04db60deb107edf..32f73c993a11c457316e1899e8f0adeed90b619d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -83,6 +83,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +104,7 @@ public class ClientCnx extends PulsarHandler { private final ConcurrentLongHashMap>> pendingGetTopicsRequests = new ConcurrentLongHashMap<>(16, 1); - private final ConcurrentLongHashMap>> pendingGetSchemaRequests = new ConcurrentLongHashMap<>( + private final ConcurrentLongHashMap> pendingGetSchemaRequests = new ConcurrentLongHashMap<>( 16, 1); private final ConcurrentLongHashMap> producers = new ConcurrentLongHashMap<>(16, 1); @@ -704,23 +705,12 @@ public class ClientCnx extends PulsarHandler { long requestId = commandGetSchemaResponse.getRequestId(); - CompletableFuture> future = pendingGetSchemaRequests.remove(requestId); + CompletableFuture future = pendingGetSchemaRequests.remove(requestId); if (future == null) { log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId); return; } - - if (commandGetSchemaResponse.hasErrorCode()) { - // Request has failed - ServerError rc = commandGetSchemaResponse.getErrorCode(); - if (rc == ServerError.TopicNotFound) { - future.complete(Optional.empty()); - } else { - future.completeExceptionally(getPulsarClientException(rc, commandGetSchemaResponse.getErrorMessage())); - } - } else { - future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(commandGetSchemaResponse.getSchema()))); - } + future.complete(commandGetSchemaResponse); } Promise newPromise() { @@ -774,7 +764,25 @@ public class ClientCnx extends PulsarHandler { } public CompletableFuture> sendGetSchema(ByteBuf request, long requestId) { - CompletableFuture> future = new CompletableFuture<>(); + return sendGetRawSchema(request, requestId).thenCompose(commandGetSchemaResponse -> { + if (commandGetSchemaResponse.hasErrorCode()) { + // Request has failed + ServerError rc = commandGetSchemaResponse.getErrorCode(); + if (rc == ServerError.TopicNotFound) { + return CompletableFuture.completedFuture(Optional.empty()); + } else { + return FutureUtil.failedFuture( + getPulsarClientException(rc, commandGetSchemaResponse.getErrorMessage())); + } + } else { + return CompletableFuture.completedFuture( + Optional.of(SchemaInfoUtil.newSchemaInfo(commandGetSchemaResponse.getSchema()))); + } + }); + } + + public CompletableFuture sendGetRawSchema(ByteBuf request, long requestId) { + CompletableFuture future = new CompletableFuture<>(); pendingGetSchemaRequests.put(requestId, future); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index c04917699a3b964760a9a6c56aa2e0cbff61406d..cfe50485e44db14d85ff706f008854e94769305d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -938,13 +938,26 @@ public class Commands { schema.setSchemaVersion(ByteString.copyFrom(version.get().bytes())); } + CommandGetSchema getSchema = schema.build(); + ByteBuf res = serializeWithSize(BaseCommand.newBuilder() .setType(Type.GET_SCHEMA) - .setGetSchema(schema.build())); + .setGetSchema(getSchema)); schema.recycle(); return res; } + public static ByteBuf newGetSchemaResponse(long requestId, CommandGetSchemaResponse response) { + CommandGetSchemaResponse.Builder schemaResponseBuilder = CommandGetSchemaResponse.newBuilder(response) + .setRequestId(requestId); + + ByteBuf res = serializeWithSize(BaseCommand.newBuilder() + .setType(Type.GET_SCHEMA_RESPONSE) + .setGetSchemaResponse(schemaResponseBuilder.build())); + schemaResponseBuilder.recycle(); + return res; + } + public static ByteBuf newGetSchemaResponse(long requestId, SchemaInfo schema, SchemaVersion version) { PulsarApi.CommandGetSchemaResponse.Builder schemaResponse = PulsarApi.CommandGetSchemaResponse.newBuilder() .setRequestId(requestId) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 469e16890f5de4e8b1e11d9a23cc17eb556a7833..9e3893d84e7315bb7c6501db5f73946a0f84af01 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -27,6 +27,7 @@ import java.net.URISyntaxException; import java.util.Optional; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchemaResponse; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema; @@ -347,11 +348,12 @@ public class LookupProxyHandler { public void handleGetSchema(CommandGetSchema commandGetSchema) { getSchemaRequests.inc(); if (log.isDebugEnabled()) { - log.debug("[{}] Received GetSchema", clientAddress); + log.debug("[{}] Received GetSchema", clientAddress, commandGetSchema); } final long clientRequestId = commandGetSchema.getRequestId(); String serviceUrl = getServiceUrl(clientRequestId); + String topic = commandGetSchema.getTopic(); if(!StringUtils.isNotBlank(serviceUrl)) { return; @@ -363,24 +365,24 @@ public class LookupProxyHandler { } if (log.isDebugEnabled()) { log.debug("Getting connections to '{}' for getting schema of topic '{}' with clientReq Id '{}'", - addr, commandGetSchema.getTopic(), clientRequestId); + addr, topic, clientRequestId); } proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> { // Connected to backend broker long requestId = proxyConnection.newRequestId(); ByteBuf command; - byte[] schemaVersion = commandGetSchema.getSchemaVersion().toByteArray(); - command = Commands.newGetSchema(requestId, commandGetSchema.getTopic(), - Optional.ofNullable(BytesSchemaVersion.of(schemaVersion))); - clientCnx.sendGetSchema(command, requestId).thenAccept(optionalSchemaInfo -> { - SchemaInfo schemaInfo = optionalSchemaInfo.get(); - proxyConnection.ctx().writeAndFlush( - Commands.newGetSchemaResponse(clientRequestId, - schemaInfo, - BytesSchemaVersion.of(schemaVersion))); + byte[] schemaVersion = null; + if (commandGetSchema.hasSchemaVersion()) { + schemaVersion = commandGetSchema.getSchemaVersion().toByteArray(); + } + command = Commands.newGetSchema(requestId, topic, + Optional.ofNullable(schemaVersion).map(BytesSchemaVersion::of)); + clientCnx.sendGetRawSchema(command, requestId).thenAccept(response -> { + proxyConnection.ctx().writeAndFlush( + Commands.newGetSchemaResponse(clientRequestId, response)); }).exceptionally(ex -> { - log.warn("[{}] Failed to get schema {}: {}", clientAddress, commandGetSchema.getTopic(), ex.getMessage()); + log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, ex); proxyConnection.ctx().writeAndFlush( Commands.newError(clientRequestId, ServerError.ServiceNotReady, ex.getMessage())); return null; diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 42a990e5824ff5b258dab4b7a396e3522bf2d263..07490412bf632d99884ad6e02dab59c0e297b423 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -227,7 +227,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { } @Test - private void testGetSchema() throws Exception { + public void testGetSchema() throws Exception { PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort().get()) .build(); Producer producer; @@ -250,7 +250,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { } @Test - private void testProtocolVersionAdvertisement() throws Exception { + public void testProtocolVersionAdvertisement() throws Exception { final String url = "pulsar://localhost:" + proxyConfig.getServicePort().get(); final String topic = "persistent://sample/test/local/protocol-version-advertisement"; final String sub = "my-sub";