提交 381fd6d5 编写于 作者: A Addison Higham 提交者: xiaolong.ran

[proxy] Fix proxy to be able to re-send request body (#5361)

Fixes #5360

This adds a small cache of the request body to ensure that it can be
re-sent.

TODO: still needs tested
(cherry picked from commit 978efaf1)
上级 0884d7cc
......@@ -20,9 +20,14 @@ package org.apache.pulsar.proxy.server;
import static org.apache.commons.lang3.StringUtils.isBlank;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.Executor;
......@@ -30,6 +35,7 @@ import javax.net.ssl.SSLContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.client.api.Authentication;
......@@ -42,6 +48,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.ProtocolHandlers;
import org.eclipse.jetty.client.RedirectProtocolHandler;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.proxy.ProxyServlet;
......@@ -133,6 +140,33 @@ class AdminProxyHandler extends ProxyServlet {
}
// This class allows the request body to be replayed, the default implementation
// does not
protected class ReplayableProxyContentProvider extends ProxyInputStreamContentProvider {
private Boolean firstIteratorCalled = false;
private final ByteArrayOutputStream bodyBuffer;
protected ReplayableProxyContentProvider(HttpServletRequest request, HttpServletResponse response, Request proxyRequest, InputStream input) {
super(request, response, proxyRequest, input);
bodyBuffer = new ByteArrayOutputStream(request.getContentLength());
}
@Override
public Iterator<ByteBuffer> iterator() {
if (firstIteratorCalled) {
return Collections.singleton(ByteBuffer.wrap(bodyBuffer.toByteArray())).iterator();
} else {
firstIteratorCalled = true;
return super.iterator();
}
}
@Override
protected ByteBuffer onRead(byte[] buffer, int offset, int length) {
bodyBuffer.write(buffer, offset, length);
return super.onRead(buffer, offset, length);
}
}
private static class JettyHttpClient extends HttpClient {
public JettyHttpClient() {
super();
......@@ -159,6 +193,12 @@ class AdminProxyHandler extends ProxyServlet {
}
@Override
protected ContentProvider proxyRequestContent(HttpServletRequest request,
HttpServletResponse response, Request proxyRequest) throws IOException {
return new ReplayableProxyContentProvider(request, response, proxyRequest, request.getInputStream());
}
@Override
protected HttpClient newHttpClient() {
try {
......
......@@ -25,6 +25,7 @@ import java.util.Collections;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
......@@ -55,6 +56,7 @@ public class TestProxy extends PulsarTestSuite {
.withEnv("clusterName", clusterName);
specBuilder.externalService("proxy-via-url", proxyViaURL);
return super.beforeSetupCluster(clusterName, specBuilder);
}
......@@ -108,4 +110,26 @@ public class TestProxy extends PulsarTestSuite {
testProxy(proxyViaURL.getPlainTextServiceUrl(), proxyViaURL.getHttpServiceUrl());
}
@Test
public void testProxyRequestBodyRedirect() throws Exception {
// See GH issue #5360, this ensures that we properly get a request with a body to be processed
final String tenant = "proxy-test-" + randomName(10);
final String namespace = tenant + "/ns1";
final String topic = "persistent://" + namespace + "/topic1";
@Cleanup
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
admin.tenants().createTenant(tenant,
new TenantInfo(Collections.emptySet(), Collections.singleton(pulsarCluster.getClusterName())));
admin.namespaces().createNamespace(namespace, Collections.singleton(pulsarCluster.getClusterName()));
for (int i = 0; i < 10; i++) {
// Ensure we the command works even if re-directs happen with a request body
admin.topics().createSubscription(topic, "test-" + i, MessageId.earliest);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册