未验证 提交 2777b0e4 编写于 作者: J Jia Zhai 提交者: GitHub

Issue #3653: Kerberos authentication for web resource support (#4097)

Fixes #3653

Master Issue: #3491

** Motivation
Add kerberos support for web resource support.
This mainly include 2 parts:

- the HttpClient that works for HttpLookup.
- the BaseResource that works for admin rest end point.

*** Modifications
For kerberos authentication, there need several back/forth requests to do the negotiation between client and server.
This change add a method authenticationStage in AuthenticationSasl, and a method authenticateHttpRequest in AuthenticationProviderSasl to do the mutual negotiation.
And a saslRoleToken is cached in AuthenticationSasl once the authentication get success.
When do the sasl authentication, it will first use saslRoleToken cache, and if sever check this token failed, do real sasl authentication.
Changed unit test SaslAuthenticateTest, which enable sasl authentication in admin and also use http lookup to verify the change.
上级 c642e5d9
......@@ -351,11 +351,6 @@ tokenAuthClaim=
### --- SASL Authentication Provider --- ###
# Whether Use SASL Authentication or not.
# TODO: used to bypass web resource check. will remove it after implementation the support.
# github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
isSaslAuthentication=
# This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.
# Default value: `SaslConstants.JAAS_CLIENT_ALLOWED_IDS_DEFAULT`, which is ".*pulsar.*",
# so only clients whose id contains 'pulsar' are allowed to connect.
......
......@@ -18,21 +18,39 @@
*/
package org.apache.pulsar.broker.authentication;
import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_SERVER_SECTION_NAME;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_CLIENT_ALLOWED_IDS;
import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_SERVER_SECTION_NAME;
import static org.apache.pulsar.common.sasl.SaslConstants.KINIT_COMMAND;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_ROLE_TOKEN;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_ROLE_TOKEN_EXPIRED;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_TOKEN;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_HEADER_STATE;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_CLIENT_INIT;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_COMPLETE;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_NEGOTIATE;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER_CHECK_TOKEN;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Base64;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.security.auth.login.LoginException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.AuthData;
......@@ -76,6 +94,8 @@ public class AuthenticationProviderSasl implements AuthenticationProvider {
throw new IOException(e);
}
}
this.signer = new SaslRoleTokenSigner(Long.toString(new Random().nextLong()).getBytes());
}
@Override
......@@ -109,4 +129,167 @@ public class AuthenticationProviderSasl implements AuthenticationProvider {
throw new AuthenticationException(t.getMessage());
}
}
// for http auth.
private static final long SASL_ROLE_TOKEN_LIVE_SECONDS = 3600;
// A signer for http role token, with random secret.
private SaslRoleTokenSigner signer;
/**
* Returns null if authentication has not completed.
* Return auth role if authentication has completed, and httpRequest's role token contains the authRole
*/
public String authRoleFromHttpRequest(HttpServletRequest httpRequest) throws AuthenticationException {
String tokenStr = httpRequest.getHeader(SASL_AUTH_ROLE_TOKEN);
if (tokenStr == null) {
return null;
}
String unSigned = signer.verifyAndExtract(tokenStr);
SaslRoleToken token;
try {
token = SaslRoleToken.parse(unSigned);
if (log.isDebugEnabled()) {
log.debug("server side get role token: {}, session in token:{}, session in request:{}",
token, token.getSession(), httpRequest.getRemoteAddr());
}
} catch (Exception e) {
log.error("token parse failed, with exception: ", e);
return SASL_AUTH_ROLE_TOKEN_EXPIRED;
}
if (!token.isExpired()) {
return token.getUserRole();
} else if (token.isExpired()) {
return SASL_AUTH_ROLE_TOKEN_EXPIRED;
} else {
return null;
}
}
private String createAuthRoleToken(String role, String sessionId) {
long expireAtMs = System.currentTimeMillis() + SASL_ROLE_TOKEN_LIVE_SECONDS * 1000; // 1 hour
SaslRoleToken token = new SaslRoleToken(role, sessionId, expireAtMs);
String signed = signer.sign(token.toString());
if (log.isDebugEnabled()) {
log.debug("create role token token: {}, role: {} session :{}, expires:{}\nsigned:{}",
token, token.getUserRole(), token.getSession(), token.getExpires(), signed);
}
return signed;
}
private ConcurrentHashMap<Long, AuthenticationState> authStates = new ConcurrentHashMap<>();
// return authState if it is in cache.
private AuthenticationState getAuthState(HttpServletRequest request) {
String id = request.getHeader(SASL_STATE_SERVER);
if (id == null) {
return null;
}
try {
return authStates.get(Long.parseLong(id));
} catch (NumberFormatException e) {
log.error("[{}] Wrong Id String in Token {}. e:", request.getRequestURI(),
id, e);
return null;
}
}
private void setResponseHeaderState(HttpServletResponse response, String state) {
response.setHeader(SaslConstants.SASL_HEADER_TYPE, SaslConstants.SASL_TYPE_VALUE);
response.setHeader(SASL_HEADER_STATE, state);
}
/**
* Passed in request, set response, according to request.
* and return whether we should do following chain.doFilter or not.
*/
@Override
public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
AuthenticationState state = getAuthState(request);
String saslAuthRoleToken = authRoleFromHttpRequest(request);
// role token exist
if (saslAuthRoleToken != null) {
// role token expired, send role token expired to client.
if (saslAuthRoleToken.equalsIgnoreCase(SASL_AUTH_ROLE_TOKEN_EXPIRED)) {
setResponseHeaderState(response, SASL_AUTH_ROLE_TOKEN_EXPIRED);
response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Role token expired");
if (log.isDebugEnabled()) {
log.debug("[{}] Server side role token expired: {}", request.getRequestURI(), saslAuthRoleToken);
}
return false;
}
// role token OK to use,
// if request is ask for role token verify, send auth complete to client
// if request is a real request with valid role token, pass this request down.
if (request.getHeader(SASL_HEADER_STATE).equalsIgnoreCase(SASL_STATE_COMPLETE)) {
request.setAttribute(AuthenticatedRoleAttributeName, saslAuthRoleToken);
request.setAttribute(AuthenticatedDataAttributeName,
new AuthenticationDataHttps(request));
if (log.isDebugEnabled()) {
log.debug("[{}] Server side role token OK to go on: {}", request.getRequestURI(), saslAuthRoleToken);
}
return true;
} else {
checkState(request.getHeader(SASL_HEADER_STATE).equalsIgnoreCase(SASL_STATE_SERVER_CHECK_TOKEN));
setResponseHeaderState(response, SASL_STATE_COMPLETE);
response.setHeader(SASL_STATE_SERVER, request.getHeader(SASL_STATE_SERVER));
response.setStatus(HttpServletResponse.SC_OK);
if (log.isDebugEnabled()) {
log.debug("[{}] Server side role token verified success: {}", request.getRequestURI(), saslAuthRoleToken);
}
return false;
}
} else {
// no role token, do sasl auth
// need new authState
if (state == null || request.getHeader(SASL_HEADER_STATE).equalsIgnoreCase(SASL_STATE_CLIENT_INIT)) {
state = newAuthState(null, null, null);
authStates.put(state.getStateId(), state);
}
checkState(request.getHeader(SASL_AUTH_TOKEN) != null,
"Header token should exist if no role token.");
// do the sasl auth
AuthData clientData = AuthData.of(Base64.getDecoder().decode(
request.getHeader(SASL_AUTH_TOKEN)));
AuthData brokerData = state.authenticate(clientData);
// authentication has completed, it has get the auth role.
if (state.isComplete()) {
if (log.isDebugEnabled()) {
log.debug("[{}] SASL server authentication complete, send OK to client.", request.getRequestURI());
}
String authRole = state.getAuthRole();
String authToken = createAuthRoleToken(authRole, String.valueOf(state.getStateId()));
response.setHeader(SASL_AUTH_ROLE_TOKEN, authToken);
// auth request complete, return OK, wait for a new real request to come.
response.setHeader(SASL_STATE_SERVER, String.valueOf(state.getStateId()));
setResponseHeaderState(response, SASL_STATE_COMPLETE);
response.setStatus(HttpServletResponse.SC_OK);
// auth completed, no need to keep authState
authStates.remove(state.getStateId());
return false;
} else {
// auth not complete
if (log.isDebugEnabled()) {
log.debug("[{}] SASL server authentication not complete, send {} back to client.",
request.getRequestURI(), HttpServletResponse.SC_UNAUTHORIZED);
}
setResponseHeaderState(response, SASL_STATE_NEGOTIATE);
response.setHeader(SASL_STATE_SERVER, String.valueOf(state.getStateId()));
response.setHeader(SASL_AUTH_TOKEN, Base64.getEncoder().encodeToString(brokerData.getBytes()));
response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "SASL Authentication not complete.");
return false;
}
}
}
}
......@@ -186,18 +186,6 @@ public class PulsarSaslServer {
ac.setAuthorized(true);
log.info("Successfully authenticated client: authenticationID: {}; authorizationID: {}.",
authenticationID, authorizationID);
KerberosName kerberosName = new KerberosName(authenticationID);
try {
StringBuilder userNameBuilder = new StringBuilder(kerberosName.getShortName());
userNameBuilder.append("/").append(kerberosName.getHostName());
userNameBuilder.append("@").append(kerberosName.getRealm());
log.info("Setting authorizedID: {} ", userNameBuilder);
ac.setAuthorizedID(userNameBuilder.toString());
} catch (IOException e) {
log.error("Failed to set name based on Kerberos authentication rules.");
}
}
}
}
......@@ -53,10 +53,4 @@ public class SaslAuthenticationDataSource implements AuthenticationDataSource {
public String getAuthorizationID() {
return pulsarSaslServer.getAuthorizationID();
}
// TODO: for http support. github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
/* default boolean hasDataFromHttp() {
return false;
}*/
}
......@@ -21,8 +21,11 @@ package org.apache.pulsar.broker.authentication;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.atomic.AtomicLong;
import javax.naming.AuthenticationException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.api.AuthData;
/**
......@@ -31,10 +34,14 @@ import org.apache.pulsar.common.api.AuthData;
* It is basically holding the the authentication state.
* It tell broker whether the authentication is completed or not,
*/
@Slf4j
public class SaslAuthenticationState implements AuthenticationState {
private SaslAuthenticationDataSource authenticationDataSource;
private final long stateId;
private static final AtomicLong stateIdGenerator = new AtomicLong(0L);
private final SaslAuthenticationDataSource authenticationDataSource;
public SaslAuthenticationState(AuthenticationDataSource authenticationDataSource) {
stateId = stateIdGenerator.incrementAndGet();
checkArgument(authenticationDataSource instanceof SaslAuthenticationDataSource);
this.authenticationDataSource = (SaslAuthenticationDataSource)authenticationDataSource;
}
......@@ -58,9 +65,15 @@ public class SaslAuthenticationState implements AuthenticationState {
* Returns null if authentication has completed, and no auth data is required to send back to client.
* Do auth and Returns the auth data back to client, if authentication has not completed.
*/
@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
return authenticationDataSource.authenticate(authData);
}
@Override
public long getStateId() {
return stateId;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.authentication;
import java.security.Principal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import javax.naming.AuthenticationException;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SaslRoleToken implements Principal {
/**
* Constant that identifies an anonymous request.
*/
public static final SaslRoleToken ANONYMOUS = new SaslRoleToken();
private static final String ATTR_SEPARATOR = "&";
private static final String USER_ROLE = "u";
private static final String EXPIRES = "e";
private static final String SESSION = "i";
private final static Set<String> ATTRIBUTES =
new HashSet<String>(Arrays.asList(USER_ROLE, EXPIRES, SESSION));
private String userRole;
private String session;
private long expires;
private String token;
private SaslRoleToken() {
userRole = null;
session = null;
expires = -1;
token = "ANONYMOUS";
generateToken();
}
private static final String ILLEGAL_ARG_MSG = " is NULL, empty or contains a '" + ATTR_SEPARATOR + "'";
/**
* Creates an authentication token.
*
* @param userRole user name.
* @param session the sessionId.
* (<code>System.currentTimeMillis() + validityPeriod</code>).
*/
public SaslRoleToken(String userRole, String session) {
checkForIllegalArgument(session, "session");
this.userRole = userRole;
this.session = session;
this.expires = -1;
generateToken();
}
public SaslRoleToken(String userRole, String session, long expires) {
checkForIllegalArgument(userRole, "userRole");
checkForIllegalArgument(session, "session");
this.userRole = userRole;
this.session = session;
this.expires = expires;
generateToken();
}
/**
* Check if the provided value is invalid. Throw an error if it is invalid, NOP otherwise.
*
* @param value the value to check.
* @param name the parameter name to use in an error message if the value is invalid.
*/
private static void checkForIllegalArgument(String value, String name) {
if (value == null || value.length() == 0 || value.contains(ATTR_SEPARATOR)) {
throw new IllegalArgumentException(name + ILLEGAL_ARG_MSG);
}
}
/**
* Sets the expiration of the token.
*
* @param expires expiration time of the token in milliseconds since the epoch.
*/
public void setExpires(long expires) {
if (this != SaslRoleToken.ANONYMOUS) {
this.expires = expires;
generateToken();
}
}
/**
* Generates the token.
*/
private void generateToken() {
StringBuffer sb = new StringBuffer();
sb.append(USER_ROLE).append("=").append(getUserRole()).append(ATTR_SEPARATOR);
sb.append(SESSION).append("=").append(getSession()).append(ATTR_SEPARATOR);
sb.append(EXPIRES).append("=").append(getExpires());
token = sb.toString();
}
/**
* Returns the user name.
*
* @return the user name.
*/
public String getUserRole() {
return userRole;
}
/**
* Returns the principal name (this method name comes from the JDK {@link Principal} interface).
*
* @return the principal name.
*/
@Override
public String getName() {
return userRole;
}
/**
* Returns the authentication mechanism of the token.
*
* @return the authentication mechanism of the token.
*/
public String getSession() {
return session;
}
/**
* Returns the expiration time of the token.
*
* @return the expiration time of the token, in milliseconds since Epoc.
*/
public long getExpires() {
return expires;
}
/**
* Returns if the token has expired.
*
* @return if the token has expired.
*/
public boolean isExpired() {
return getExpires() != -1 && System.currentTimeMillis() > getExpires();
}
/**
* Returns the string representation of the token.
* <p/>
* This string representation is parseable by the {@link #parse} method.
*
* @return the string representation of the token.
*/
@Override
public String toString() {
return token;
}
/**
* Parses a string into an authentication token.
*
* @param tokenStr string representation of a token.
*
* @return the parsed authentication token.
*
* @throws AuthenticationException thrown if the string representation could not be parsed into
* an authentication token.
*/
public static SaslRoleToken parse(String tokenStr) throws AuthenticationException {
Map<String, String> map = split(tokenStr);
if (!map.keySet().equals(ATTRIBUTES)) {
throw new AuthenticationException("Invalid token string, missing attributes");
}
long expires = Long.parseLong(map.get(EXPIRES));
SaslRoleToken token = new SaslRoleToken(map.get(USER_ROLE), map.get(SESSION));
token.setExpires(expires);
return token;
}
/**
* Splits the string representation of a token into attributes pairs.
*
* @param tokenStr string representation of a token.
*
* @return a map with the attribute pairs of the token.
*
* @throws AuthenticationException thrown if the string representation of the token could not be broken into
* attribute pairs.
*/
private static Map<String, String> split(String tokenStr) throws AuthenticationException {
Map<String, String> map = new HashMap<String, String>();
StringTokenizer st = new StringTokenizer(tokenStr, ATTR_SEPARATOR);
while (st.hasMoreTokens()) {
String part = st.nextToken();
int separator = part.indexOf('=');
if (separator == -1) {
throw new AuthenticationException("Invalid authentication token");
}
String key = part.substring(0, separator);
String value = part.substring(separator + 1);
map.put(key, value);
}
return map;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.authentication;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import javax.naming.AuthenticationException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
@Slf4j
public class SaslRoleTokenSigner {
private static final String SIGNATURE = "&s=";
private byte[] secret;
/**
* Creates a SaslRoleTokenSigner instance using the specified secret.
*
* @param secret secret to use for creating the digest.
*/
public SaslRoleTokenSigner(byte[] secret) {
if (secret == null) {
throw new IllegalArgumentException("secret cannot be NULL");
}
this.secret = secret.clone();
}
/**
* Returns a signed string.
* <p/>
* The signature '&s=SIGNATURE' is appended at the end of the string.
*
* @param str string to sign.
*
* @return the signed string.
*/
public String sign(String str) {
if (str == null || str.length() == 0) {
throw new IllegalArgumentException("NULL or empty string to sign");
}
String signature = computeSignature(str);
return str + SIGNATURE + signature;
}
/**
* Verifies a signed string and extracts the original string.
*
* @param signedStr the signed string to verify and extract.
*
* @return the extracted original string.
*
* @throws AuthenticationException thrown if the given string is not a signed string or if the signature is invalid.
*/
public String verifyAndExtract(String signedStr) throws AuthenticationException {
int index = signedStr.lastIndexOf(SIGNATURE);
if (index == -1) {
throw new AuthenticationException("Invalid signed text: " + signedStr);
}
String originalSignature = signedStr.substring(index + SIGNATURE.length());
String rawValue = signedStr.substring(0, index);
String currentSignature = computeSignature(rawValue);
if (!originalSignature.equals(currentSignature)) {
throw new AuthenticationException("Invalid signature");
}
return rawValue;
}
/**
* Returns the signature of a string.
*
* @param str string to sign.
*
* @return the signature for the string.
*/
protected String computeSignature(String str) {
try {
MessageDigest md = MessageDigest.getInstance("SHA");
md.update(str.getBytes());
md.update(secret);
byte[] digest = md.digest();
return new Base64(0).encodeToString(digest);
} catch (NoSuchAlgorithmException ex) {
throw new RuntimeException("It should not happen, " + ex.getMessage(), ex);
}
}
}
......@@ -30,10 +30,12 @@ import java.util.concurrent.TimeUnit;
import javax.security.auth.login.Configuration;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.bookkeeper.test.PortManager;
import org.apache.commons.io.FileUtils;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
......@@ -99,7 +101,7 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
File keytabProxy = new File(kerberosWorkDir, "pulsarproxy.keytab");
kdc.createPrincipal(keytabProxy, principalProxyNoRealm);
File jaasFile = new File(kerberosWorkDir, "jaas.properties");
File jaasFile = new File(kerberosWorkDir, "jaas.conf");
try (FileWriter writer = new FileWriter(jaasFile)) {
writer.write("\n"
+ "PulsarBroker {\n"
......@@ -135,7 +137,7 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
);
}
File krb5file = new File(kerberosWorkDir, "krb5.properties");
File krb5file = new File(kerberosWorkDir, "krb5.conf");
try (FileWriter writer = new FileWriter(krb5file)) {
String conf = "[libdefaults]\n"
+ " default_realm = " + kdc.getRealm() + "\n"
......@@ -147,11 +149,11 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
+ " kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
+ " }";
writer.write(conf);
log.info("krb5.properties:\n" + conf);
log.info("krb5.conf:\n" + conf);
}
System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
System.setProperty("java.security.krb5.properties", krb5file.getAbsolutePath());
System.setProperty("java.security.krb5.conf", krb5file.getAbsolutePath());
Configuration.getConfiguration().refresh();
// Client config
......@@ -162,7 +164,7 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
@AfterClass
public static void stopMiniKdc() {
System.clearProperty("java.security.auth.login.config");
System.clearProperty("java.security.krb5.properties");
System.clearProperty("java.security.krb5.conf");
if (kdc != null) {
kdc.stop();
}
......@@ -181,18 +183,27 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
isTcpLookup = true;
conf.setAdvertisedAddress(localHostname);
conf.setAuthenticationEnabled(true);
conf.setSaslAuthentication(true);
conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
conf.setSaslJaasServerSectionName("PulsarBroker");
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderSasl.class.getName());
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");
conf.setSuperUserRoles(ImmutableSet.of("client/" + localHostname + "@" + kdc.getRealm()));
super.init();
lookupUrl = new URI("broker://" + "localhost" + ":" + BROKER_PORT);
// set admin auth, to verify admin web resources
Map<String, String> clientSaslConfig = Maps.newHashMap();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
log.info("set client jaas section name: PulsarClient");
admin = PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl.toString())
.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig))
.build();
super.producerBaseSetup();
log.info("-- {} --, end.", methodName);
}
......@@ -208,7 +219,6 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
// Step 1: Create Admin Client
//updateAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + servicePort;
// create a client which connects to proxy and pass authData
String topicName = "persistent://my-property/my-ns/my-topic1";
......@@ -223,9 +233,6 @@ public class ProxySaslAuthenticationTest extends ProducerConsumerBase {
// proxy connect to broker
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
/*proxyConfig.setBrokerClientAuthenticationParameters(
"{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," +
"\"serverType\": " + "\"broker\"}");*/
proxyConfig.setBrokerClientAuthenticationParameters(
"{\"saslJaasClientSectionName\": " + "\"PulsarProxy\"," +
"\"serverType\": " + "\"broker\"}");
......
......@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.authentication;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileWriter;
import java.net.URI;
......@@ -29,11 +28,17 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.security.auth.login.Configuration;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
......@@ -42,6 +47,8 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
......@@ -72,8 +79,9 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
String principalServerNoRealm = "broker/" + localHostname;
String principalServer = "broker/" + localHostname + "@" + kdc.getRealm();
log.info("principalServer: " + principalServer);
String principalClientNoRealm = "client/" + localHostname;
String principalClientNoRealm = "client";
String principalClient = principalClientNoRealm + "@" + kdc.getRealm();
log.info("principalClient: " + principalClient);
File keytabClient = new File(kerberosWorkDir, "pulsarclient.keytab");
......@@ -82,7 +90,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
File keytabServer = new File(kerberosWorkDir, "pulsarbroker.keytab");
kdc.createPrincipal(keytabServer, principalServerNoRealm);
File jaasFile = new File(kerberosWorkDir, "jaas.properties");
File jaasFile = new File(kerberosWorkDir, "jaas.conf");
try (FileWriter writer = new FileWriter(jaasFile)) {
writer.write("\n"
+ "PulsarBroker {\n"
......@@ -107,7 +115,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
);
}
File krb5file = new File(kerberosWorkDir, "krb5.properties");
File krb5file = new File(kerberosWorkDir, "krb5.conf");
try (FileWriter writer = new FileWriter(krb5file)) {
String conf = "[libdefaults]\n"
+ " default_realm = " + kdc.getRealm() + "\n"
......@@ -119,16 +127,17 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
+ " kdc = " + kdc.getHost() + ":" + kdc.getPort() + "\n"
+ " }";
writer.write(conf);
log.info("krb5.properties:\n" + conf);
log.info("krb5.conf:\n" + conf);
}
System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
System.setProperty("java.security.krb5.properties", krb5file.getAbsolutePath());
System.setProperty("java.security.krb5.conf", krb5file.getAbsolutePath());
Configuration.getConfiguration().refresh();
// Client config
Map<String, String> clientSaslConfig = Maps.newHashMap();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
log.info("set client jaas section name: PulsarClient");
authSasl = AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig);
log.info("created AuthenticationSasl");
......@@ -137,7 +146,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
@AfterClass
public static void stopMiniKdc() {
System.clearProperty("java.security.auth.login.config");
System.clearProperty("java.security.krb5.properties");
System.clearProperty("java.security.krb5.conf");
if (kdc != null) {
kdc.stop();
}
......@@ -151,26 +160,39 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
@Override
protected void setup() throws Exception {
log.info("-- {} --, start at host: {}", methodName, localHostname);
isTcpLookup = true;
// use http lookup to verify HttpClient works well.
isTcpLookup = false;
conf.setAdvertisedAddress(localHostname);
conf.setAuthenticationEnabled(true);
conf.setSaslAuthentication(true);
conf.setSaslJaasClientAllowedIds(".*" + localHostname + ".*");
conf.setSaslJaasClientAllowedIds(".*" + "client" + ".*");
conf.setSaslJaasServerSectionName("PulsarBroker");
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderSasl.class.getName());
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");
conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + kdc.getRealm()));
super.init();
lookupUrl = new URI("broker://" + "localhost" + ":" + BROKER_PORT);
lookupUrl = new URI("http://" + "localhost" + ":" + BROKER_WEBSERVICE_PORT);
pulsarClient = PulsarClient.builder()
.serviceUrl(lookupUrl.toString())
.statsInterval(0, TimeUnit.SECONDS)
.authentication(authSasl).build();
// set admin auth, to verify admin web resources
Map<String, String> clientSaslConfig = Maps.newHashMap();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
log.info("set client jaas section name: PulsarClient");
admin = PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl.toString())
.authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig))
.build();
log.info("-- {} --, end.", methodName);
super.producerBaseSetup();
}
......@@ -217,4 +239,51 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
log.info("-- {} -- end", methodName);
}
// Test sasl server/client auth.
@Test
public void testSaslServerAndClientAuth() throws Exception {
log.info("-- {} -- start", methodName);
String hostName = "localhost";
// prepare client and server side resource
AuthenticationDataProvider dataProvider = authSasl.getAuthData(hostName);
AuthenticationProviderSasl saslServer = (AuthenticationProviderSasl)
(pulsar.getBrokerService().getAuthenticationService()
.getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME));
AuthenticationState authState = saslServer.newAuthState(null, null, null);
// auth between server and client.
// first time auth
AuthData initData1 = dataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
AuthData serverData1 = authState.authenticate(initData1);
boolean complete = authState.isComplete();
Assert.assertEquals(complete, false);
// second time auth, completed
AuthData initData2 = dataProvider.authenticate(serverData1);
AuthData serverData2 = authState.authenticate(initData2);
complete = authState.isComplete();
Assert.assertEquals(complete, true);
Assert.assertEquals(serverData2.getBytes(), null);
// if completed, server could not auth again.
try {
authState.authenticate(initData2);
Assert.fail("Expected fail because auth completed for authState");
} catch (Exception e) {
// expected
}
// another server could not serve old client
try {
AuthenticationState authState2 = saslServer.newAuthState(null, null, null);
AuthData serverData3 = authState2.authenticate(initData1);
Assert.fail("Expected fail. server is auth old client data");
} catch (Exception e) {
// expected
}
log.info("-- {} -- end", methodName);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.authentication;
import javax.naming.AuthenticationException;
import lombok.extern.slf4j.Slf4j;
import org.testng.Assert;
import org.testng.annotations.Test;
@Slf4j
public class SaslServerTokenSignerTest {
@Test
public void testNoSecret() throws Exception {
try {
new SaslRoleTokenSigner(null);
Assert.fail();
}
catch (IllegalArgumentException ex) {
}
}
@Test
public void testNullAndEmptyString() throws Exception {
SaslRoleTokenSigner signer = new SaslRoleTokenSigner("secret".getBytes());
try {
signer.sign(null);
Assert.fail("Null String should Failed");
} catch (IllegalArgumentException ex) {
// Expected
} catch (Throwable ex) {
Assert.fail("Null String should Failed with IllegalArgumentException.");
}
try {
signer.sign("");
Assert.fail("Empty String should Failed");
} catch (IllegalArgumentException ex) {
// Expected
} catch (Throwable ex) {
Assert.fail("Empty String should Failed with IllegalArgumentException.");
}
}
@Test
public void testSignature() throws Exception {
SaslRoleTokenSigner signer = new SaslRoleTokenSigner("secret".getBytes());
String s1 = signer.sign("ok");
String s2 = signer.sign("ok");
String s3 = signer.sign("wrong");
Assert.assertEquals(s1, s2);
Assert.assertNotSame(s1, s3);
}
@Test
public void testVerify() throws Exception {
SaslRoleTokenSigner signer = new SaslRoleTokenSigner("secret".getBytes());
String t = "test";
String s = signer.sign(t);
String e = signer.verifyAndExtract(s);
Assert.assertEquals(t, e);
Assert.assertNotEquals(t, s);
}
@Test
public void testInvalidSignedText() throws Exception {
SaslRoleTokenSigner signer = new SaslRoleTokenSigner("secret".getBytes());
try {
signer.verifyAndExtract("test");
Assert.fail();
} catch (AuthenticationException ex) {
// Expected
} catch (Throwable ex) {
Assert.fail();
}
}
@Test
public void testTampering() throws Exception {
SaslRoleTokenSigner signer = new SaslRoleTokenSigner("secret".getBytes());
String t = "test";
String s = signer.sign(t);
s += "x";
try {
signer.verifyAndExtract(s);
Assert.fail();
} catch (AuthenticationException ex) {
// Expected
} catch (Throwable ex) {
Assert.fail();
}
}
}
......@@ -604,16 +604,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private String anonymousUserRole = null;
@FieldContext(
category = CATEGORY_SASL_AUTH,
doc = "Whether Use SASL Authentication or not"
)
// TODO: isSaslAuthentication used to bypass web resource check.
// will remove it after implementation the support.
// github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
private boolean isSaslAuthentication = false;
@FieldContext(
category = CATEGORY_SASL_AUTH,
doc = "This is a regexp, which limits the range of possible ids which can connect to the Broker using SASL.\n"
......
......@@ -25,6 +25,9 @@ import java.net.SocketAddress;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.AuthData;
......@@ -73,4 +76,11 @@ public interface AuthenticationProvider extends Closeable {
return new OneStageAuthenticationState(authData, remoteAddress, sslSession, this);
}
/**
* Set response, according to passed in request.
* and return whether we should do following chain.doFilter or not.
*/
default boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
throw new AuthenticationException("Not supported");
}
}
......@@ -20,6 +20,8 @@
package org.apache.pulsar.broker.authentication;
import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.common.api.AuthData;
/**
......@@ -47,7 +49,14 @@ public interface AuthenticationState {
AuthenticationDataSource getAuthDataSource();
/**
* Whether the authentication is completed or not
* Whether the authentication is completed or not.
*/
boolean isComplete();
/**
* Get AuthenticationState ID
*/
default long getStateId() {
return -1L;
}
}
......@@ -18,9 +18,10 @@
*/
package org.apache.pulsar.broker.web;
import static com.google.common.base.Preconditions.checkState;
import java.io.IOException;
import javax.naming.AuthenticationException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
......@@ -31,8 +32,8 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -51,27 +52,52 @@ public class AuthenticationFilter implements Filter {
this.authenticationService = authenticationService;
}
private boolean isSaslRequest(HttpServletRequest request) {
if (request.getHeader(SaslConstants.SASL_HEADER_TYPE) == null ||
request.getHeader(SaslConstants.SASL_HEADER_TYPE).isEmpty()) {
return false;
}
if (request.getHeader(SaslConstants.SASL_HEADER_TYPE)
.equalsIgnoreCase(SaslConstants.SASL_TYPE_VALUE)) {
return true;
} else {
return false;
}
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
try {
String role = authenticationService.authenticateHttpRequest((HttpServletRequest) request);
request.setAttribute(AuthenticatedRoleAttributeName, role);
request.setAttribute(AuthenticatedDataAttributeName,
HttpServletRequest httpRequest = (HttpServletRequest) request;
HttpServletResponse httpResponse = (HttpServletResponse) response;
if (!isSaslRequest(httpRequest)) {
// not sasl type, return role directly.
String role = authenticationService.authenticateHttpRequest((HttpServletRequest) request);
request.setAttribute(AuthenticatedRoleAttributeName, role);
request.setAttribute(AuthenticatedDataAttributeName,
new AuthenticationDataHttps((HttpServletRequest) request));
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authenticated HTTP request with role {}", request.getRemoteAddr(), role);
}
chain.doFilter(request, response);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authenticated HTTP request with role {}", request.getRemoteAddr(), role);
boolean doFilter = authenticationService
.getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME)
.authenticateHttpRequest(httpRequest, httpResponse);
if (doFilter) {
chain.doFilter(request, response);
}
} catch (AuthenticationException e) {
} catch (Exception e) {
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Authentication required");
LOG.warn("[{}] Failed to authenticate HTTP request: {}", request.getRemoteAddr(), e.getMessage());
return;
}
chain.doFilter(request, response);
}
@Override
......
......@@ -226,11 +226,8 @@ public class ServerCnx extends PulsarHandler {
* - originalPrincipal is not blank
* - originalPrincipal is not a proxy principal
*/
//TODO: for sasl proxy.
// github issue #3655 {@link: https://github.com/apache/pulsar/issues/3655}
private boolean invalidOriginalPrincipal(String originalPrincipal) {
return (service.isAuthenticationEnabled() && service.isAuthorizationEnabled()
&& !isSaslAuthenticationMethod()
&& proxyRoles.contains(authRole) && (StringUtils.isBlank(originalPrincipal) || proxyRoles.contains(originalPrincipal)));
}
......@@ -1485,10 +1482,6 @@ public class ServerCnx extends PulsarHandler {
}
}
private boolean isSaslAuthenticationMethod(){
return authMethod.equalsIgnoreCase(SaslConstants.AUTH_METHOD_NAME);
}
private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);
/**
......
......@@ -27,7 +27,6 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
......@@ -42,6 +41,10 @@ import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
......@@ -49,7 +52,11 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.*;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
......@@ -58,12 +65,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
/**
* Base class for Web resources in Pulsar. It provides basic authorization functions.
*/
......@@ -166,10 +167,7 @@ public abstract class PulsarWebResource {
* if not authorized
*/
protected void validateSuperUserAccess() {
// TODO: isSaslAuthentication used to bypass web resource check.
// will remove it after implementation the support.
// github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
if (config().isAuthenticationEnabled() && !config().isSaslAuthentication()) {
if (config().isAuthenticationEnabled()) {
String appId = clientAppId();
if(log.isDebugEnabled()) {
log.debug("[{}] Check super user access: Authenticated: {} -- Role: {}", uri.getRequestUri(),
......@@ -245,7 +243,7 @@ public abstract class PulsarWebResource {
throw new RestException(Status.NOT_FOUND, "Tenant does not exist");
}
if (pulsar.getConfiguration().isAuthenticationEnabled() && !pulsar.getConfiguration().isSaslAuthentication() && pulsar.getConfiguration().isAuthorizationEnabled()) {
if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
if (!isClientAuthenticated(clientAppId)) {
throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
}
......
......@@ -133,10 +133,7 @@ public class WebService implements AutoCloseable {
});
}
// TODO: isSaslAuthentication used to bypass web resource check.
// will remove it after implementation the support.
// github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled() && !pulsar.getConfiguration().isSaslAuthentication()) {
if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
FilterHolder filter = new FilterHolder(new AuthenticationFilter(
pulsar.getBrokerService().getAuthenticationService()));
context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
......
......@@ -19,6 +19,8 @@
package org.apache.pulsar.client.admin.internal;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
......@@ -43,6 +45,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.admin.PulsarAdminException.ServerSideErrorException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
......@@ -59,17 +63,53 @@ public abstract class BaseResource {
public Builder request(final WebTarget target) throws PulsarAdminException {
try {
Builder builder = target.request(MediaType.APPLICATION_JSON);
// Add headers for authentication if any
if (auth != null && auth.getAuthData().hasDataForHttp()) {
for (Map.Entry<String, String> header : auth.getAuthData().getHttpHeaders()) {
builder.header(header.getKey(), header.getValue());
}
return requestAsync(target).get();
} catch (Exception e) {
throw new GettingAuthenticationDataException(e);
}
}
// do the authentication stage, and once authentication completed return a Builder
public CompletableFuture<Builder> requestAsync(final WebTarget target) {
CompletableFuture<Builder> builderFuture = new CompletableFuture<>();
CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
try {
AuthenticationDataProvider authData = auth.getAuthData(target.getUri().getHost());
if (authData.hasDataForHttp()) {
auth.authenticationStage(target.getUri().toString(), authData, null, authFuture);
} else {
authFuture.complete(null);
}
return builder;
// auth complete, return a new Builder
authFuture.whenComplete((respHeaders, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to perform http request at authn stage: {}",
ex.getMessage());
builderFuture.completeExceptionally(new PulsarClientException(ex));
return;
}
try {
Builder builder = target.request(MediaType.APPLICATION_JSON);
if (authData.hasDataForHttp()) {
Set<Entry<String, String>> headers =
auth.newRequestHeader(target.getUri().toString(), authData, respHeaders);
if (headers != null) {
headers.forEach(entry -> builder.header(entry.getKey(), entry.getValue()));
}
}
builderFuture.complete(builder);
} catch (Throwable t) {
builderFuture.completeExceptionally(new GettingAuthenticationDataException(t));
}
});
} catch (Throwable t) {
throw new GettingAuthenticationDataException(t);
builderFuture.completeExceptionally(new GettingAuthenticationDataException(t));
}
return builderFuture;
}
public <T> CompletableFuture<Void> asyncPutRequest(final WebTarget target, Entity<T> entity) {
......
......@@ -20,9 +20,16 @@ package org.apache.pulsar.client.admin.internal;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.asynchttpclient.RequestBuilder;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.client.WebTarget;
public class ComponentResource extends BaseResource {
......@@ -30,18 +37,32 @@ public class ComponentResource extends BaseResource {
super(auth);
}
public RequestBuilder addAuthHeaders(RequestBuilder requestBuilder) throws PulsarAdminException {
public RequestBuilder addAuthHeaders(WebTarget target, RequestBuilder requestBuilder) throws PulsarAdminException {
try {
if (auth != null && auth.getAuthData().hasDataForHttp()) {
for (Map.Entry<String, String> header : auth.getAuthData().getHttpHeaders()) {
requestBuilder.addHeader(header.getKey(), header.getValue());
if (auth != null) {
Set<Entry<String, String>> headers = getAuthHeaders(target);
if (headers != null && !headers.isEmpty()) {
headers.forEach(header -> requestBuilder.addHeader(header.getKey(), header.getValue()));
}
}
return requestBuilder;
} catch (Throwable t) {
throw new PulsarAdminException.GettingAuthenticationDataException(t);
}
}
private Set<Entry<String, String>> getAuthHeaders(WebTarget target) throws Exception {
AuthenticationDataProvider authData = auth.getAuthData(target.getUri().getHost());
String targetUrl = target.getUri().toString();
if (auth.getAuthMethodName().equalsIgnoreCase(SaslConstants.AUTH_METHOD_NAME)) {
CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
auth.authenticationStage(targetUrl, authData, null, authFuture);
return auth.newRequestHeader(targetUrl, authData, authFuture.get());
} else if (authData.hasDataForHttp()) {
return auth.newRequestHeader(targetUrl, authData, null);
} else {
return null;
}
}
}
......@@ -167,7 +167,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
......@@ -215,7 +215,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
......@@ -334,7 +334,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
.addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM))
.addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN));
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
}
......@@ -358,7 +358,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
RequestBuilder builder = get(target.getUri().toASCIIString());
Future<HttpResponseStatus> whenStatusCode
= asyncHttpClient.executeRequest(addAuthHeaders(builder).build(), new AsyncHandler<HttpResponseStatus>() {
= asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(), new AsyncHandler<HttpResponseStatus>() {
private HttpResponseStatus status;
@Override
......
......@@ -125,7 +125,7 @@ public class SinkImpl extends ComponentResource implements Sink {
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(sink, builder).build()).get();
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
......@@ -173,7 +173,7 @@ public class SinkImpl extends ComponentResource implements Sink {
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(sink, builder).build()).get();
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
......
......@@ -125,7 +125,7 @@ public class SourceImpl extends ComponentResource implements Source {
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(source, builder).build()).get();
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
}
......@@ -171,7 +171,7 @@ public class SourceImpl extends ComponentResource implements Source {
// If the function code is built in, we don't need to submit here
builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(source, builder).build()).get();
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
......
......@@ -21,7 +21,10 @@ package org.apache.pulsar.client.api;
import java.io.Closeable;
import java.io.Serializable;
import java.util.Map;
import javax.naming.AuthenticationException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
/**
......@@ -75,4 +78,25 @@ public interface Authentication extends Closeable, Serializable {
* Initialize the authentication provider
*/
void start() throws PulsarClientException;
/**
* An authentication Stage.
* when authentication complete, passed-in authFuture will contains authentication related http request headers.
*/
default void authenticationStage(String requestUrl,
AuthenticationDataProvider authData,
Map<String, String> previousResHeaders,
CompletableFuture<Map<String, String>> authFuture) {
authFuture.complete(null);
}
/**
* Add an authenticationStage that will complete along with authFuture
*/
default Set<Entry<String, String>> newRequestHeader(String hostName,
AuthenticationDataProvider authData,
Map<String, String> previousResHeaders) throws Exception {
return authData.getHttpHeaders();
}
}
......@@ -88,7 +88,7 @@ public interface AuthenticationDataProvider extends Serializable {
*
* @return an enumeration of all the header names
*/
default Set<Map.Entry<String, String>> getHttpHeaders() {
default Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
return null;
}
......
......@@ -57,5 +57,10 @@
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
</dependency>
</dependencies>
</project>
......@@ -19,13 +19,44 @@
package org.apache.pulsar.client.impl.auth;
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.sasl.SaslConstants.AUTH_METHOD_NAME;
import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_CLIENT_SECTION_NAME;
import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_ROLE_TOKEN;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_ROLE_TOKEN_EXPIRED;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_TOKEN;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_BROKER_PROTOCOL;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_HEADER_STATE;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_HEADER_TYPE;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_SERVER_TYPE;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_CLIENT_INIT;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_COMPLETE;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_NEGOTIATE;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER_CHECK_TOKEN;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_TYPE_VALUE;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Base64;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.security.auth.login.LoginException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
......@@ -33,8 +64,8 @@ import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.AuthenticationUtil;
import org.apache.pulsar.client.impl.auth.PulsarSaslClient.ClientCallbackHandler;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.sasl.JAASCredentialsContainer;
import org.apache.pulsar.common.sasl.SaslConstants;
/**
* Authentication provider for SASL based authentication.
......@@ -59,7 +90,7 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
@Override
public String getAuthMethodName() {
return SaslConstants.AUTH_METHOD_NAME;
return AUTH_METHOD_NAME;
}
@Override
......@@ -78,7 +109,7 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
public void configure(String encodedAuthParamString) {
if (isBlank(encodedAuthParamString)) {
log.info("authParams for SASL is be empty, will use default JAAS client section name: {}",
SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
JAAS_DEFAULT_CLIENT_SECTION_NAME);
}
try {
......@@ -104,9 +135,9 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
// read section from config files of kerberos
this.loginContextName = authParams
.getOrDefault(SaslConstants.JAAS_CLIENT_SECTION_NAME, SaslConstants.JAAS_DEFAULT_CLIENT_SECTION_NAME);
.getOrDefault(JAAS_CLIENT_SECTION_NAME, JAAS_DEFAULT_CLIENT_SECTION_NAME);
this.serverType = authParams
.getOrDefault(SaslConstants.SASL_SERVER_TYPE, SaslConstants.SASL_BROKER_PROTOCOL);
.getOrDefault(SASL_SERVER_TYPE, SASL_BROKER_PROTOCOL);
// init the static jaasCredentialsContainer that shares amongst client.
if (!initializedJAAS) {
......@@ -130,11 +161,182 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
@Override
public void start() throws PulsarClientException {
client = ClientBuilder.newClient();
}
@Override
public void close() throws IOException {
if(client != null) {
client.close();
}
}
private String saslRoleToken = null;
private Client client = null;
// role token exists but expired return true
private boolean isRoleTokenExpired(Map<String, String> responseHeaders) {
if ((saslRoleToken != null)
&& (responseHeaders != null)
// header type match
&& (responseHeaders.get(SASL_HEADER_TYPE) != null && responseHeaders.get(SASL_HEADER_TYPE)
.equalsIgnoreCase(SASL_TYPE_VALUE))
// header state expired
&& (responseHeaders.get(SASL_HEADER_STATE) != null && responseHeaders.get(SASL_HEADER_STATE)
.equalsIgnoreCase(SASL_AUTH_ROLE_TOKEN_EXPIRED))) {
return true;
} else {
return false;
}
}
@SneakyThrows(Exception.class)
private Builder newRequestBuilder(WebTarget target,
AuthenticationDataProvider authData,
Map<String, String> previousResHeaders) {
Builder builder = target.request(MediaType.APPLICATION_JSON);
Set<Entry<String, String>> headers = newRequestHeader(
target.getUri().toString(),
authData,
previousResHeaders);
headers.forEach(entry -> {
builder.header(entry.getKey(), entry.getValue());
});
return builder;
}
// set header according to previous response
@Override
public Set<Entry<String, String>> newRequestHeader(String hostName,
AuthenticationDataProvider authData,
Map<String, String> previousRespHeaders) throws Exception {
Map<String, String> headers = Maps.newHashMap();
if (authData.hasDataForHttp()) {
authData.getHttpHeaders().forEach(header ->
headers.put(header.getKey(), header.getValue())
);
}
// role token expired in last check. remove role token, new sasl client, restart auth.
if (isRoleTokenExpired(previousRespHeaders)) {
previousRespHeaders = null;
saslRoleToken = null;
authData = getAuthData(hostName);
}
// role token is not expired and OK to use.
// 1. first time request, send server to check if expired.
// 2. server checked, and return SASL_STATE_COMPLETE, ask server to complete auth
// 3. server checked, and not return SASL_STATE_COMPLETE
if(saslRoleToken != null) {
headers.put(SASL_AUTH_ROLE_TOKEN, saslRoleToken);
if (previousRespHeaders == null) {
// first time auth, ask server to check the role token expired or not.
if (log.isDebugEnabled()) {
log.debug("request builder add token: Check token");
}
headers.put(SASL_HEADER_STATE, SASL_STATE_SERVER_CHECK_TOKEN);
} else if (previousRespHeaders.get(SASL_HEADER_STATE).equalsIgnoreCase(SASL_STATE_COMPLETE)) {
headers.put(SASL_HEADER_STATE, SASL_STATE_COMPLETE);
if (log.isDebugEnabled()) {
log.debug("request builder add token. role verified by server");
}
} else {
if (log.isDebugEnabled()) {
log.debug("request builder add token. NOT complete. state: {}",
previousRespHeaders.get(SASL_HEADER_STATE));
}
headers.put(SASL_HEADER_STATE, SASL_STATE_NEGOTIATE);
}
return headers.entrySet();
}
// role token is null, need do auth.
if (previousRespHeaders == null) {
if (log.isDebugEnabled()) {
log.debug("Init authn in client side");
}
// first time init
headers.put(SASL_HEADER_STATE, SASL_STATE_CLIENT_INIT);
AuthData initData = authData.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
headers.put(SASL_AUTH_TOKEN,
Base64.getEncoder().encodeToString(initData.getBytes()));
} else {
AuthData brokerData = AuthData.of(
Base64.getDecoder().decode(
previousRespHeaders.get(SASL_AUTH_TOKEN)));
AuthData clientData = authData.authenticate(brokerData);
headers.put(SASL_STATE_SERVER, previousRespHeaders.get(SASL_STATE_SERVER));
headers.put(SASL_HEADER_TYPE, SASL_TYPE_VALUE);
headers.put(SASL_HEADER_STATE, SASL_STATE_NEGOTIATE);
headers.put(SASL_AUTH_TOKEN,
Base64.getEncoder().encodeToString(clientData.getBytes()));
}
return headers.entrySet();
}
private Map<String, String> getHeaders(Response response) {
Map<String, String> headers = Maps.newHashMap();
String saslHeader = response.getHeaderString(SASL_HEADER_TYPE);
String headerState = response.getHeaderString(SASL_HEADER_STATE);
String authToken = response.getHeaderString(SASL_AUTH_TOKEN);
String serverStateId = response.getHeaderString(SASL_STATE_SERVER);
if (saslRoleToken != null) {
headers.put(SASL_AUTH_ROLE_TOKEN, saslRoleToken);
}
headers.put(SASL_HEADER_TYPE, saslHeader);
headers.put(SASL_HEADER_STATE, headerState);
headers.put(SASL_AUTH_TOKEN, authToken);
headers.put(SASL_STATE_SERVER, serverStateId);
return headers;
}
@Override
public void authenticationStage(String requestUrl,
AuthenticationDataProvider authData,
Map<String, String> previousResHeaders,
CompletableFuture<Map<String, String>> authFuture) {
// a new request for sasl auth
Builder builder = newRequestBuilder(client.target(requestUrl), authData, previousResHeaders);
builder.async().get(new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (response.getStatus() == HTTP_UNAUTHORIZED) {
// sasl auth on going
authenticationStage(requestUrl, authData, getHeaders(response), authFuture);
return;
}
if (response.getStatus() != HttpURLConnection.HTTP_OK) {
log.warn("HTTP get request failed: {}", response.getStatusInfo());
authFuture.completeExceptionally(new PulsarClientException("Sasl Auth request failed: " + response.getStatus()));
return;
} else {
if (response.getHeaderString(SASL_AUTH_ROLE_TOKEN) != null) {
saslRoleToken = response.getHeaderString(SASL_AUTH_ROLE_TOKEN);
}
if (log.isDebugEnabled()) {
log.debug("Complete auth with saslRoleToken: {}", saslRoleToken);
}
authFuture.complete(getHeaders(response));
return;
}
}
@Override
public void failed(Throwable throwable) {
log.warn("Failed to perform http request: {}", throwable);
authFuture.completeExceptionally(new PulsarClientException(throwable));
return;
}
});
}
}
......@@ -19,12 +19,18 @@
package org.apache.pulsar.client.impl.auth;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.naming.AuthenticationException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.sasl.SaslConstants;
@Slf4j
public class SaslAuthenticationDataProvider implements AuthenticationDataProvider {
......@@ -54,4 +60,17 @@ public class SaslAuthenticationDataProvider implements AuthenticationDataProvide
return pulsarSaslClient.evaluateChallenge(commandData);
}
@Override
public boolean hasDataForHttp() {
return true;
}
@Override
public Set<Entry<String, String>> getHttpHeaders() throws Exception {
Map<String, String> headers = new HashMap<>();
headers.put(SaslConstants.SASL_HEADER_TYPE, SaslConstants.SASL_TYPE_VALUE);
return headers.entrySet();
}
}
......@@ -18,22 +18,20 @@
*/
package org.apache.pulsar.client.impl;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslContext;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Map;
import java.util.Properties;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
......@@ -41,19 +39,16 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class HttpClient implements Closeable {
protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
......@@ -133,56 +128,76 @@ public class HttpClient implements Closeable {
final CompletableFuture<T> future = new CompletableFuture<>();
try {
String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString();
AuthenticationDataProvider authData = authentication.getAuthData();
BoundRequestBuilder builder = httpClient.prepareGet(requestUrl);
String remoteHostName = serviceNameResolver.resolveHostUri().getHost();
AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName);
// Add headers for authentication if any
CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
// bring a authenticationStage for sasl auth.
if (authData.hasDataForHttp()) {
for (Map.Entry<String, String> header : authData.getHttpHeaders()) {
builder.setHeader(header.getKey(), header.getValue());
}
authentication.authenticationStage(requestUrl, authData, null, authFuture);
} else {
authFuture.complete(null);
}
final ListenableFuture<Response> responseFuture = builder.setHeader("Accept", "application/json")
.execute(new AsyncCompletionHandler<Response>() {
// auth complete, do real request
authFuture.whenComplete((respHeaders, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to perform http request at authentication stage: {}",
requestUrl, ex.getMessage());
future.completeExceptionally(new PulsarClientException(ex));
return;
}
@Override
public Response onCompleted(Response response) throws Exception {
return response;
}
// auth complete, use a new builder
BoundRequestBuilder builder = httpClient.prepareGet(requestUrl)
.setHeader("Accept", "application/json");
if (authData.hasDataForHttp()) {
Set<Entry<String, String>> headers;
try {
headers = authentication.newRequestHeader(requestUrl, authData, respHeaders);
} catch (Exception e) {
log.warn("[{}] Error during HTTP get headers: {}", requestUrl, e.getMessage());
future.completeExceptionally(new PulsarClientException(e));
return;
}
if (headers != null) {
headers.forEach(entry -> builder.addHeader(entry.getKey(), entry.getValue()));
}
}
@Override
public void onThrowable(Throwable t) {
log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage());
future.completeExceptionally(new PulsarClientException(t));
}
});
builder.execute().toCompletableFuture().whenComplete((response2, t) -> {
if (t != null) {
log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage());
future.completeExceptionally(new PulsarClientException(t));
return;
}
responseFuture.addListener(() -> {
try {
Response response = responseFuture.get();
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
log.warn("[{}] HTTP get request failed: {}", requestUrl, response.getStatusText());
// request not success
if (response2.getStatusCode() != HttpURLConnection.HTTP_OK) {
log.warn("[{}] HTTP get request failed: {}", requestUrl, response2.getStatusText());
Exception e;
if (response.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
e = new NotFoundException("Not found: " + response.getStatusText());
if (response2.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
e = new NotFoundException("Not found: " + response2.getStatusText());
} else {
e = new PulsarClientException("HTTP get request failed: " + response.getStatusText());
e = new PulsarClientException("HTTP get request failed: " + response2.getStatusText());
}
future.completeExceptionally(e);
return;
}
T data = ObjectMapperFactory.getThreadLocal().readValue(response.getResponseBodyAsBytes(), clazz);
future.complete(data);
} catch (Exception e) {
log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage());
future.completeExceptionally(new PulsarClientException(e));
}
}, MoreExecutors.directExecutor());
try {
T data = ObjectMapperFactory.getThreadLocal().readValue(response2.getResponseBodyAsBytes(), clazz);
future.complete(data);
} catch (Exception e) {
log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage());
future.completeExceptionally(new PulsarClientException(e));
}
});
});
} catch (Exception e) {
log.warn("[{}] Failed to get authentication data for lookup: {}", path, e.getMessage());
log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage());
if (e instanceof PulsarClientException) {
future.completeExceptionally(e);
} else {
......@@ -191,8 +206,5 @@ public class HttpClient implements Closeable {
}
return future;
}
private static final Logger log = LoggerFactory.getLogger(HttpClient.class);
}
......@@ -60,6 +60,32 @@ public class SaslConstants {
// Stand for the start of mutual auth between Client and Broker
public static final String INIT_PROVIDER_DATA = "isInit";
// Sasl token name that contained auth role
public static final String SASL_AUTH_ROLE_TOKEN = "SaslAuthRoleToken";
public static final String SASL_AUTH_ROLE_TOKEN_EXPIRED = "SaslAuthRoleTokenExpired";
/**
* HTTP header used by the SASL client/server during an authentication sequence.
*/
// auth type
public static final String SASL_HEADER_TYPE = "SASL-Type";
public static final String SASL_TYPE_VALUE = "Kerberos";
// header name for token auth between client and server
public static final String SASL_AUTH_TOKEN = "SASL-Token";
// header name for state
public static final String SASL_HEADER_STATE = "State";
// header value for state
public static final String SASL_STATE_CLIENT_INIT = "Init";
public static final String SASL_STATE_NEGOTIATE = "ING";
public static final String SASL_STATE_COMPLETE = "Done";
public static final String SASL_STATE_SERVER_CHECK_TOKEN = "ServerCheckToken";
// server side track the server
public static final String SASL_STATE_SERVER = "SASL-Server-ID";
public static boolean isUsingTicketCache(String configurationEntry) {
AppConfigurationEntry[] entries = Configuration.getConfiguration()
.getAppConfigurationEntry(configurationEntry);
......
......@@ -246,10 +246,7 @@ public class WebSocketService implements Closeable {
public boolean isAuthenticationEnabled() {
if (this.config == null)
return false;
// TODO: isSaslAuthentication used to bypass web resource check.
// will remove it after implementation the support.
// github issue #3653 {@link: https://github.com/apache/pulsar/issues/3653}
return this.config.isAuthenticationEnabled() && !this.config.isSaslAuthentication();
return this.config.isAuthenticationEnabled();
}
public boolean isAuthorizationEnabled() {
......
......@@ -96,7 +96,7 @@ public class WebSocketWebResource {
* if not authorized
*/
protected void validateSuperUserAccess() {
if (service().getConfig().isAuthenticationEnabled() && !service().getConfig().isSaslAuthentication()) {
if (service().getConfig().isAuthenticationEnabled()) {
String appId = clientAppId();
if (log.isDebugEnabled()) {
log.debug("[{}] Check super user access: Authenticated: {} -- Role: {}", uri.getRequestUri(),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册