提交 004e022f 编写于 作者: R Rajan 提交者: Matteo Merli

Support Topic lookup using Pulsar binary protocol (#5)

上级 afaa63fd
......@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService;
import com.yahoo.pulsar.broker.web.PulsarWebResource;
import com.yahoo.pulsar.broker.web.RestException;
......
......@@ -93,6 +93,8 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.client.api.PulsarClientException;
/**
*/
......@@ -356,21 +358,7 @@ public class PersistentTopics extends AdminResource {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, property, cluster, namespace, domain(),
dn.getEncodedLocalName());
PartitionedTopicMetadata partitionMetadata;
try {
// gets the number of partitions from the zk cache
partitionMetadata = globalZkCache().getData(path, new Deserializer<PartitionedTopicMetadata>() {
@Override
public PartitionedTopicMetadata deserialize(String key, byte[] content) throws Exception {
return jsonMapper().readValue(content, PartitionedTopicMetadata.class);
}
}).orElse(
// if the partitioned topic is not found in zk, then the topic is not partitioned
new PartitionedTopicMetadata());
} catch (Exception e) {
throw new RestException(e);
}
PartitionedTopicMetadata partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), dn,
......@@ -993,7 +981,79 @@ public class PersistentTopics extends AdminResource {
return offlineTopicStats;
}
/**
public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar,
String clientAppId, DestinationName dn) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// (1) authorize client
try {
checkAuthorization(pulsar, dn, clientAppId);
} catch (RestException e) {
try {
validateAdminAccessOnProperty(pulsar, clientAppId, dn.getProperty());
} catch (RestException authException) {
log.warn("Failed to authorize {} on cluster {}", clientAppId, dn.toString());
throw new PulsarClientException(String.format("Authorization failed %s on cluster %s with error %s",
clientAppId, dn.toString(), authException.getMessage()));
}
}
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), dn.getCluster(),
dn.getNamespacePortion(), "persistent", dn.getEncodedLocalName());
fetchPartitionedTopicMetadataAsync(pulsar, path).thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, dn,
metadata.partitions);
}
metadataFuture.complete(metadata);
}).exceptionally(ex -> {
metadataFuture.completeExceptionally(ex);
return null;
});
} catch (Exception ex) {
metadataFuture.completeExceptionally(ex);
}
return metadataFuture;
}
private static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, String path) {
try {
return fetchPartitionedTopicMetadataAsync(pulsar, path).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e;
}
throw new RestException(e);
}
}
private static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(PulsarService pulsar,
String path) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// gets the number of partitions from the zk cache
pulsar.getGlobalZkCache().getDataAsync(path, new Deserializer<PartitionedTopicMetadata>() {
@Override
public PartitionedTopicMetadata deserialize(String key, byte[] content) throws Exception {
return jsonMapper().readValue(content, PartitionedTopicMetadata.class);
}
}).thenAccept(metadata -> {
// if the partitioned topic is not found in zk, then the topic is not partitioned
if (metadata.isPresent()) {
metadataFuture.complete(metadata.get());
} else {
metadataFuture.complete(new PartitionedTopicMetadata());
}
}).exceptionally(ex -> {
metadataFuture.complete(new PartitionedTopicMetadata());
return null;
});
} catch (Exception e) {
metadataFuture.completeExceptionally(e);
}
return metadataFuture;
}
/**
* Get the Topic object reference from the Pulsar broker
*/
private PersistentTopic getTopicReference(DestinationName dn) {
......
......@@ -30,6 +30,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -37,6 +38,16 @@ import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.broker.web.NoSwaggerDocumentation;
import com.yahoo.pulsar.broker.web.PulsarWebResource;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.web.RestException;
import static com.yahoo.pulsar.common.api.Commands.newLookupResponse;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.lookup.data.LookupData;
import com.yahoo.pulsar.common.policies.data.ClusterData;
import static com.google.common.base.Preconditions.checkNotNull;
import io.netty.buffer.ByteBuf;
@Path("/v2/destination/")
@NoSwaggerDocumentation
......@@ -55,7 +66,7 @@ public class DestinationLookup extends PulsarWebResource {
try {
validateClusterOwnership(topic.getCluster());
checkConnect(topic);
validateReplicationSettingsOnNamespace(topic.getNamespaceObject());
validateReplicationSettingsOnNamespace(pulsar(), topic.getNamespaceObject());
} catch (Throwable t) {
// Validation checks failed
log.error("Validation check failed: {}", t.getMessage());
......@@ -74,23 +85,24 @@ public class DestinationLookup extends PulsarWebResource {
}
// We have found either a broker that owns the topic, or a broker to which we should redirect the client to
if (result.isHttpRedirect()) {
if (result.isRedirect()) {
boolean newAuthoritative = this.isLeaderBroker();
URI redirect;
try {
redirect = new URI(String.format("%s%s%s?authoritative=%s", result.getHttpRedirectAddress(),
"/lookup/v2/destination/", topic.getLookupName(), newAuthoritative));
String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
: result.getLookupData().getHttpUrl();
redirect = new URI(String.format("%s%s%s?authoritative=%s", redirectUrl, "/lookup/v2/destination/",
topic.getLookupName(), newAuthoritative));
} catch (URISyntaxException e) {
log.error("Error in preparing redirect url for {}: {}", topic, e.getMessage(), e);
asyncResponse.resume(e);
return;
}
if (log.isDebugEnabled()) {
log.debug("Redirect lookup for topic {} to {}", topic, redirect);
}
asyncResponse.resume(new WebApplicationException(Response.temporaryRedirect(redirect).build()));
} else {
// Found broker owning the topic
if (log.isDebugEnabled()) {
......@@ -103,7 +115,113 @@ public class DestinationLookup extends PulsarWebResource {
asyncResponse.resume(exception);
return null;
});
}
/**
*
* Lookup broker-service address for a given namespace-bundle which contains given topic.
*
* a. Returns broker-address if namespace-bundle is already owned by any broker
* b. If current-broker receives lookup-request and if it's not a leader
* then current broker redirects request to leader by returning leader-service address.
* c. If current-broker is leader then it finds out least-loaded broker to own namespace bundle and
* redirects request by returning least-loaded broker.
* d. If current-broker receives request to own the namespace-bundle then it owns a bundle and returns
* success(connect) response to client.
*
* @param pulsarService
* @param fqdn
* @param authoritative
* @param clientAppId
* @param requestId
* @return
*/
public static CompletableFuture<ByteBuf> lookupDestinationAsync(PulsarService pulsarService, DestinationName fqdn, boolean authoritative,
String clientAppId, long requestId) {
final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
final String cluster = fqdn.getCluster();
// (1) validate cluster
getClusterDataIfDifferentCluster(pulsarService, cluster, clientAppId).thenAccept(differentClusterData -> {
if (differentClusterData != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Redirecting the lookup call to {}/{} cluster={}", clientAppId,
differentClusterData.getBrokerServiceUrl(), differentClusterData.getBrokerServiceUrlTls(), cluster);
}
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId));
} else {
// (2) authorize client
try {
checkAuthorization(pulsarService, fqdn, clientAppId);
} catch (Exception e) {
log.warn("Failed to authorized {} on cluster {}", clientAppId, fqdn.toString());
validationFuture
.complete(newLookupResponse(ServerError.AuthorizationError, e.getMessage(), requestId));
return;
}
// (3) validate global namespace
validateReplicationSettingsOnNamespaceAsync(pulsarService, fqdn.getNamespaceObject())
.thenAccept(success -> {
// (4) all validation passed: initiate lookup
validationFuture.complete(null);
}).exceptionally(ex -> {
validationFuture
.complete(newLookupResponse(ServerError.MetadataError, ex.getMessage(), requestId));
return null;
});
}
}).exceptionally(ex -> {
validationFuture.completeExceptionally(ex);
return null;
});
// Initiate lookup once validation completes
validationFuture.thenAccept(validaitonFailureResponse -> {
if (validaitonFailureResponse != null) {
lookupfuture.complete(validaitonFailureResponse);
} else {
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(fqdn, authoritative)
.thenAccept(lookupResult -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Lookup result {}", fqdn.toString(), lookupResult);
}
LookupData lookupData = lookupResult.getLookupData();
if (lookupResult.isRedirect()) {
boolean newAuthoritative = isLeaderBroker(pulsarService);
lookupfuture.complete(
newLookupResponse(lookupData.getBrokerUrl(), lookupData.getBrokerUrlTls(),
newAuthoritative, LookupType.Redirect, requestId));
} else {
lookupfuture.complete(
newLookupResponse(lookupData.getBrokerUrl(), lookupData.getBrokerUrlTls(),
true /* authoritative */, LookupType.Connect, requestId));
}
}).exceptionally(e -> {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, fqdn.toString(),
e.getMessage(), e);
lookupfuture.complete(
newLookupResponse(ServerError.ServiceNotReady, e.getMessage(), requestId));
return null;
});
}
}).exceptionally(ex -> {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, fqdn.toString(), ex.getMessage(),
ex);
lookupfuture.complete(newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
return null;
});
return lookupfuture;
}
private static final Logger log = LoggerFactory.getLogger(DestinationLookup.class);
}
......@@ -30,41 +30,41 @@ import com.yahoo.pulsar.broker.namespace.NamespaceEphemeralData;
*/
public class LookupResult {
enum Type {
BrokerUrl, HttpRedirectUrl
BrokerUrl, RedirectUrl
}
private final Type type;
private final LookupData lookupData;
private final URI httpRedirectAddress;
public LookupResult(NamespaceEphemeralData namespaceEphemeralData) {
this.type = Type.BrokerUrl;
this.lookupData = new LookupData(namespaceEphemeralData.getNativeUrl(),
namespaceEphemeralData.getNativeUrlTls(), namespaceEphemeralData.getHttpUrl());
this.httpRedirectAddress = null;
namespaceEphemeralData.getNativeUrlTls(), namespaceEphemeralData.getHttpUrl(),
namespaceEphemeralData.getHttpUrlTls());
}
public LookupResult(URI httpRedirectAddress) {
this.type = Type.HttpRedirectUrl;
this.lookupData = null;
this.httpRedirectAddress = httpRedirectAddress;
public LookupResult(String httpUrl, String httpUrlTls, String brokerServiceUrl, String brokerServiceUrlTls) {
this.type = Type.RedirectUrl; // type = reidrect => as current broker is
// not owner and prepares LookupResult
// with other broker's urls
this.lookupData = new LookupData(brokerServiceUrl, brokerServiceUrlTls, httpUrl, httpUrlTls);
}
public boolean isBrokerUrl() {
return type == Type.BrokerUrl;
}
public boolean isHttpRedirect() {
return type == Type.HttpRedirectUrl;
public boolean isRedirect() {
return type == Type.RedirectUrl;
}
public LookupData getLookupData() {
checkArgument(isBrokerUrl());
return lookupData;
}
public URI getHttpRedirectAddress() {
checkArgument(isHttpRedirect());
return httpRedirectAddress;
@Override
public String toString() {
return "LookupResult [type=" + type + ", lookupData=" + lookupData + "]";
}
}
......@@ -22,11 +22,8 @@ import static com.yahoo.pulsar.broker.web.PulsarWebResource.joinPath;
import static com.yahoo.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import static java.lang.String.format;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -37,6 +34,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
......@@ -51,6 +49,7 @@ import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.lookup.LookupResult;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
......@@ -70,6 +69,8 @@ import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
import com.yahoo.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
/**
* The <code>NamespaceService</code> provides resource ownership lookup as well as resource ownership claiming services
......@@ -147,56 +148,51 @@ public class NamespaceService {
return bundleFactory.getFullBundle(fqnn);
}
public URL getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean readOnly) throws Exception {
private static final Deserializer<LoadReport> loadReportDeserializer = new Deserializer<LoadReport>() {
@Override
public LoadReport deserialize(String key, byte[] content) throws Exception {
return jsonMapper().readValue(content, LoadReport.class);
}
};
public URL getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, boolean readOnly)
throws Exception {
if (suName instanceof DestinationName) {
DestinationName name = (DestinationName) suName;
if (LOG.isDebugEnabled()) {
LOG.debug("Getting web service URL of destination: {} - auth: {}", name, authoritative);
}
return this.internalGetWebServiceUrl(getBundle(name), authoritative, readOnly).get();
return this.internalGetWebServiceUrl(getBundle(name), authoritative, isRequestHttps, readOnly).get();
}
if (suName instanceof NamespaceName) {
return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) suName), authoritative, readOnly).get();
return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) suName), authoritative, isRequestHttps, readOnly).get();
}
if (suName instanceof NamespaceBundle) {
return this.internalGetWebServiceUrl((NamespaceBundle) suName, authoritative, readOnly).get();
return this.internalGetWebServiceUrl((NamespaceBundle) suName, authoritative, isRequestHttps, readOnly).get();
}
throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName());
}
private CompletableFuture<URL> internalGetWebServiceUrl(NamespaceBundle bundle, boolean authoritative,
boolean readOnly) {
boolean isRequestHttps, boolean readOnly) {
return findBrokerServiceUrl(bundle, authoritative, readOnly).thenApply(lookupResult -> {
if (lookupResult == null) {
return null;
}
try {
if (lookupResult.isBrokerUrl()) {
// Somebody already owns the service unit
if (lookupResult != null) {
try {
LookupData lookupData = lookupResult.getLookupData();
if (lookupData.getHttpUrl() != null) {
// If the broker uses the new format, we know the correct address
return new URL(lookupData.getHttpUrl());
} else {
// Fallback to use same port as current broker
URI brokerAddress = new URI(lookupData.getBrokerUrl());
String host = brokerAddress.getHost();
int port = config.getWebServicePort();
return new URL(String.format("http://%s:%s", host, port));
}
} else {
// We have the HTTP address to redirect to
return lookupResult.getHttpRedirectAddress().toURL();
final String redirectUrl = isRequestHttps ? lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
return new URL(redirectUrl);
} catch (Exception e) {
// just log the exception, nothing else to do
LOG.warn("internalGetWebServiceUrl [{}]", e.getMessage(), e);
}
} catch (MalformedURLException | URISyntaxException e) {
throw new RuntimeException(e);
}
return null;
});
}
......@@ -391,7 +387,12 @@ public class NamespaceService {
}
// Now setting the redirect url
lookupFuture.complete(new LookupResult(new URI(candidateBroker)));
createLookupResult(candidateBroker).thenAccept(lookupResult -> lookupFuture.complete(lookupResult))
.exceptionally(ex -> {
lookupFuture.completeExceptionally(ex);
return null;
});
}
} catch (Exception e) {
LOG.warn("Error in trying to acquire namespace bundle ownership for {}: {}", bundle, e.getMessage(), e);
......@@ -399,6 +400,32 @@ public class NamespaceService {
}
}
private CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
try {
checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
URI uri = new URI(candidateBroker);
String path = String.format("%s/%s:%s", SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT, uri.getHost(),
uri.getPort());
pulsar.getLocalZkCache().getDataAsync(path, loadReportDeserializer).thenAccept(reportData -> {
if (reportData.isPresent()) {
LoadReport report = reportData.get();
lookupFuture.complete(new LookupResult(report.getWebServiceUrl(), report.getWebServiceUrlTls(),
report.getPulsarServiceUrl(), report.getPulsarServieUrlTls()));
} else {
lookupFuture.completeExceptionally(new KeeperException.NoNodeException(path));
}
}).exceptionally(ex -> {
lookupFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
lookupFuture.completeExceptionally(e);
}
return lookupFuture;
}
private boolean isBrokerActive(String candidateBroker) throws KeeperException, InterruptedException {
Set<String> activeNativeBrokers = pulsar.getLocalZkCache()
.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT);
......
......@@ -448,7 +448,6 @@ public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public void redeliverUnacknowledgedMessages() {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
unackedMessages.set(0);
blockedConsumerOnUnackedMsgs = false;
......
......@@ -16,6 +16,10 @@
package com.yahoo.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static com.yahoo.pulsar.broker.admin.PersistentTopics.getPartitionedTopicMetadata;
import static com.yahoo.pulsar.broker.lookup.DestinationLookup.lookupDestinationAsync;
import static com.yahoo.pulsar.common.api.Commands.newLookupResponse;
import static com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
......@@ -29,6 +33,7 @@ import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.broker.authentication.AuthenticationDataCommand;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck;
......@@ -36,6 +41,8 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnect;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandFlow;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandProducer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandRedeliverUnacknowledgedMessages;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSend;
......@@ -43,7 +50,6 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import static com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.BacklogQuota;
......@@ -138,7 +144,51 @@ public class ServerCnx extends PulsarHandler {
// ////
// // Incoming commands handling
// ////
@Override
protected void handleLookup(CommandLookupTopic lookup) {
if (log.isDebugEnabled()) {
log.debug("Received Lookup from {}", remoteAddress);
}
lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(lookup.getTopic()),
lookup.getAuthoritative(), getRole(), lookup.getRequestId()).thenAccept(lookupResponse -> {
ctx.writeAndFlush(lookupResponse);
}).exceptionally(ex -> {
// it should never happen
log.warn("[{}] lookup failed with error {}", remoteAddress, ex.getMessage(), ex);
ctx.writeAndFlush(
newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), lookup.getRequestId()));
return null;
});
}
@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
if (log.isDebugEnabled()) {
log.debug("Received PartitionMetadataLookup from {}", remoteAddress);
}
getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(),
DestinationName.get(partitionMetadata.getTopic())).thenAccept(metadata -> {
int partitions = metadata.partitions;
ctx.writeAndFlush(
Commands.newPartitionMetadataResponse(partitions, partitionMetadata.getRequestId()));
}).exceptionally(ex -> {
if (ex instanceof PulsarClientException) {
log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress,
partitionMetadata.getTopic(), ex.getMessage());
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(), partitionMetadata.getRequestId()));
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, ex.getMessage());
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
ex.getMessage(), partitionMetadata.getRequestId()));
}
return null;
});
}
@Override
protected void handleConnect(CommandConnect connect) {
checkArgument(state == State.Start);
......
......@@ -96,7 +96,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
}
@BeforeClass
public void init() throws Exception {
public void initNamespace() throws Exception {
testLocalNamespaces = Lists.newArrayList();
testGlobalNamespaces = Lists.newArrayList();
......@@ -123,6 +123,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doReturn(mockZookKeeper).when(namespaces).localZk();
doReturn(pulsar.getConfigurationCache().propertiesCache()).when(namespaces).propertiesCache();
doReturn(pulsar.getConfigurationCache().policiesCache()).when(namespaces).policiesCache();
doReturn(false).when(namespaces).isRequestHttps();
doReturn("test").when(namespaces).clientAppId();
doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
doNothing().when(namespaces).validateAdminAccessOnProperty("my-property");
......@@ -557,7 +558,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
}
}), Mockito.anyBoolean(), Mockito.anyBoolean());
}), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
admin.namespaces().setNamespaceReplicationClusters(testGlobalNamespaces.get(0).toString(),
Lists.newArrayList("usw"));
......@@ -596,7 +597,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
// setup ownership to localhost
URL localWebServiceUrl = new URL(pulsar.getWebServiceAddress());
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(testNs, false, false);
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(testNs, false, false, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
try {
namespaces.deleteNamespace(testNs.getProperty(), testNs.getCluster(), testNs.getLocalName(), false);
......@@ -608,13 +609,13 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
testNs = this.testGlobalNamespaces.get(0);
// setup ownership to localhost
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(testNs, false, false);
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(testNs, false, false, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
namespaces.deleteNamespace(testNs.getProperty(), testNs.getCluster(), testNs.getLocalName(), false);
testNs = this.testLocalNamespaces.get(0);
// setup ownership to localhost
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(testNs, false, false);
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(testNs, false, false, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
namespaces.deleteNamespace(testNs.getProperty(), testNs.getCluster(), testNs.getLocalName(), false);
List<String> nsList = Lists.newArrayList(this.testLocalNamespaces.get(1).toString(),
......@@ -635,7 +636,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
// ensure refreshed destination list in the cache
pulsar.getLocalZkCacheService().managedLedgerListCache().clearTree();
// setup ownership to localhost
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(testNs, false, false);
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(testNs, false, false, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
namespaces.deleteNamespace(testNs.getProperty(), testNs.getCluster(), testNs.getLocalName(), false);
}
......@@ -670,7 +671,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
}
}), Mockito.anyBoolean(), Mockito.anyBoolean());
}), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
doReturn(false).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
@Override
......@@ -691,25 +692,26 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
}
}));
doReturn(Optional.of(new NamespaceEphemeralData())).when(nsSvc).getOwner(Mockito.argThat(new Matcher<NamespaceBundle>() {
doReturn(Optional.of(new NamespaceEphemeralData())).when(nsSvc)
.getOwner(Mockito.argThat(new Matcher<NamespaceBundle>() {
@Override
public void describeTo(Description description) {
}
@Override
public void describeTo(Description description) {
}
@Override
public boolean matches(Object item) {
if (item instanceof NamespaceBundle) {
NamespaceBundle bundle = (NamespaceBundle) item;
return bundle.getNamespaceObject().equals(testNs);
}
return false;
}
@Override
public boolean matches(Object item) {
if (item instanceof NamespaceBundle) {
NamespaceBundle bundle = (NamespaceBundle) item;
return bundle.getNamespaceObject().equals(testNs);
}
return false;
}
@Override
public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
}
}));
@Override
public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
}
}));
doThrow(new PulsarAdminException.PreconditionFailedException(
new ClientErrorException(Status.PRECONDITION_FAILED))).when(namespacesAdmin)
......@@ -732,7 +734,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
// make one bundle owned
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), false, false);
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), false, true, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
doNothing().when(namespacesAdmin).deleteNamespaceBundle(
testProperty + "/" + testLocalCluster + "/" + bundledNsLocal, "0x00000000_0x80000000");
......@@ -754,7 +756,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
// ensure all three bundles are owned by the local broker
for (NamespaceBundle bundle : nsBundles.getBundles()) {
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(bundle, false, false);
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(bundle, false, true, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(bundle);
}
doNothing().when(namespacesAdmin).deleteNamespaceBundle(Mockito.anyString(), Mockito.anyString());
......@@ -787,7 +789,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
}
}), Mockito.anyBoolean(), Mockito.anyBoolean());
}), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
@Override
......@@ -889,7 +891,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
}
}), Mockito.anyBoolean(), Mockito.anyBoolean());
}), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
@Override
......@@ -918,7 +920,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
NamespaceBundle testBundle = nsBundles.getBundles().get(0);
// make one bundle owned
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(testBundle, false, false);
doReturn(localWebServiceUrl).when(nsSvc).getWebServiceUrl(testBundle, false, true, false);
doReturn(true).when(nsSvc).isServiceUnitOwned(testBundle);
doNothing().when(nsSvc).unloadNamespaceBundle(testBundle);
namespaces.unloadNamespaceBundle(testProperty, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
......@@ -1034,6 +1036,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
PersistentTopics topics = spy(new PersistentTopics());
topics.setServletContext(new MockServletContext());
topics.setPulsar(pulsar);
doReturn(false).when(topics).isRequestHttps();
doReturn("test").when(topics).clientAppId();
mockWebUrl(localWebServiceUrl, testNs);
try {
......@@ -1077,7 +1081,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Override
public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {
}
}), Mockito.anyBoolean(), Mockito.anyBoolean());
}), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new Matcher<NamespaceBundle>() {
@Override
public void describeTo(Description description) {
......
......@@ -19,6 +19,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
......@@ -67,6 +68,7 @@ public abstract class MockedPulsarServiceBaseTest {
protected MockZooKeeper mockZookKeeper;
protected NonClosableMockBookKeeper mockBookKeeper;
protected boolean isTcpLookup = false;
private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
......@@ -84,17 +86,25 @@ public abstract class MockedPulsarServiceBaseTest {
init();
com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
pulsarClient = PulsarClient.create(brokerUrl.toString(), clientConf);
String lookupUrl = brokerUrl.toString();
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
}
pulsarClient = PulsarClient.create(lookupUrl, clientConf);
}
protected final void internalSetupForStatsTest() throws Exception {
init();
com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(1, TimeUnit.SECONDS);
pulsarClient = PulsarClient.create(brokerUrl.toString(), clientConf);
String lookupUrl = brokerUrl.toString();
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
}
pulsarClient = PulsarClient.create(lookupUrl, clientConf);
}
private final void init() throws Exception {
protected final void init() throws Exception {
mockZookKeeper = createMockZooKeeper();
mockBookKeeper = new NonClosableMockBookKeeper(new ClientConfiguration(), mockZookKeeper);
......
......@@ -66,6 +66,8 @@ import com.yahoo.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import com.yahoo.pulsar.client.admin.BrokerStats;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.api.Authentication;
import com.yahoo.pulsar.client.api.ClientConfiguration;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.AutoFailoverPolicyData;
import com.yahoo.pulsar.common.policies.data.AutoFailoverPolicyType;
......@@ -485,4 +487,5 @@ public class SimpleLoadManagerImplTest {
usage.reset();
assertNotEquals(usage.getBandwidthIn().usage, usageLimit);
}
}
......@@ -26,6 +26,7 @@ import java.net.URI;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
......@@ -33,6 +34,7 @@ import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
......@@ -98,6 +100,9 @@ public class HttpDestinationLookupv2Test {
doReturn(Optional.of(useData)).when(clustersCache).get(AdminResource.path("clusters", "use"));
doReturn(Optional.of(uscData)).when(clustersCache).get(AdminResource.path("clusters", "usc"));
doReturn(Optional.of(uswData)).when(clustersCache).get(AdminResource.path("clusters", "usw"));
doReturn(CompletableFuture.completedFuture(Optional.of(useData))).when(clustersCache).getAsync(AdminResource.path("clusters", "use"));
doReturn(CompletableFuture.completedFuture(Optional.of(uscData))).when(clustersCache).getAsync(AdminResource.path("clusters", "usc"));
doReturn(CompletableFuture.completedFuture(Optional.of(uswData))).when(clustersCache).getAsync(AdminResource.path("clusters", "usw"));
doReturn(clusters).when(clustersListCache).get();
doReturn(ns).when(pulsar).getNamespaceService();
BrokerService brokerService = mock(BrokerService.class);
......@@ -109,6 +114,7 @@ public class HttpDestinationLookupv2Test {
public void crossColoLookup() throws Exception {
DestinationLookup destLookup = spy(new DestinationLookup());
doReturn(false).when(destLookup).isRequestHttps();
destLookup.setPulsar(pulsar);
doReturn("null").when(destLookup).clientAppId();
Field uriField = PulsarWebResource.class.getDeclaredField("uri");
......@@ -145,6 +151,7 @@ public class HttpDestinationLookupv2Test {
.get(AdminResource.path("policies", property, cluster, ns2));
DestinationLookup destLookup = spy(new DestinationLookup());
doReturn(false).when(destLookup).isRequestHttps();
destLookup.setPulsar(pulsar);
doReturn("null").when(destLookup).clientAppId();
Field uriField = PulsarWebResource.class.getDeclaredField("uri");
......
......@@ -69,7 +69,7 @@ public class MockBrokerService {
private final String lookupURI = "/lookup/v2/destination/persistent";
private final String partitionMetadataURI = "/admin/persistent";
private final LookupData lookupData = new LookupData("pulsar://127.0.0.1:" + brokerServicePort,
"pulsar://127.0.0.1:" + brokerServicePortTls, "http://127.0.0.1:" + webServicePort);
"pulsar://127.0.0.1:" + brokerServicePortTls, "http://127.0.0.1:" + webServicePort, null);
private final PartitionedTopicMetadata singlePartitionedTopicMetadata = new PartitionedTopicMetadata(1);
private final PartitionedTopicMetadata multiPartitionedTopicMetadata = new PartitionedTopicMetadata(4);
private final PartitionedTopicMetadata nonPartitionedTopicMetadata = new PartitionedTopicMetadata();
......
......@@ -1238,7 +1238,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessages);
}
}
@Test
public void testUnackBlockRedeliverMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
......@@ -1246,8 +1246,8 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer();
int totalReceiveMsg = 0;
try {
final int unAckedMessagesBufferSize = 10;
final int receiverQueueSize = 20;
final int unAckedMessagesBufferSize = 20;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 100;
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(unAckedMessagesBufferSize);
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.client.impl;
import static com.yahoo.pulsar.client.impl.PulsarClientImpl.requestIdGenerator;
import static java.lang.String.format;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.partition.PartitionedTopicMetadata;
import io.netty.buffer.ByteBuf;
class BinaryProtoLookupService implements LookupService {
private final ConnectionPool cnxPool;
protected final InetSocketAddress serviceAddress;
private final boolean useTls;
public BinaryProtoLookupService(ConnectionPool cnxPool, String serviceUrl, boolean useTls)
throws PulsarClientException {
this.cnxPool = cnxPool;
this.useTls = useTls;
URI uri;
try {
uri = new URI(serviceUrl);
this.serviceAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
} catch (Exception e) {
log.error("Invalid service-url {} provided {}", serviceUrl, e.getMessage(), e);
throw new PulsarClientException.InvalidServiceURL(e);
}
}
/**
* Calls broker binaryProto-lookup api to find broker-service address which can serve a given topic.
*
* @param destination: topic-name
* @return broker-socket-address that serves given topic
*/
public CompletableFuture<InetSocketAddress> getBroker(DestinationName destination) {
return findBroker(serviceAddress, false, destination);
}
/**
* calls broker binaryProto-lookup api to get metadata of partitioned-topic.
*
*/
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(DestinationName destination) {
return getPartitionedTopicMetadata(serviceAddress, destination);
}
private CompletableFuture<InetSocketAddress> findBroker(InetSocketAddress socketAddress, boolean authoritative,
DestinationName destination) {
CompletableFuture<InetSocketAddress> addressFuture = new CompletableFuture<InetSocketAddress>();
cnxPool.getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = requestIdGenerator.getAndIncrement();
ByteBuf request = Commands.newLookup(destination.toString(), authoritative, requestId);
clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
URI uri = null;
try {
// (1) build response broker-address
if (useTls) {
uri = new URI(lookupDataResult.brokerUrlTls);
} else {
String serviceUrl = lookupDataResult.brokerUrl;
uri = new URI(serviceUrl);
}
InetSocketAddress responseBrokerAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
// (2) redirect to given address if response is: redirect
if (lookupDataResult.redirect) {
findBroker(responseBrokerAddress, lookupDataResult.authoritative, destination)
.thenAccept(brokerAddress -> {
addressFuture.complete(brokerAddress);
}).exceptionally((lookupException) -> {
// lookup failed
log.warn("[{}] lookup failed : {}", destination.toString(),
lookupException.getMessage(), lookupException);
addressFuture.completeExceptionally(lookupException);
return null;
});
} else {
// (3) received correct broker to connect
addressFuture.complete(responseBrokerAddress);
}
} catch (Exception parseUrlException) {
// Failed to parse url
log.warn("[{}] invalid url {} : {}", destination.toString(), uri, parseUrlException.getMessage(),
parseUrlException);
addressFuture.completeExceptionally(parseUrlException);
}
}).exceptionally((sendException) -> {
// lookup failed
log.warn("[{}] failed to send lookup request : {}", destination.toString(), sendException.getMessage(),
sendException);
addressFuture.completeExceptionally(sendException);
return null;
});
}).exceptionally(connectionException -> {
addressFuture.completeExceptionally(connectionException);
return null;
});
return addressFuture;
}
private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(InetSocketAddress socketAddress,
DestinationName destination) {
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<PartitionedTopicMetadata>();
cnxPool.getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = requestIdGenerator.getAndIncrement();
ByteBuf request = Commands.newPartitionMetadataRequest(destination.toString(), requestId);
clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
try {
URI uri = null;
// (1) if redirect request for different broker lookup
if (lookupDataResult.redirect) {
if (useTls) {
uri = new URI(lookupDataResult.brokerUrlTls);
} else {
String serviceUrl = lookupDataResult.brokerUrl;
uri = new URI(serviceUrl);
}
InetSocketAddress responseBrokerAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
//(1.a) retry getPartitionedMetadata with different redirected broker
getPartitionedTopicMetadata(responseBrokerAddress, destination).thenAccept(metadata -> {
partitionFuture.complete(metadata);
}).exceptionally((lookupException) -> {
// lookup failed
log.warn("[{}] lookup failed : {}", destination.toString(), lookupException.getMessage(),
lookupException);
partitionFuture.completeExceptionally(lookupException);
return null;
});
} else {
// (2) received result partitions
partitionFuture.complete(new PartitionedTopicMetadata(lookupDataResult.partitions));
}
} catch (Exception e) {
partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
format("Failed to parse partition-response redirect=%s , partitions with %s",
lookupDataResult.redirect, lookupDataResult.partitions, e.getMessage())));
}
}).exceptionally((e) -> {
log.warn("[{}] failed to get Partitioned metadata : {}", destination.toString(), e.getMessage(), e);
partitionFuture.completeExceptionally(e);
return null;
});
}).exceptionally(connectionException -> {
partitionFuture.completeExceptionally(connectionException);
return null;
});
return partitionFuture;
}
public String getServiceUrl() {
return serviceAddress.toString();
}
static class LookupDataResult {
private String brokerUrl;
private String brokerUrlTls;
private int partitions;
private boolean authoritative;
private boolean redirect;
public LookupDataResult(String brokerUrl, String brokerUrlTls, boolean redirect, boolean authoritative) {
super();
this.brokerUrl = brokerUrl;
this.brokerUrlTls = brokerUrlTls;
this.authoritative = authoritative;
this.redirect = redirect;
}
public LookupDataResult(int partitions, String brokerUrl, String brokerUrlTls, boolean redirect) {
super();
this.brokerUrl = brokerUrl;
this.brokerUrlTls = brokerUrlTls;
this.partitions = partitions;
this.redirect = redirect;
}
}
private static final Logger log = LoggerFactory.getLogger(BinaryProtoLookupService.class);
}
......@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.client.api.Authentication;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
......@@ -33,11 +34,15 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnected;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandError;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandMessage;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSendError;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSuccess;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.util.collections.ConcurrentLongHashMap;
import io.netty.buffer.ByteBuf;
......@@ -51,6 +56,7 @@ public class ClientCnx extends PulsarHandler {
private State state;
private final ConcurrentLongHashMap<CompletableFuture<String>> pendingRequests = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> pendingLookupRequests = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<ProducerImpl> producers = new ConcurrentLongHashMap<>(16, 1);
private final ConcurrentLongHashMap<ConsumerImpl> consumers = new ConcurrentLongHashMap<>(16, 1);
......@@ -191,6 +197,62 @@ public class ClientCnx extends PulsarHandler {
log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId());
}
}
@Override
protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) {
log.info("Received Broker lookup response: {}", lookupResult.getResponse());
long requestId = lookupResult.getRequestId();
CompletableFuture<LookupDataResult> requestFuture = pendingLookupRequests.remove(requestId);
if (requestFuture != null) {
// Complete future with exception if : Result.response=fail/null
if (lookupResult.getResponse() == null || LookupType.Failed.equals(lookupResult.getResponse())) {
if (lookupResult.hasError()) {
requestFuture.completeExceptionally(
getPulsarClientException(lookupResult.getError(), lookupResult.getMessage()));
} else {
requestFuture
.completeExceptionally(new PulsarClientException.LookupException("Empty lookup response"));
}
} else {
// return LookupDataResult when Result.response = connect/redirect
boolean redirect = LookupType.Redirect.equals(lookupResult.getResponse());
requestFuture.complete(new LookupDataResult(lookupResult.getBrokerServiceUrl(),
lookupResult.getBrokerServiceUrlTls(), redirect, lookupResult.getAuthoritative()));
}
} else {
log.warn("{} Received unknown request id from server: {}", ctx.channel(), lookupResult.getRequestId());
}
}
@Override
protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse lookupResult) {
log.info("Received Broker Partition response: {}", lookupResult.getPartitions());
long requestId = lookupResult.getRequestId();
CompletableFuture<LookupDataResult> requestFuture = pendingLookupRequests.remove(requestId);
if (requestFuture != null) {
// Complete future with exception if : Result.response=fail/null
if (lookupResult.getResponse() == null || LookupType.Failed.equals(lookupResult.getResponse())) {
if (lookupResult.hasError()) {
requestFuture.completeExceptionally(
getPulsarClientException(lookupResult.getError(), lookupResult.getMessage()));
} else {
requestFuture
.completeExceptionally(new PulsarClientException.LookupException("Empty lookup response"));
}
} else {
// return LookupDataResult when Result.response = success/redirect
boolean redirect = LookupType.Redirect.equals(lookupResult.getResponse());
requestFuture.complete(new LookupDataResult(lookupResult.getPartitions(),
lookupResult.getBrokerServiceUrl(), lookupResult.getBrokerServiceUrlTls(), redirect));
}
} else {
log.warn("{} Received unknown request id from server: {}", ctx.channel(), lookupResult.getRequestId());
}
}
@Override
protected void handleSendError(CommandSendError sendError) {
......@@ -216,7 +278,7 @@ public class ClientCnx extends PulsarHandler {
}
CompletableFuture<String> requestFuture = pendingRequests.remove(requestId);
if (requestFuture != null) {
requestFuture.completeExceptionally(getPulsarClientException(error));
requestFuture.completeExceptionally(getPulsarClientException(error.getError(), error.getMessage()));
} else {
log.warn("{} Received unknown request id from server: {}", ctx.channel(), error.getRequestId());
}
......@@ -251,6 +313,18 @@ public class ClientCnx extends PulsarHandler {
return state == State.Ready;
}
CompletableFuture<LookupDataResult> newLookup(ByteBuf request, long requestId) {
CompletableFuture<LookupDataResult> future = new CompletableFuture<>();
pendingLookupRequests.put(requestId, future);
ctx.writeAndFlush(request).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
log.warn("{} Failed to send request to broker: {}", ctx.channel(), writeFuture.cause().getMessage());
future.completeExceptionally(writeFuture.cause());
}
});
return future;
}
Promise<Void> newPromise() {
return ctx.newPromise();
}
......@@ -299,27 +373,27 @@ public class ClientCnx extends PulsarHandler {
consumers.remove(consumerId);
}
private PulsarClientException getPulsarClientException(CommandError error) {
switch (error.getError()) {
private PulsarClientException getPulsarClientException(ServerError error, String errorMsg) {
switch (error) {
case AuthenticationError:
return new PulsarClientException.AuthenticationException(error.getMessage());
return new PulsarClientException.AuthenticationException(errorMsg);
case AuthorizationError:
return new PulsarClientException.AuthorizationException(error.getMessage());
return new PulsarClientException.AuthorizationException(errorMsg);
case ConsumerBusy:
return new PulsarClientException.ConsumerBusyException(error.getMessage());
return new PulsarClientException.ConsumerBusyException(errorMsg);
case MetadataError:
return new PulsarClientException.BrokerMetadataException(error.getMessage());
return new PulsarClientException.BrokerMetadataException(errorMsg);
case PersistenceError:
return new PulsarClientException.BrokerPersistenceException(error.getMessage());
return new PulsarClientException.BrokerPersistenceException(errorMsg);
case ServiceNotReady:
return new PulsarClientException.LookupException(error.getMessage());
return new PulsarClientException.LookupException(errorMsg);
case ProducerBlockedQuotaExceededError:
return new PulsarClientException.ProducerBlockedQuotaExceededError(error.getMessage());
return new PulsarClientException.ProducerBlockedQuotaExceededError(errorMsg);
case ProducerBlockedQuotaExceededException:
return new PulsarClientException.ProducerBlockedQuotaExceededException(error.getMessage());
return new PulsarClientException.ProducerBlockedQuotaExceededException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(error.getMessage());
return new PulsarClientException(errorMsg);
}
}
......
......@@ -115,7 +115,7 @@ public class ConnectionPool implements Closeable {
return createConnection(address, -1);
}
final int randomKey = Math.abs(random.nextInt()) % maxConnectionsPerHosts;
final int randomKey = signSafeMod(random.nextInt(), maxConnectionsPerHosts);
return pool.computeIfAbsent(address, a -> new ConcurrentHashMap<>()) //
.computeIfAbsent(randomKey, k -> createConnection(address, randomKey));
......@@ -176,6 +176,14 @@ public class ConnectionPool implements Closeable {
map.remove(connectionKey, connectionFuture);
}
}
public static int signSafeMod(long dividend, int divisor) {
int mod = (int) (dividend % (long) divisor);
if (mod < 0) {
mod += divisor;
}
return mod;
}
private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.client.impl;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.client.util.FutureUtil;
import com.yahoo.pulsar.common.lookup.data.LookupData;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.partition.PartitionedTopicMetadata;
class HttpLookupService implements LookupService {
private final HttpClient httpClient;
private final boolean useTls;
private static final String BasePath = "lookup/v2/destination/";
public HttpLookupService(HttpClient httpClient, boolean useTls) {
this.httpClient = httpClient;
this.useTls = useTls;
}
/**
* Calls http-lookup api to find broker-service address which can serve a given topic.
*
* @param destination: topic-name
* @return broker-socket-address that serves given topic
*/
@SuppressWarnings("deprecation")
public CompletableFuture<InetSocketAddress> getBroker(DestinationName destination) {
return httpClient.get(BasePath + destination.getLookupName(), LookupData.class).thenCompose(lookupData -> {
// Convert LookupData into as SocketAddress, handling exceptions
URI uri = null;
try {
if (useTls) {
uri = new URI(lookupData.getBrokerUrlTls());
} else {
String serviceUrl = lookupData.getBrokerUrl();
if (serviceUrl == null) {
serviceUrl = lookupData.getNativeUrl();
}
uri = new URI(serviceUrl);
}
return CompletableFuture.completedFuture(new InetSocketAddress(uri.getHost(), uri.getPort()));
} catch (Exception e) {
// Failed to parse url
log.warn("[{}] Lookup Failed due to invalid url {}, {}", destination, uri, e.getMessage());
return FutureUtil.failedFuture(e);
}
});
}
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(DestinationName destination) {
return httpClient.get(String.format("admin/%s/partitions", destination.getLookupName()),
PartitionedTopicMetadata.class);
}
public String getServiceUrl() {
return httpClient.url.toString();
}
private static final Logger log = LoggerFactory.getLogger(HttpLookupService.class);
}
......@@ -16,44 +16,46 @@
package com.yahoo.pulsar.client.impl;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import com.yahoo.pulsar.client.util.FutureUtil;
import com.yahoo.pulsar.common.lookup.data.LookupData;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.partition.PartitionedTopicMetadata;
class LookupService {
private final HttpClient httpClient;
private final boolean useTls;
private static final String BasePath = "lookup/v2/destination/";
public LookupService(HttpClient httpClient, boolean useTls) {
this.httpClient = httpClient;
this.useTls = useTls;
}
/**
* Provides lookup service to find broker which serves given topic. It helps to
* lookup
* <ul>
* <li><b>topic-lookup:</b> lookup to find broker-address which serves given
* topic</li>
* <li><b>Partitioned-topic-Metadata-lookup:</b> lookup to find
* PartitionedMetadata for a given topic</li>
* </ul>
*
*/
interface LookupService {
@SuppressWarnings("deprecation")
public CompletableFuture<InetSocketAddress> getBroker(DestinationName destination) {
return httpClient.get(BasePath + destination.getLookupName(), LookupData.class).thenCompose(lookupData -> {
// Convert LookupData into as SocketAddress, handling exceptions
try {
URI uri;
if (useTls) {
uri = new URI(lookupData.getBrokerUrlTls());
} else {
String serviceUrl = lookupData.getBrokerUrl();
if (serviceUrl == null) {
serviceUrl = lookupData.getNativeUrl();
}
uri = new URI(serviceUrl);
}
return CompletableFuture.completedFuture(new InetSocketAddress(uri.getHost(), uri.getPort()));
} catch (Exception e) {
// Failed to parse url
return FutureUtil.failedFuture(e);
}
});
}
/**
* Calls broker lookup-api to get broker {@link InetSocketAddress} which serves namespacebundle that
* contains given topic.
*
* @param destination:
* topic-name
* @return broker-socket-address that serves given topic
*/
public CompletableFuture<InetSocketAddress> getBroker(DestinationName topic);
/**
* Returns {@link PartitionedTopicMetadata} for a given topic.
*
* @param destination : topic-name
* @return
*/
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(DestinationName destination);
/**
* Returns broker-service lookup api url.
*
* @return
*/
public String getServiceUrl();
}
......@@ -57,9 +57,8 @@ public class PulsarClientImpl implements PulsarClient {
private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
private final ClientConfiguration conf;
private final HttpClient httpClient;
private HttpClient httpClient;
private final LookupService lookup;
private final PartitionMetadataLookupService partition;
private final ConnectionPool cnxPool;
private final Timer timer;
private final ExecutorProvider externalExecutorProvider;
......@@ -75,7 +74,7 @@ public class PulsarClientImpl implements PulsarClient {
private final AtomicLong producerIdGenerator = new AtomicLong();
private final AtomicLong consumerIdGenerator = new AtomicLong();
private final AtomicLong requestIdGenerator = new AtomicLong();
protected static final AtomicLong requestIdGenerator = new AtomicLong();
public PulsarClientImpl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
this(serviceUrl, conf, getEventLoopGroup(conf), null);
......@@ -88,12 +87,14 @@ public class PulsarClientImpl implements PulsarClient {
}
this.conf = conf;
conf.getAuthentication().start();
httpClient = new HttpClient(serviceUrl, conf.getAuthentication(), eventLoopGroup,
conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
lookup = new LookupService(httpClient, conf.isUseTls());
partition = new PartitionMetadataLookupService(httpClient);
cnxPool = new ConnectionPool(this, eventLoopGroup);
if (serviceUrl.startsWith("http")) {
httpClient = new HttpClient(serviceUrl, conf.getAuthentication(), eventLoopGroup,
conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
lookup = new HttpLookupService(httpClient, conf.isUseTls());
} else {
lookup = new BinaryProtoLookupService(cnxPool, serviceUrl, conf.isUseTls());
}
timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
externalExecutorProvider = new ExecutorProvider(conf.getListenerThreads(), "pulsar-external-listener");
internalExecutorProvider = new ExecutorProvider(conf.getListenerThreads(), "pulsar-internal-listener");
......@@ -287,7 +288,7 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public CompletableFuture<Void> closeAsync() {
log.info("Client closing. URL: {}", this.httpClient.url.toString());
log.info("Client closing. URL: {}", lookup.getServiceUrl());
if (!state.compareAndSet(State.Open, State.Closing)) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
}
......@@ -327,7 +328,9 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public void shutdown() throws PulsarClientException {
try {
httpClient.close();
if (httpClient!= null) {
httpClient.close();
}
cnxPool.close();
timer.stop();
externalExecutorProvider.shutdownNow();
......@@ -339,11 +342,10 @@ public class PulsarClientImpl implements PulsarClient {
}
}
protected CompletableFuture<ClientCnx> getConnection(final String topic) {
DestinationName destinationName = DestinationName.get(topic);
return lookup.getBroker(destinationName).thenCompose((brokerAddress) -> cnxPool.getConnection(brokerAddress));
}
protected CompletableFuture<ClientCnx> getConnection(final String topic) {
DestinationName destinationName = DestinationName.get(topic);
return lookup.getBroker(destinationName).thenCompose((brokerAddress) -> cnxPool.getConnection(brokerAddress));
}
protected Timer timer() {
return timer;
......@@ -375,7 +377,7 @@ public class PulsarClientImpl implements PulsarClient {
try {
DestinationName destinationName = DestinationName.get(topic);
metadataFuture = partition.getPartitionedTopicMetadata(destinationName);
metadataFuture = lookup.getPartitionedTopicMetadata(destinationName);
} catch (IllegalArgumentException e) {
return FutureUtil.failedFuture(e);
}
......
......@@ -34,7 +34,12 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnect;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnected;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandError;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandFlow;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandMessage;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPing;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPong;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandProducer;
......@@ -342,6 +347,99 @@ public class Commands {
return res;
}
public static ByteBuf newPartitionMetadataResponse(ServerError error, String errorMsg, long requestId) {
CommandPartitionedTopicMetadataResponse.Builder partitionMetadataResponseBuilder = CommandPartitionedTopicMetadataResponse
.newBuilder();
partitionMetadataResponseBuilder.setRequestId(requestId);
partitionMetadataResponseBuilder.setError(error);
if (errorMsg != null) {
partitionMetadataResponseBuilder.setMessage(errorMsg);
}
CommandPartitionedTopicMetadataResponse partitionMetadataResponse = partitionMetadataResponseBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA_RESPONSE)
.setPartitionMetadataResponse(partitionMetadataResponse));
partitionMetadataResponseBuilder.recycle();
partitionMetadataResponse.recycle();
return res;
}
public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) {
CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
partitionMetadataBuilder.setTopic(topic);
partitionMetadataBuilder.setRequestId(requestId);
CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
ByteBuf res = serializeWithSize(
BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA).setPartitionMetadata(partitionMetadata));
partitionMetadataBuilder.recycle();
partitionMetadata.recycle();
return res;
}
public static ByteBuf newPartitionMetadataResponse(int partitions, long requestId) {
CommandPartitionedTopicMetadataResponse.Builder partitionMetadataResponseBuilder = CommandPartitionedTopicMetadataResponse
.newBuilder();
partitionMetadataResponseBuilder.setPartitions(partitions);
partitionMetadataResponseBuilder.setRequestId(requestId);
CommandPartitionedTopicMetadataResponse partitionMetadataResponse = partitionMetadataResponseBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.PARTITIONED_METADATA_RESPONSE)
.setPartitionMetadataResponse(partitionMetadataResponse));
partitionMetadataResponseBuilder.recycle();
partitionMetadataResponse.recycle();
return res;
}
public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) {
CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
lookupTopicBuilder.setTopic(topic);
lookupTopicBuilder.setRequestId(requestId);
lookupTopicBuilder.setAuthoritative(authoritative);
CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
lookupTopicBuilder.recycle();
lookupBroker.recycle();
return res;
}
public static ByteBuf newLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative,
LookupType response, long requestId) {
CommandLookupTopicResponse.Builder connectionBuilder = CommandLookupTopicResponse.newBuilder();
connectionBuilder.setBrokerServiceUrl(brokerServiceUrl);
if (brokerServiceUrlTls != null) {
connectionBuilder.setBrokerServiceUrlTls(brokerServiceUrlTls);
}
connectionBuilder.setResponse(response);
connectionBuilder.setRequestId(requestId);
connectionBuilder.setAuthoritative(authoritative);
CommandLookupTopicResponse connectionLookupResponse = connectionBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP_RESPONSE)
.setLookupTopicResponse(connectionLookupResponse));
connectionBuilder.recycle();
connectionLookupResponse.recycle();
return res;
}
public static ByteBuf newLookupResponse(ServerError error, String errorMsg, long requestId) {
CommandLookupTopicResponse.Builder connectionBuilder = CommandLookupTopicResponse.newBuilder();
connectionBuilder.setRequestId(requestId);
connectionBuilder.setError(error);
if (errorMsg != null) {
connectionBuilder.setMessage(errorMsg);
}
connectionBuilder.setResponse(LookupType.Failed);
CommandLookupTopicResponse connectionBroker = connectionBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP_RESPONSE).setLookupTopicResponse(connectionBroker));
connectionBuilder.recycle();
connectionBroker.recycle();
return res;
}
public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, AckType ackType,
ValidationError validationError) {
CommandAck.Builder ackBuilder = CommandAck.newBuilder();
......
......@@ -28,7 +28,11 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnect;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnected;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandError;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandFlow;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandMessage;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPing;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPong;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandProducer;
......@@ -79,6 +83,30 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
messageReceived();
switch (cmd.getType()) {
case PARTITIONED_METADATA:
checkArgument(cmd.hasPartitionMetadata());
handlePartitionMetadataRequest(cmd.getPartitionMetadata());
cmd.getPartitionMetadata().recycle();
break;
case PARTITIONED_METADATA_RESPONSE:
checkArgument(cmd.hasPartitionMetadataResponse());
handlePartitionResponse(cmd.getPartitionMetadataResponse());
cmd.getPartitionMetadataResponse().recycle();
break;
case LOOKUP:
checkArgument(cmd.hasLookupTopic());
handleLookup(cmd.getLookupTopic());
cmd.getLookupTopic().recycle();
break;
case LOOKUP_RESPONSE:
checkArgument(cmd.hasLookupTopicResponse());
handleLookupResponse(cmd.getLookupTopicResponse());
cmd.getLookupTopicResponse().recycle();
break;
case ACK:
checkArgument(cmd.hasAck());
handleAck(cmd.getAck());
......@@ -212,6 +240,22 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
protected abstract void messageReceived();
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata response) {
throw new UnsupportedOperationException();
}
protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse response) {
throw new UnsupportedOperationException();
}
protected void handleLookup(CommandLookupTopic lookup) {
throw new UnsupportedOperationException();
}
protected void handleLookupResponse(CommandLookupTopicResponse connection) {
throw new UnsupportedOperationException();
}
protected void handleConnect(CommandConnect connect) {
throw new UnsupportedOperationException();
}
......
......@@ -57,8 +57,10 @@ public abstract class PulsarHandler extends PulsarDecoder {
if (log.isDebugEnabled()) {
log.debug("[{}] Scheduling keep-alive task every {} s", ctx.channel(), keepAliveIntervalSeconds);
}
this.keepAliveTask = ctx.executor().scheduleAtFixedRate(this::handleKeepAliveTimeout, keepAliveIntervalSeconds,
keepAliveIntervalSeconds, TimeUnit.SECONDS);
if (keepAliveIntervalSeconds > 0) {
this.keepAliveTask = ctx.executor().scheduleAtFixedRate(this::handleKeepAliveTimeout,
keepAliveIntervalSeconds, keepAliveIntervalSeconds, TimeUnit.SECONDS);
}
}
@Override
......
......@@ -21,15 +21,23 @@ public class LookupData {
private String brokerUrl;
private String brokerUrlTls;
private String httpUrl; // Web service HTTP address
private String httpUrlTls; // Web service HTTPS address
private String nativeUrl;
public LookupData() {
}
public LookupData(String brokerUrl, String brokerUrlTls, String httpUrl) {
public LookupData(String brokerUrl, String brokerUrlTls, String httpUrl, String httpUrlTls) {
this.brokerUrl = brokerUrl;
this.brokerUrlTls = brokerUrlTls;
this.httpUrl = httpUrl;
this.httpUrlTls = httpUrlTls;
this.nativeUrl = brokerUrl;
}
public LookupData(String brokerUrl, String brokerUrlTls, boolean redirect, boolean authoritative) {
this.brokerUrl = brokerUrl;
this.brokerUrlTls = brokerUrlTls;
this.nativeUrl = brokerUrl;
}
......@@ -44,6 +52,14 @@ public class LookupData {
public String getHttpUrl() {
return httpUrl;
}
public String getHttpUrlTls() {
return httpUrlTls;
}
public void setHttpUrlTls(String httpUrlTls) {
this.httpUrlTls = httpUrlTls;
}
/**
* Legacy name, but client libraries are still using it so it needs to be included in Json
......
......@@ -20,6 +20,8 @@ import com.google.common.base.Objects;
public class ClusterData {
private String serviceUrl;
private String serviceUrlTls;
private String brokerServiceUrl;
private String brokerServiceUrlTls;
public ClusterData() {
}
......@@ -32,6 +34,13 @@ public class ClusterData {
this.serviceUrl = serviceUrl;
this.serviceUrlTls = serviceUrlTls;
}
public ClusterData(String serviceUrl, String serviceUrlTls, String brokerServiceUrl, String brokerServiceUrlTls) {
this.serviceUrl = serviceUrl;
this.serviceUrlTls = serviceUrlTls;
this.brokerServiceUrl = brokerServiceUrl;
this.brokerServiceUrlTls = brokerServiceUrlTls;
}
public String getServiceUrl() {
return serviceUrl;
......@@ -48,12 +57,30 @@ public class ClusterData {
public void setServiceUrlTls(String serviceUrlTls) {
this.serviceUrlTls = serviceUrlTls;
}
public String getBrokerServiceUrl() {
return brokerServiceUrl;
}
public void setBrokerServiceUrl(String brokerServiceUrl) {
this.brokerServiceUrl = brokerServiceUrl;
}
public String getBrokerServiceUrlTls() {
return brokerServiceUrlTls;
}
public void setBrokerServiceUrlTls(String brokerServiceUrlTls) {
this.brokerServiceUrlTls = brokerServiceUrlTls;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ClusterData) {
ClusterData other = (ClusterData) obj;
return Objects.equal(serviceUrl, other.serviceUrl) && Objects.equal(serviceUrlTls, other.serviceUrlTls);
return Objects.equal(serviceUrl, other.serviceUrl) && Objects.equal(serviceUrlTls, other.serviceUrlTls)
&& Objects.equal(brokerServiceUrl, other.brokerServiceUrl)
&& Objects.equal(brokerServiceUrlTls, other.brokerServiceUrlTls);
}
return false;
......@@ -61,19 +88,26 @@ public class ClusterData {
@Override
public int hashCode() {
if (serviceUrlTls != null && !serviceUrlTls.isEmpty()) {
return Objects.hashCode(serviceUrl + serviceUrlTls);
} else {
return Objects.hashCode(serviceUrl);
}
return Objects.hashCode(this.toString());
}
@Override
public String toString() {
if (serviceUrlTls == null || serviceUrlTls.isEmpty()) {
return serviceUrl;
} else {
return serviceUrl + "," + serviceUrlTls;
StringBuilder str = new StringBuilder();
str.append(serviceUrl);
if (serviceUrlTls != null && !serviceUrlTls.isEmpty()) {
str.append(",");
str.append(serviceUrlTls);
}
if (brokerServiceUrl != null && !brokerServiceUrl.isEmpty()) {
str.append(",");
str.append(brokerServiceUrl);
}
if (brokerServiceUrlTls != null && !brokerServiceUrlTls.isEmpty()) {
str.append(",");
str.append(brokerServiceUrlTls);
}
return str.toString();
}
}
......@@ -127,7 +127,18 @@ public class ByteBufCodedOutputStream {
writeTag(fieldNumber, WireFormat.WIRETYPE_VARINT);
writeUInt64NoTag(value);
}
/** Write a {@code bool} field, including tag, to the stream. */
public void writeBool(final int fieldNumber, final boolean value) throws IOException {
writeTag(fieldNumber, WireFormat.WIRETYPE_VARINT);
writeBoolNoTag(value);
}
/** Write a {@code bool} field to the stream. */
public void writeBoolNoTag(final boolean value) throws IOException {
writeRawByte(value ? 1 : 0);
}
/** Write a {@code uint64} field to the stream. */
public void writeUInt64NoTag(final long value) throws IOException {
writeRawVarint64(value);
......
......@@ -125,6 +125,50 @@ message CommandSubscribe {
optional string consumer_name = 6;
}
message CommandPartitionedTopicMetadata {
required string topic = 1;
required uint64 request_id = 2;
}
message CommandPartitionedTopicMetadataResponse {
enum LookupType {
Redirect = 0;
Success = 1;
Failed = 2;
}
optional uint32 partitions = 1; // Optional in case of error
required uint64 request_id = 2;
optional LookupType response = 3;
optional string brokerServiceUrl = 4; // Present in case of redirect
optional string brokerServiceUrlTls = 5;
optional ServerError error = 6;
optional string message = 7;
}
message CommandLookupTopic {
required string topic = 1;
required uint64 request_id = 2;
optional bool authoritative = 3 [default = false];
}
message CommandLookupTopicResponse {
enum LookupType {
Redirect = 0;
Connect = 1;
Failed = 2;
}
optional string brokerServiceUrl = 1; // Optional in case of error
optional string brokerServiceUrlTls = 2;
optional LookupType response = 3;
required uint64 request_id = 4;
optional bool authoritative = 5 [default = false];
optional ServerError error = 6;
optional string message = 7;
}
/// Create a new Producer on a topic, assigning the given producer_id,
/// all messages sent with this producer_id will be persisted on the topic
message CommandProducer {
......@@ -266,6 +310,12 @@ message BaseCommand {
PONG = 19;
REDELIVER_UNACKNOWLEDGED_MESSAGES = 20;
PARTITIONED_METADATA = 21;
PARTITIONED_METADATA_RESPONSE = 22;
LOOKUP = 23;
LOOKUP_RESPONSE = 24;
}
required Type type = 1;
......@@ -293,4 +343,10 @@ message BaseCommand {
optional CommandPing ping = 18;
optional CommandPong pong = 19;
optional CommandRedeliverUnacknowledgedMessages redeliverUnacknowledgedMessages = 20;
optional CommandPartitionedTopicMetadata partitionMetadata = 21;
optional CommandPartitionedTopicMetadataResponse partitionMetadataResponse = 22;
optional CommandLookupTopic lookupTopic = 23;
optional CommandLookupTopicResponse lookupTopicResponse = 24;
}
......@@ -30,7 +30,8 @@ public class LookupDataTest {
@Test
void withConstructor() {
LookupData data = new LookupData("pulsar://localhost:8888", "pulsar://localhost:8884", "http://localhost:8080");
LookupData data = new LookupData("pulsar://localhost:8888", "pulsar://localhost:8884", "http://localhost:8080",
"http://localhost:8081");
assertEquals(data.getBrokerUrl(), "pulsar://localhost:8888");
assertEquals(data.getHttpUrl(), "http://localhost:8080");
}
......@@ -38,7 +39,8 @@ public class LookupDataTest {
@SuppressWarnings("unchecked")
@Test
void serializeToJsonTest() throws Exception {
LookupData data = new LookupData("pulsar://localhost:8888", "pulsar://localhost:8884", "http://localhost:8080");
LookupData data = new LookupData("pulsar://localhost:8888", "pulsar://localhost:8884", "http://localhost:8080",
"http://localhost:8081");
ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
String json = mapper.writeValueAsString(data);
......@@ -49,5 +51,6 @@ public class LookupDataTest {
assertEquals(jsonMap.get("brokerUrlSsl"), "");
assertEquals(jsonMap.get("nativeUrl"), "pulsar://localhost:8888");
assertEquals(jsonMap.get("httpUrl"), "http://localhost:8080");
assertEquals(jsonMap.get("httpUrlTls"), "http://localhost:8081");
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.discovery.service.server.ServiceConfig;
import com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import static org.apache.bookkeeper.util.MathUtils.signSafeMod;
/**
* Maintains available active broker list and returns next active broker in round-robin for discovery service.
*
*/
public class BrokerDiscoveryProvider {
private ZookeeperCacheLoader zkCache;
private final String zookeeperServers;
private final AtomicInteger counter = new AtomicInteger();
public BrokerDiscoveryProvider(ServiceConfig config) {
this.zookeeperServers = config.getZookeeperServers();
}
/**
* starts {@link ZookeeperCacheLoader} to maintain active-broker list
*
* @param zkClientFactory
* @throws PulsarServerException
*/
public void start(ZooKeeperClientFactory zkClientFactory) throws PulsarServerException {
try {
zkCache = new ZookeeperCacheLoader(zkClientFactory, zookeeperServers);
} catch (Exception e) {
LOG.error("Failed to start Zookkeeper {}", e.getMessage(), e);
throw new PulsarServerException("Failed to start zookeeper :" + e.getMessage(), e);
}
}
/**
* Find next broke {@link LoadReport} in round-robin fashion.
*
* @return
* @throws PulsarServerException
*/
LoadReport nextBroker() throws PulsarServerException {
List<LoadReport> availableBrokers = zkCache.getAvailableBrokers();
if (availableBrokers.isEmpty()) {
throw new PulsarServerException("No active broker is available");
} else {
int brokersCount = availableBrokers.size();
int nextIdx = signSafeMod(counter.getAndIncrement(), brokersCount);
return availableBrokers.get(nextIdx);
}
}
public void close() {
zkCache.close();
}
private static final Logger LOG = LoggerFactory.getLogger(BrokerDiscoveryProvider.class);
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
import java.net.InetAddress;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.discovery.service.server.ServiceConfig;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollMode;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
/**
* Main discovery-service which starts component to serve incoming discovery-request over binary-proto channel and
* redirects to one of the active broker
*
*/
public class DiscoveryService {
private final ServiceConfig config;
private final String serviceUrl;
private final String serviceUrlTls;
private ZooKeeperClientFactory zkClientFactory = null;
private BrokerDiscoveryProvider discoveryProvider;
private final EventLoopGroup acceptorGroup;
private final EventLoopGroup workerGroup;
private final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-discovery-acceptor");
private final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-discovery-io");
private final int numThreads = Runtime.getRuntime().availableProcessors();
public DiscoveryService(ServiceConfig serviceConfig) {
this.config = serviceConfig;
this.serviceUrl = serviceUrl();
this.serviceUrlTls = serviceUrlTls();
discoveryProvider = new BrokerDiscoveryProvider(serviceConfig);
EventLoopGroup acceptorEventLoop, workersEventLoop;
if (SystemUtils.IS_OS_LINUX) {
try {
acceptorEventLoop = new EpollEventLoopGroup(1, acceptorThreadFactory);
workersEventLoop = new EpollEventLoopGroup(numThreads, workersThreadFactory);
} catch (UnsatisfiedLinkError e) {
acceptorEventLoop = new NioEventLoopGroup(1, acceptorThreadFactory);
workersEventLoop = new NioEventLoopGroup(numThreads, workersThreadFactory);
}
} else {
acceptorEventLoop = new NioEventLoopGroup(1, acceptorThreadFactory);
workersEventLoop = new NioEventLoopGroup(numThreads, workersThreadFactory);
}
this.acceptorGroup = acceptorEventLoop;
this.workerGroup = workersEventLoop;
}
/**
* Starts discovery service by initializing zookkeeper and server
* @throws Exception
*/
public void start() throws Exception {
discoveryProvider.start(getZooKeeperClientFactory());
startServer();
}
/**
* starts server to handle discovery-request from client-channel
*
* @throws Exception
*/
public void startServer() throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.group(acceptorGroup, workerGroup);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
if (workerGroup instanceof EpollEventLoopGroup) {
bootstrap.channel(EpollServerSocketChannel.class);
bootstrap.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
} else {
bootstrap.channel(NioServerSocketChannel.class);
}
bootstrap.childHandler(new ServiceChannelInitializer(this, config, false));
// Bind and start to accept incoming connections.
bootstrap.bind(config.getServicePort()).sync();
LOG.info("Started Pulsar Broker service on port {}", config.getWebServicePort());
if (config.isTlsEnabled()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new ServiceChannelInitializer(this, config, true));
tlsBootstrap.bind(config.getServicePortTls()).sync();
LOG.info("Started Pulsar Broker TLS service on port {}", config.getWebServicePortTls());
}
}
public ZooKeeperClientFactory getZooKeeperClientFactory() {
if (zkClientFactory == null) {
zkClientFactory = new ZookeeperClientFactoryImpl();
}
// Return default factory
return zkClientFactory;
}
public BrokerDiscoveryProvider getDiscoveryProvider() {
return discoveryProvider;
}
public void close() {
discoveryProvider.close();
acceptorGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
/**
* Derive the host
*
* @param isBindOnLocalhost
* @return
*/
public String host() {
try {
if (!config.isBindOnLocalhost()) {
return InetAddress.getLocalHost().getHostName();
} else {
return "localhost";
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new IllegalStateException("failed to find host", e);
}
}
public String serviceUrl() {
return new StringBuilder("pulsar://").append(host()).append(":").append(config.getServicePort()).toString();
}
public String serviceUrlTls() {
if (config.isTlsEnabled()) {
return new StringBuilder("pulsar://").append(host()).append(":").append(config.getServicePortTls())
.toString();
} else {
return "";
}
}
public String getServiceUrl() {
return serviceUrl;
}
public String getServiceUrlTls() {
return serviceUrlTls;
}
private static final Logger LOG = LoggerFactory.getLogger(DiscoveryService.class);
}
\ No newline at end of file
......@@ -13,23 +13,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.client.impl;
package com.yahoo.pulsar.discovery.service;
import java.util.concurrent.CompletableFuture;
import java.io.IOException;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.partition.PartitionedTopicMetadata;
public class PulsarServerException extends IOException {
private static final long serialVersionUID = 1;
public class PartitionMetadataLookupService {
private final HttpClient httpClient;
public PulsarServerException(String message) {
super(message);
}
public PartitionMetadataLookupService(HttpClient httpClient) {
this.httpClient = httpClient;
public PulsarServerException(Throwable cause) {
super(cause);
}
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(DestinationName destination) {
return httpClient.get(String.format("admin/%s/partitions", destination.getLookupName()),
PartitionedTopicMetadata.class);
public PulsarServerException(String message, Throwable cause) {
super(message, cause);
}
};
\ No newline at end of file
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
import static com.google.common.base.Preconditions.checkArgument;
import static com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType.Redirect;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnect;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
/**
* Handles incoming discovery request from client and sends appropriate response back to client
*
*/
public class ServerConnection extends PulsarHandler {
private BrokerDiscoveryProvider discoveryProvider;
private State state;
enum State {
Start, Connected
}
public ServerConnection(BrokerDiscoveryProvider discoveryhandler) {
super(0, TimeUnit.SECONDS); // discovery-service doesn't need to run keepAlive task
this.discoveryProvider = discoveryhandler;
this.state = State.Start;
}
/**
* handles connect request and sends {@code State.Connected} ack to client
*/
@Override
protected void handleConnect(CommandConnect connect) {
checkArgument(state == State.Start);
if (LOG.isDebugEnabled()) {
LOG.debug("Received CONNECT from {}", remoteAddress);
}
ctx.writeAndFlush(Commands.newConnected(connect));
state = State.Connected;
remoteEndpointProtocolVersion = connect.getProtocolVersion();
}
@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
checkArgument(state == State.Connected);
if (LOG.isDebugEnabled()) {
LOG.debug("Received PartitionMetadataLookup from {}", remoteAddress);
}
sendLookupResponse(partitionMetadata.getRequestId());
}
/**
* handles discovery request from client ands sends next active broker address
*/
@Override
protected void handleLookup(CommandLookupTopic lookup) {
checkArgument(state == State.Connected);
if (LOG.isDebugEnabled()) {
LOG.debug("Received Lookup from {}", remoteAddress);
}
sendLookupResponse(lookup.getRequestId());
}
private void sendLookupResponse(long requestId) {
try {
LoadReport availableBroker = discoveryProvider.nextBroker();
ctx.writeAndFlush(Commands.newLookupResponse(availableBroker.getPulsarServiceUrl(),
availableBroker.getPulsarServieUrlTls(), false, Redirect, requestId));
} catch (PulsarServerException e) {
LOG.warn("[{}] Failed to get next active broker {}", remoteAddress, e.getMessage(), e);
ctx.writeAndFlush(
Commands.newLookupResponse(ServerError.ServiceNotReady, e.getMessage(), requestId));
}
}
@Override
protected boolean isHandshakeCompleted() {
return state == State.Connected;
}
private static final Logger LOG = LoggerFactory.getLogger(ServerConnection.class);
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
import java.io.File;
import com.yahoo.pulsar.common.api.PulsarDecoder;
import com.yahoo.pulsar.common.api.PulsarLengthFieldFrameDecoder;
import com.yahoo.pulsar.discovery.service.server.ServiceConfig;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
/**
* Initialize service channel handlers.
*
*/
public class ServiceChannelInitializer extends ChannelInitializer<SocketChannel> {
public static final String TLS_HANDLER = "tls";
private ServiceConfig serviceConfig;
private DiscoveryService discoveryService;
private boolean enableTLS;
public ServiceChannelInitializer(DiscoveryService discoveryService, ServiceConfig serviceConfig, boolean enableTLS) {
super();
this.serviceConfig = serviceConfig;
this.discoveryService = discoveryService;
this.enableTLS = enableTLS;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (enableTLS) {
File tlsCert = new File(serviceConfig.getTlsCertificateFilePath());
File tlsKey = new File(serviceConfig.getTlsKeyFilePath());
SslContextBuilder builder = SslContextBuilder.forServer(tlsCert, tlsKey);
// allows insecure connection
builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
SslContext sslCtx = builder.clientAuth(ClientAuth.OPTIONAL).build();
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
}
ch.pipeline().addLast("frameDecoder",
new PulsarLengthFieldFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ServerConnection(discoveryService.getDiscoveryProvider()));
}
}
......@@ -33,6 +33,7 @@ import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yahoo.pulsar.discovery.service.DiscoveryService;
import com.yahoo.pulsar.discovery.service.web.DiscoveryServiceServlet;
/**
......@@ -52,19 +53,29 @@ public class DiscoveryServiceStarter {
// load config file
final ServiceConfig config = load(configFile);
// create broker service
DiscoveryService discoveryService = new DiscoveryService(config);
// create a web-service
final ServerManager server = new ServerManager(config);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
discoveryService.close();
server.stop();
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
}
}
});
discoveryService.start();
startWebService(server, config);
}
protected static void startWebService(ServerManager server, ServiceConfig config) throws Exception {
// add servlet
Map<String, String> initParameters = new TreeMap<>();
initParameters.put("zookeeperServers", config.getZookeeperServers());
......
......@@ -133,6 +133,10 @@ public class ServerManager {
webServiceExecutor.shutdown();
log.info("Server stopped successfully");
}
public boolean isStarted() {
return server.isStarted();
}
private static final Logger log = LoggerFactory.getLogger(ServerManager.class);
}
......@@ -25,10 +25,17 @@ public class ServiceConfig {
// Zookeeper quorum connection string
private String zookeeperServers;
// Port to use to server binary-proto request
private int servicePort = 5000;
// Port to use to server binary-proto-tls request
private int servicePortTls = 5001;
// Port to use to server HTTP request
private int webServicePort = 8080;
// Port to use to server HTTPS request
private int webServicePortTls = 8443;
// Control whether to bind directly on localhost rather than on normal
// hostname
private boolean bindOnLocalhost = false;
/***** --- TLS --- ****/
// Enable TLS
......@@ -46,6 +53,22 @@ public class ServiceConfig {
this.zookeeperServers = zookeeperServers;
}
public int getServicePort() {
return servicePort;
}
public void setServicePort(int servicePort) {
this.servicePort = servicePort;
}
public int getServicePortTls() {
return servicePortTls;
}
public void setServicePortTls(int servicePortTls) {
this.servicePortTls = servicePortTls;
}
public int getWebServicePort() {
return webServicePort;
}
......@@ -86,4 +109,12 @@ public class ServiceConfig {
this.tlsKeyFilePath = tlsKeyFilePath;
}
public boolean isBindOnLocalhost() {
return bindOnLocalhost;
}
public void setBindOnLocalhost(boolean bindOnLocalhost) {
this.bindOnLocalhost = bindOnLocalhost;
}
}
......@@ -130,7 +130,7 @@ public class DiscoveryServiceServlet extends HttpServlet {
location.append('?').append(request.getQueryString());
}
if (log.isDebugEnabled()) {
log.info("Redirecting to {}", location);
log.info("Redirecting to {}", location);
}
response.sendRedirect(location.toString());
} catch (URISyntaxException e) {
......
......@@ -49,7 +49,7 @@ public class ZookeeperCacheLoader implements Closeable {
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-discovery");
static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";
private static final int zooKeeperSessionTimeoutMillis = 30_000;
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
import static com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.apache.bookkeeper.test.PortManager.nextFreePort;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.pulsar.discovery.service.server.ServiceConfig;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
public class BaseDiscoveryTestSetup {
protected ServiceConfig config;
protected DiscoveryService service;
protected MockZooKeeper mockZookKeeper;
private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
protected void setup() throws Exception {
config = new ServiceConfig();
config.setServicePort(nextFreePort());
config.setServicePortTls(nextFreePort());
config.setBindOnLocalhost(true);
config.setTlsEnabled(true);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
mockZookKeeper = createMockZooKeeper();
service = spy(new DiscoveryService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
service.start();
}
protected void cleanup() throws Exception {
mockZookKeeper.shutdown();
service.close();
}
protected MockZooKeeper createMockZooKeeper() throws Exception {
MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.sameThreadExecutor());
ZkUtils.createFullPathOptimistic(zk, LOADBALANCE_BROKERS_ROOT,
"".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
return zk;
}
protected ZooKeeperClientFactory mockZooKeeperClientFactory = new ZooKeeperClientFactory() {
@Override
public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
int zkSessionTimeoutMillis) {
// Always return the same instance (so that we don't loose the mock ZK content on broker restart
return CompletableFuture.completedFuture(mockZookKeeper);
}
};
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
import static com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.SecurityUtility;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
private final static String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
private final static String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";
@BeforeMethod
private void init() throws Exception {
super.setup();
}
@AfterMethod
private void clean() throws Exception {
super.cleanup();
}
/**
* Verifies: Discovery-service returns broker is round-robin manner
*
* @throws Exception
*/
@Test
public void testBrokerDiscoveryRoundRobin() throws Exception {
addBrokerToZk(5);
String prevUrl = null;
for (int i = 0; i < 10; i++) {
String current = service.getDiscoveryProvider().nextBroker().getPulsarServiceUrl();
assertNotEquals(prevUrl, current);
prevUrl = current;
}
}
/**
* It verifies: client connects to Discovery-service and receives discovery response successfully.
*
* @throws Exception
*/
@Test
public void testClientServerConnection() throws Exception {
addBrokerToZk(2);
// 1. client connects to DiscoveryService, 2. Client receive service-lookup response
final int messageTransfer = 2;
final CountDownLatch latch = new CountDownLatch(messageTransfer);
NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), latch, false);
try {
assertTrue(latch.await(1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
fail("should have received lookup response message from server", e);
}
workerGroup.shutdownGracefully();
}
@Test(enabled = true)
public void testClientServerConnectionTls() throws Exception {
addBrokerToZk(2);
// 1. client connects to DiscoveryService, 2. Client receive service-lookup response
final int messageTransfer = 2;
final CountDownLatch latch = new CountDownLatch(messageTransfer);
NioEventLoopGroup workerGroup = connectToService(service.getServiceUrlTls(), latch, true);
try {
assertTrue(latch.await(1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
fail("should have received lookup response message from server", e);
}
workerGroup.shutdownGracefully();
}
/**
* creates ClientHandler channel to connect and communicate with server
*
* @param serviceUrl
* @param latch
* @return
* @throws URISyntaxException
*/
public static NioEventLoopGroup connectToService(String serviceUrl, CountDownLatch latch, boolean tls)
throws URISyntaxException {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
if(tls) {
SslContextBuilder builder = SslContextBuilder.forClient();
builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
X509Certificate[] certificates = SecurityUtility.loadCertificatesFromPemFile(TLS_CLIENT_CERT_FILE_PATH);
PrivateKey privateKey = SecurityUtility.loadPrivateKeyFromPemFile(TLS_CLIENT_KEY_FILE_PATH);
builder.keyManager(privateKey, (X509Certificate[]) certificates);
SslContext sslCtx = builder.build();
ch.pipeline().addLast("tls", sslCtx.newHandler(ch.alloc()));
}
ch.pipeline().addLast(new ClientHandler(latch));
}
});
URI uri = new URI(serviceUrl);
InetSocketAddress serviceAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
b.connect(serviceAddress).addListener((ChannelFuture future) -> {
if(!future.isSuccess()) {
throw new IllegalStateException(future.cause());
}
});
return workerGroup;
}
static class ClientHandler extends ChannelInboundHandlerAdapter {
final CountDownLatch latch;
public ClientHandler(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
buffer.release();
latch.countDown();
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(Commands.newConnect("", ""));
latch.countDown();
}
}
private void addBrokerToZk(int number) throws Exception {
for (int i = 0; i < number; i++) {
LoadReport report = new LoadReport(null, null, "pulsar://broker-:15000" + i, null);
String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report);
ZkUtils.createFullPathOptimistic(mockZookKeeper, LOADBALANCE_BROKERS_ROOT + "/" + "broker-" + i,
reportData.getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
Thread.sleep(100); // wait to get cache updated
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service.server;
import static org.apache.bookkeeper.test.PortManager.nextFreePort;
import static org.testng.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import org.testng.annotations.Test;
/**
* 1. starts discovery service a. loads broker list from zk 2. http-client calls multiple http request: GET, PUT and
* POST. 3. discovery service redirects to appropriate brokers in round-robin 4. client receives unknown host exception
* with redirected broker
*
*/
public class DiscoveryServiceWebTest {
@Test
public void testWebDiscoveryServiceStarter() throws Exception {
int port = nextFreePort();
File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");
if (testConfigFile.exists()) {
testConfigFile.delete();
}
PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile)));
printWriter.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com");
printWriter.println("webServicePort=" + port);
printWriter.close();
testConfigFile.deleteOnExit();
final ServiceConfig config = DiscoveryServiceStarter.load(testConfigFile.getAbsolutePath());
final ServerManager server = new ServerManager(config);
DiscoveryServiceStarter.startWebService(server, config);
assertTrue(server.isStarted());
server.stop();
testConfigFile.delete();
}
}
......@@ -104,9 +104,11 @@ public class DiscoveryServiceWebTest extends BaseZKStarterTest{
} catch (KeeperException.NodeExistsException ne) {
// Ok
} catch (KeeperException | InterruptedException e) {
fail("failed while creating broker znodes", e);
e.printStackTrace();
fail("failed while creating broker znodes");
} catch (JsonProcessingException e) {
fail("failed while creating broker znodes", e);
e.printStackTrace();
fail("failed while creating broker znodes");
}
});
......@@ -153,9 +155,11 @@ public class DiscoveryServiceWebTest extends BaseZKStarterTest{
} catch (KeeperException.NodeExistsException ne) {
// Ok
} catch (KeeperException | InterruptedException e) {
fail("failed while creating broker znodes", e);
e.printStackTrace();
fail("failed while creating broker znodes");
} catch (JsonProcessingException e) {
fail("failed while creating broker znodes", e);
e.printStackTrace();
fail("failed while creating broker znodes");
}
});
......@@ -214,8 +218,10 @@ public class DiscoveryServiceWebTest extends BaseZKStarterTest{
} catch (KeeperException.NodeExistsException ne) {
// Ok
} catch (KeeperException | InterruptedException e) {
fail("failed while creating broker znodes", e);
e.printStackTrace();
fail("failed while creating broker znodes");
} catch (JsonProcessingException e) {
e.printStackTrace();
fail("failed while creating broker znodes");
}
});
......
-----BEGIN CERTIFICATE-----
MIIDVjCCAj4CCQCtw/UnTFDT7DANBgkqhkiG9w0BAQUFADBtMQswCQYDVQQGEwJB
VTETMBEGA1UECAwKU29tZS1TdGF0ZTEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5MSEw
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBmNsaWVu
dDAeFw0xNjA2MjAwMTQ1NDZaFw0yNjA2MTgwMTQ1NDZaMG0xCzAJBgNVBAYTAkFV
MRMwEQYDVQQIDApTb21lLVN0YXRlMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxITAf
BgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGY2xpZW50
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqQV5F3Au9FWXIYPdWqiX
Rk5gdVmVkDuuFK4ZoOd8inoJpB3PPkpmpgoVkKQHDFhgx3ODGWIUgo+n6QDsJxY4
ygHfVeggQgek8iUfteYVsIcHS0bjkhIij/3ihC301FkiqbrV069oLvUXLKcv3zxG
mdBAiz0k4xGZhFieVRvQCLY9syUUxmQ/3Cv42lDY8a1gTw4CRRx/hCfDvXCKhOT4
bMwUIDZfHB3JoDh3Thp8FLz0nTrRF75mSQJ/OdcafIm0Xoz2Otp/CSxLS+U1lLvG
05crWTDe0om7NW4mK4CqGCFq5gUw7eIzaeO7Q5Qez9XGTMzkgIDTMvNYGGEeJhhm
NQIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQAKXy4g6hljY5MpO8mbZh+uJHq6NEUs
4dr7OKDDWc39AROZsGf2eFUmHOjmRSw7VHpguGKI+rFRELVffpg/VvMh5apu+DBf
jhxtDNceAyh5uugPNUJHXyeikBDYW8bAzUU3DmMldPkTZWcGjurmyhDQ1TtK2YJe
RMFBXw5aAzdJMNi6OfXDH/ZX32hrb482yghDZj+ndnm0FefmLbFTQRMF8/fIHb1W
kqNHwIaapZwH6j/MJy/TRFYcJunrBUYT9zVjY46k3GU0ex/Bn7T4pg9gzgFGZJhn
jQQFKliIC84thCzdlPkrLduLY8tmlDKpLXatbEQ+s1MmNOURm6irPp6g
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCpBXkXcC70VZch
g91aqJdGTmB1WZWQO64Urhmg53yKegmkHc8+SmamChWQpAcMWGDHc4MZYhSCj6fp
AOwnFjjKAd9V6CBCB6TyJR+15hWwhwdLRuOSEiKP/eKELfTUWSKputXTr2gu9Rcs
py/fPEaZ0ECLPSTjEZmEWJ5VG9AItj2zJRTGZD/cK/jaUNjxrWBPDgJFHH+EJ8O9
cIqE5PhszBQgNl8cHcmgOHdOGnwUvPSdOtEXvmZJAn851xp8ibRejPY62n8JLEtL
5TWUu8bTlytZMN7Sibs1biYrgKoYIWrmBTDt4jNp47tDlB7P1cZMzOSAgNMy81gY
YR4mGGY1AgMBAAECggEAcJj3yVhvv0/BhY8+CCYl2K1f7u1GCLbpSleNNTbhLbMM
9yrwo/OWnGg9Y4USOPQrTNOz81X2id+/oSZ/K67PGCvVJ3qi+rny9WkrzdbAfkAF
6O0Jr4arRbeBjkK7Rjc3M1EHH6VLx3R5AsNBzfpuogss5FVQXICd/5+1oscLeLEx
/Fn+51IEn9FUg5vr7ElG51f+zPxexcWHLNoqGjTEIGGtI8/CfTzD9tBV4sIjf/Nc
Zzfs9XYrChfcrS0U1zDa+L7c5gYfoN6M08sBiuZlhyyO9wgzPlp+XnsrSFv6hUta
0scjAbN4bh+orQn6zgFN/sjkQnraWXW7pKFLyTR/IQKBgQDVju4IbhE9XRweNgXi
s3BuGV+HsuFffEf0904/zCuCUcScGb5WCz5+KtlFJ//YxfocHVZajH+4GdCGbWim
m+H3XvRpWgfK/aBNOXu5ueLbnPYyPjTrcpKRsomeoiV+Jz1tv5PQElwzCiCzVvQf
fMyhQT16YIsFQAGJzQMBEHWODQKBgQDKnKps3sKSR3ycUtIxCVXUir7p52qst0Pm
bPO8JrcRKZP2z8MJB96+DcQFzrxj7t5DDktkYEsFOPPuIeUsYXsY+MKHs4hEQVCz
hpDJJNQ8s+SV8TLzKpinZEmLIjslLbn2rQrpqybPg84VxqX3qqM8IrXhMf77aGj6
QHqvQwHWyQKBgQDF1RVO+9++j82ncvY6z22coKath5leIjxqgtqbISFBJUxUK0j2
Xo4yxLDnbqmE/8m1V7wSP8tlGYzhquLiTM+kn/Mc0Ukc0503TMQABmJQfXRYkOXn
IwkCLXltWdoPpnwyeeGNRCTjJ0OpvyiBLtRFobE498xxPZzvMdrRlpS/1QKBgQCo
wmMleUnBQ2/kWQugMnFeLg6kjs+IesFAnYFKN0kGL4aB7j06OWbrEFY0rCS4bA6O
9coQGjCCchSjRXI4TB2XCCQnmX8nsuuADNZt45Iv2XrM9XEFn3Y0/tBO5j0zU2nw
r+NGC/uwns050BMPPf7mqNarctQ6HZZK0wgdEQfoGQKBgC+pbkQv9cn68TsiaJ3w
tvNRTXCIAAH4Vtn9Cp+63ao+kXn94BJqQF99i58kJpG4ol6wbCHUoC6fHgxUh5HB
JB0HjC2eCMgn4acAQg0sPW6l35KX36yYxtrL7eosB/yBYum0XAwmboNjEhlCZkOs
YOpSsn61g7xqqrt40Spb5vUn
-----END PRIVATE KEY-----
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
import static com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.apache.bookkeeper.test.PortManager.nextFreePort;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.pulsar.discovery.service.server.ServiceConfig;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
public class BaseDiscoveryTestSetup {
protected ServiceConfig config;
protected DiscoveryService service;
protected MockZooKeeper mockZookKeeper;
private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
protected void setup() throws Exception {
config = new ServiceConfig();
config.setServicePort(nextFreePort());
config.setServicePortTls(nextFreePort());
config.setBindOnLocalhost(true);
config.setTlsEnabled(true);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
mockZookKeeper = createMockZooKeeper();
service = spy(new DiscoveryService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
service.start();
}
protected void cleanup() throws Exception {
mockZookKeeper.shutdown();
service.close();
}
protected MockZooKeeper createMockZooKeeper() throws Exception {
MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.sameThreadExecutor());
ZkUtils.createFullPathOptimistic(zk, LOADBALANCE_BROKERS_ROOT,
"".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
return zk;
}
protected ZooKeeperClientFactory mockZooKeeperClientFactory = new ZooKeeperClientFactory() {
@Override
public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
int zkSessionTimeoutMillis) {
// Always return the same instance (so that we don't loose the mock ZK content on broker restart
return CompletableFuture.completedFuture(mockZookKeeper);
}
};
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service;
import static com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.SecurityUtility;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
private final static String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
private final static String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";
@BeforeMethod
private void init() throws Exception {
super.setup();
}
@AfterMethod
private void clean() throws Exception {
super.cleanup();
}
/**
* Verifies: Discovery-service returns broker is round-robin manner
*
* @throws Exception
*/
@Test
public void testBrokerDiscoveryRoundRobin() throws Exception {
addBrokerToZk(5);
String prevUrl = null;
for (int i = 0; i < 10; i++) {
String current = service.getDiscoveryProvider().nextBroker().getPulsarServiceUrl();
assertNotEquals(prevUrl, current);
prevUrl = current;
}
}
/**
* It verifies: client connects to Discovery-service and receives discovery response successfully.
*
* @throws Exception
*/
@Test
public void testClientServerConnection() throws Exception {
addBrokerToZk(2);
// 1. client connects to DiscoveryService, 2. Client receive service-lookup response
final int messageTransfer = 2;
final CountDownLatch latch = new CountDownLatch(messageTransfer);
NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), latch, false);
try {
assertTrue(latch.await(1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
fail("should have received lookup response message from server", e);
}
workerGroup.shutdownGracefully();
}
@Test(enabled = true)
public void testClientServerConnectionTls() throws Exception {
addBrokerToZk(2);
// 1. client connects to DiscoveryService, 2. Client receive service-lookup response
final int messageTransfer = 2;
final CountDownLatch latch = new CountDownLatch(messageTransfer);
NioEventLoopGroup workerGroup = connectToService(service.getServiceUrlTls(), latch, true);
try {
assertTrue(latch.await(1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
fail("should have received lookup response message from server", e);
}
workerGroup.shutdownGracefully();
}
/**
* creates ClientHandler channel to connect and communicate with server
*
* @param serviceUrl
* @param latch
* @return
* @throws URISyntaxException
*/
public static NioEventLoopGroup connectToService(String serviceUrl, CountDownLatch latch, boolean tls)
throws URISyntaxException {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
if(tls) {
SslContextBuilder builder = SslContextBuilder.forClient();
builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
X509Certificate[] certificates = SecurityUtility.loadCertificatesFromPemFile(TLS_CLIENT_CERT_FILE_PATH);
PrivateKey privateKey = SecurityUtility.loadPrivateKeyFromPemFile(TLS_CLIENT_KEY_FILE_PATH);
builder.keyManager(privateKey, (X509Certificate[]) certificates);
SslContext sslCtx = builder.build();
ch.pipeline().addLast("tls", sslCtx.newHandler(ch.alloc()));
}
ch.pipeline().addLast(new ClientHandler(latch));
}
});
URI uri = new URI(serviceUrl);
InetSocketAddress serviceAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
b.connect(serviceAddress).addListener((ChannelFuture future) -> {
if(!future.isSuccess()) {
throw new IllegalStateException(future.cause());
}
});
return workerGroup;
}
static class ClientHandler extends ChannelInboundHandlerAdapter {
final CountDownLatch latch;
public ClientHandler(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
buffer.release();
latch.countDown();
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(Commands.newConnect("", ""));
latch.countDown();
}
}
private void addBrokerToZk(int number) throws Exception {
for (int i = 0; i < number; i++) {
LoadReport report = new LoadReport(null, null, "pulsar://broker-:15000" + i, null);
String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report);
ZkUtils.createFullPathOptimistic(mockZookKeeper, LOADBALANCE_BROKERS_ROOT + "/" + "broker-" + i,
reportData.getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
Thread.sleep(100); // wait to get cache updated
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service.server;
import static org.apache.bookkeeper.test.PortManager.nextFreePort;
import static org.testng.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import org.testng.annotations.Test;
/**
* 1. starts discovery service a. loads broker list from zk 2. http-client calls multiple http request: GET, PUT and
* POST. 3. discovery service redirects to appropriate brokers in round-robin 4. client receives unknown host exception
* with redirected broker
*
*/
public class DiscoveryServiceWebTest {
@Test
public void testWebDiscoveryServiceStarter() throws Exception {
int port = nextFreePort();
File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");
if (testConfigFile.exists()) {
testConfigFile.delete();
}
PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile)));
printWriter.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com");
printWriter.println("webServicePort=" + port);
printWriter.close();
testConfigFile.deleteOnExit();
final ServiceConfig config = DiscoveryServiceStarter.load(testConfigFile.getAbsolutePath());
final ServerManager server = new ServerManager(config);
DiscoveryServiceStarter.startWebService(server, config);
assertTrue(server.isStarted());
server.stop();
testConfigFile.delete();
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service.web;
import static com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
public class BaseZKStarterTest {
protected MockZooKeeper mockZookKeeper;
protected void start() throws Exception {
mockZookKeeper = createMockZooKeeper();
}
protected void close() throws Exception {
mockZookKeeper.shutdown();
}
/**
* Create MockZookeeper instance
* @return
* @throws Exception
*/
protected MockZooKeeper createMockZooKeeper() throws Exception {
MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.sameThreadExecutor());
ZkUtils.createFullPathOptimistic(zk, LOADBALANCE_BROKERS_ROOT,
"".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
return zk;
}
protected static class DiscoveryZooKeeperClientFactoryImpl implements ZooKeeperClientFactory {
static ZooKeeper zk;
@Override
public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
int zkSessionTimeoutMillis) {
return CompletableFuture.completedFuture(zk);
}
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service.web;
import static com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static org.apache.bookkeeper.test.PortManager.nextFreePort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.filter.LoggingFilter;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.yahoo.pulsar.common.policies.data.BundlesData;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.discovery.service.server.DiscoveryServiceStarter;
import com.yahoo.pulsar.discovery.service.server.ServerManager;
import com.yahoo.pulsar.discovery.service.server.ServiceConfig;
import com.yahoo.pulsar.discovery.service.web.DiscoveryServiceServlet;
import com.yahoo.pulsar.discovery.service.web.RestException;
import com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
/**
* 1. starts discovery service a. loads broker list from zk 2. http-client calls multiple http request: GET, PUT and
* POST. 3. discovery service redirects to appropriate brokers in round-robin 4. client receives unknown host exception
* with redirected broker
*
*/
public class DiscoveryServiceWebTest extends BaseZKStarterTest{
private Client client = ClientBuilder.newClient(new ClientConfig().register(LoggingFilter.class));
private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
@BeforeMethod
private void init() throws Exception {
start();
}
@AfterMethod
private void cleanup() throws Exception {
close();
}
@Test
public void testNextBroker() throws Exception {
// 1. create znode for each broker
List<String> brokers = Lists.newArrayList("broker-1", "broker-2", "broker-3");
brokers.stream().forEach(broker -> {
try {
LoadReport report = new LoadReport(broker, null, null, null);
String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report);
ZkUtils.createFullPathOptimistic(mockZookKeeper, LOADBALANCE_BROKERS_ROOT + "/" + broker,
reportData.getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException ne) {
// Ok
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
fail("failed while creating broker znodes");
} catch (JsonProcessingException e) {
e.printStackTrace();
fail("failed while creating broker znodes");
}
});
// 2. Setup discovery-zkcache
DiscoveryServiceServlet discovery = new DiscoveryServiceServlet();
DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper;
Field zkCacheField = DiscoveryServiceServlet.class.getDeclaredField("zkCache");
zkCacheField.setAccessible(true);
ZookeeperCacheLoader zkCache = new ZookeeperCacheLoader(new DiscoveryZooKeeperClientFactoryImpl(),
"zk-test-servers");
zkCacheField.set(discovery, zkCache);
// 3. verify nextBroker functionality : round-robin in broker list
for (String broker : brokers) {
assertEquals(broker, discovery.nextBroker().getWebServiceUrl());
}
}
@Test
public void testRiderectUrlWithServerStarted() throws Exception {
// 1. start server
int port = nextFreePort();
ServiceConfig config = new ServiceConfig();
config.setWebServicePort(port);
ServerManager server = new ServerManager(config);
DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper;
Map<String, String> params = new TreeMap<>();
params.put("zookeeperServers", "dummy-value");
params.put("zookeeperClientFactoryClass", DiscoveryZooKeeperClientFactoryImpl.class.getName());
server.addServlet("/", DiscoveryServiceServlet.class, params);
server.start();
// 2. create znode for each broker
List<String> brokers = Lists.newArrayList("broker-1", "broker-2", "broker-3");
brokers.stream().forEach(b -> {
try {
final String broker = b + ":15000";
LoadReport report = new LoadReport("http://" + broker, null, null, null);
String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report);
ZkUtils.createFullPathOptimistic(mockZookKeeper, LOADBALANCE_BROKERS_ROOT + "/" + broker,
reportData.getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException ne) {
// Ok
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
fail("failed while creating broker znodes");
} catch (JsonProcessingException e) {
e.printStackTrace();
fail("failed while creating broker znodes");
}
});
String serviceUrl = server.getServiceUri().toString();
String requestUrl = serviceUrl + "admin/namespaces/p1/c1/n1";
/**
* 3. verify : every time when vip receives a request: it redirects to above brokers sequentially and client
* must get unknown host exception with above brokers in a sequential manner.
**/
assertEquals(brokers, validateRequest(brokers, HttpMethod.PUT, requestUrl, new BundlesData(1)),
"redirection failed");
assertEquals(brokers, validateRequest(brokers, HttpMethod.GET, requestUrl, null), "redirection failed");
assertEquals(brokers, validateRequest(brokers, HttpMethod.POST, requestUrl, new BundlesData(1)),
"redirection failed");
server.stop();
}
@Test
public void testTlsEnable() throws Exception {
// 1. start server with tls enable
int port = nextFreePort();
int tlsPort = nextFreePort();
ServiceConfig config = new ServiceConfig();
config.setWebServicePort(port);
config.setWebServicePortTls(tlsPort);
config.setTlsEnabled(true);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
ServerManager server = new ServerManager(config);
DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper;
Map<String, String> params = new TreeMap<>();
params.put("zookeeperServers", "dummy-value");
params.put("zookeeperClientFactoryClass", DiscoveryZooKeeperClientFactoryImpl.class.getName());
server.addServlet("/", DiscoveryServiceServlet.class, params);
server.start();
// 2. get ZookeeperCacheLoader to add more brokers
final String redirect_broker_host = "broker-1";
List<String> brokers = Lists.newArrayList(redirect_broker_host);
brokers.stream().forEach(b -> {
try {
final String brokerUrl = b + ":" + port;
final String brokerUrlTls = b + ":" + tlsPort;
LoadReport report = new LoadReport("http://" + brokerUrl, "https://" + brokerUrlTls, null, null);
String reportData = ObjectMapperFactory.getThreadLocal().writeValueAsString(report);
ZkUtils.createFullPathOptimistic(mockZookKeeper, LOADBALANCE_BROKERS_ROOT + "/" + brokerUrl,
reportData.getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException ne) {
// Ok
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
fail("failed while creating broker znodes");
} catch (JsonProcessingException e) {
e.printStackTrace();
fail("failed while creating broker znodes");
}
});
// 3. https request with tls enable at server side
String serviceUrl = String.format("https://localhost:%s/", tlsPort);
String requestUrl = serviceUrl + "admin/namespaces/p1/c1/n1";
KeyManager[] keyManagers = null;
TrustManager[] trustManagers = InsecureTrustManagerFactory.INSTANCE.getTrustManagers();
SSLContext sslCtx = SSLContext.getInstance("TLS");
sslCtx.init(keyManagers, trustManagers, new SecureRandom());
HttpsURLConnection.setDefaultSSLSocketFactory(sslCtx.getSocketFactory());
try {
InputStream response = new URL(requestUrl).openStream();
fail("it should give unknown host exception as: discovery service redirects request to: "
+ redirect_broker_host);
} catch (Exception e) {
// 4. Verify: server accepts https request and redirected to one of the available broker host defined into
// zk. and as broker-service is not up: it should give "UnknownHostException with host=broker-url"
String host = e.getLocalizedMessage();
assertEquals(e.getClass(), UnknownHostException.class);
assertTrue(host.startsWith(redirect_broker_host));
}
server.stop();
}
@Test
public void testException() {
RestException exception1 = new RestException(BAD_GATEWAY, "test-msg");
assertTrue(exception1.getMessage().contains(BAD_GATEWAY.toString()));
RestException exception2 = new RestException(BAD_GATEWAY.getStatusCode(), "test-msg");
assertTrue(exception2.getMessage().contains(BAD_GATEWAY.toString()));
RestException exception3 = new RestException(exception2);
assertTrue(exception3.getMessage().contains(INTERNAL_SERVER_ERROR.toString()));
assertTrue(RestException.getExceptionData(exception2).contains(BAD_GATEWAY.toString()));
}
public List<String> validateRequest(List<String> brokers, String method, String url, BundlesData bundle) {
List<String> redirectBrokers = brokers.stream().map(broker -> {
String redirectedBroker = null;
try {
WebTarget webTarget = client.target(url);
Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
if (HttpMethod.PUT.equals(method)) {
invocationBuilder.put(Entity.entity(bundle, MediaType.APPLICATION_JSON));
fail();
} else if (HttpMethod.GET.equals(method)) {
invocationBuilder.get();
fail();
} else if (HttpMethod.POST.equals(method)) {
invocationBuilder.post(Entity.entity(bundle, MediaType.APPLICATION_JSON));
fail();
} else {
fail("Unsupported http method");
}
} catch (Exception e) {
if (e.getCause() instanceof UnknownHostException) {
redirectedBroker = e.getCause().getMessage().split(":")[0];
} else {
// fail
fail();
}
}
return redirectedBroker;
}).collect(Collectors.toList());
return redirectBrokers;
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.discovery.service.web;
import static com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.List;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.beust.jcommander.internal.Lists;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.discovery.service.web.ZookeeperCacheLoader;
import com.yahoo.pulsar.discovery.service.web.BaseZKStarterTest.DiscoveryZooKeeperClientFactoryImpl;
import com.yahoo.pulsar.zookeeper.MockedZooKeeperClientFactoryImpl;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZookeeperClientFactoryImpl;
public class ZookeeperCacheLoaderTest extends BaseZKStarterTest {
@BeforeMethod
private void init() throws Exception {
start();
}
@AfterMethod
private void cleanup() throws Exception {
close();
}
/**
* Create znode for available broker in ZooKeeper and updates it again to verify ZooKeeper cache update
*
* @throws InterruptedException
* @throws KeeperException
* @throws IOException
*/
@Test
public void testZookeeperCacheLoader() throws InterruptedException, KeeperException, Exception {
DiscoveryZooKeeperClientFactoryImpl.zk = mockZookKeeper;
ZookeeperCacheLoader zkLoader = new ZookeeperCacheLoader(new DiscoveryZooKeeperClientFactoryImpl(), "");
List<String> brokers = Lists.newArrayList("broker-1:15000", "broker-2:15000", "broker-3:15000");
// 1. create znode for each broker
brokers.stream().forEach(b -> {
try {
zkLoader.getLocalZkCache().getZooKeeper().create(LOADBALANCE_BROKERS_ROOT + "/" + b, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException | InterruptedException e) {
fail("failed while creating broker znodes");
}
});
Thread.sleep(100); // wait for 100 msec: to get cache updated
// 2. get available brokers from ZookeeperCacheLoader
List<LoadReport> list = zkLoader.getAvailableBrokers();
// 3. verify retrieved broker list
Assert.assertTrue(brokers.containsAll(list));
// 4.a add new broker
zkLoader.getLocalZkCache().getZooKeeper().create(LOADBALANCE_BROKERS_ROOT + "/" + "broker-4:15000", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
brokers.add("broker-4:15000");
Thread.sleep(100); // wait for 100 msec: to get cache updated
// 4.b. get available brokers from ZookeeperCacheLoader
list = zkLoader.getAvailableBrokers();
// 4.c. verify retrieved broker list
Assert.assertTrue(brokers.containsAll(list));
}
}
......@@ -204,6 +204,17 @@ public abstract class ZooKeeperCache implements Watcher {
return getData(path, this, deserializer).map(e -> e.getKey());
}
public <T> CompletableFuture<Optional<T>> getDataAsync(final String path, final Deserializer<T> deserializer) {
CompletableFuture<Optional<T>> future = new CompletableFuture<>();
getDataAsync(path, this, deserializer).thenAccept(data -> {
future.complete(data.map(e -> e.getKey()));
}).exceptionally(ex -> {
future.complete(Optional.empty());
return null;
});
return future;
}
/**
* Cache that implements automatic reloading on update will pass a different Watcher object to reload cache entry
* automatically
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册