提交 d7b31fcd 编写于 作者: V Vlad Ilyushchenko

chore(http): always abort broken HTTP import requests. If we could not...

chore(http): always abort broken HTTP import requests. If we could not understand data at the beginning there is barely any reason to receive arbitrary sized uploads.
上级 4127c9ac
......@@ -61,7 +61,6 @@ import java.util.Properties;
public class PropServerConfiguration implements ServerConfiguration {
public static final String CONFIG_DIRECTORY = "conf";
private final IODispatcherConfiguration httpIODispatcherConfiguration = new HttpIODispatcherConfiguration();
private final TextImportProcessorConfiguration textImportProcessorConfiguration = new PropTextImportProcessorConfiguration();
private final StaticContentProcessorConfiguration staticContentProcessorConfiguration = new PropStaticContentProcessorConfiguration();
private final HttpServerConfiguration httpServerConfiguration = new PropHttpServerConfiguration();
private final TextConfiguration textConfiguration = new PropTextConfiguration();
......@@ -139,7 +138,6 @@ public class PropServerConfiguration implements ServerConfiguration {
private int sendBufferSize;
private CharSequence indexFileName;
private String publicDirectory;
private boolean abortBrokenUploads;
private int activeConnectionLimit;
private int eventCapacity;
private int ioQueueCapacity;
......@@ -250,7 +248,6 @@ public class PropServerConfiguration implements ServerConfiguration {
this.textLexerStringPoolCapacity = getInt(properties, "http.text.lexer.string.pool.capacity", 64);
this.timestampAdapterPoolCapacity = getInt(properties, "http.text.timestamp.adapter.pool.capacity", 64);
this.utf8SinkSize = getIntSize(properties, "http.text.utf8.sink.size", 4096);
this.abortBrokenUploads = getBoolean(properties, "http.text.abort.broken.uploads", true);
this.jsonQueryConnectionCheckFrequency = getInt(properties, "http.json.query.connection.check.frequency", 1_000_000);
this.jsonQueryDoubleScale = getInt(properties, "http.json.query.double.scale", 10);
......@@ -562,13 +559,6 @@ public class PropServerConfiguration implements ServerConfiguration {
}
}
private class PropTextImportProcessorConfiguration implements TextImportProcessorConfiguration {
@Override
public boolean abortBrokenUploads() {
return abortBrokenUploads;
}
}
private class HttpIODispatcherConfiguration implements IODispatcherConfiguration {
@Override
public int getActiveConnectionLimit() {
......@@ -771,11 +761,6 @@ public class PropServerConfiguration implements ServerConfiguration {
return staticContentProcessorConfiguration;
}
@Override
public TextImportProcessorConfiguration getTextImportProcessorConfiguration() {
return textImportProcessorConfiguration;
}
@Override
public JsonQueryProcessorConfiguration getJsonQueryProcessorConfiguration() {
return jsonQueryProcessorConfiguration;
......
......@@ -24,10 +24,8 @@
package io.questdb.cutlass.http;
import io.questdb.cutlass.http.processors.DefaultTextImportProcessorConfiguration;
import io.questdb.cutlass.http.processors.JsonQueryProcessorConfiguration;
import io.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import io.questdb.cutlass.http.processors.TextImportProcessorConfiguration;
import io.questdb.network.DefaultIODispatcherConfiguration;
import io.questdb.network.IODispatcherConfiguration;
import io.questdb.std.FilesFacade;
......@@ -66,7 +64,7 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
return null;
}
};
private final TextImportProcessorConfiguration textImportProcessorConfiguration = new DefaultTextImportProcessorConfiguration();
private final JsonQueryProcessorConfiguration jsonQueryProcessorConfiguration = new JsonQueryProcessorConfiguration() {
@Override
public MillisecondClock getClock() {
......@@ -161,11 +159,6 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
return staticContentProcessorConfiguration;
}
@Override
public TextImportProcessorConfiguration getTextImportProcessorConfiguration() {
return textImportProcessorConfiguration;
}
@Override
public JsonQueryProcessorConfiguration getJsonQueryProcessorConfiguration() {
return jsonQueryProcessorConfiguration;
......
......@@ -29,7 +29,7 @@ import io.questdb.network.PeerIsSlowToReadException;
import io.questdb.network.ServerDisconnectException;
public interface HttpMultipartContentListener {
void onChunk(long lo, long hi) throws PeerDisconnectedException, PeerIsSlowToReadException;
void onChunk(long lo, long hi) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException;
void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException;
......
......@@ -139,7 +139,7 @@ public class HttpServer implements Closeable {
@Override
public HttpRequestProcessor newInstance() {
return new TextImportProcessor(configuration.getTextImportProcessorConfiguration(), cairoEngine);
return new TextImportProcessor(cairoEngine);
}
});
......
......@@ -27,7 +27,6 @@ package io.questdb.cutlass.http;
import io.questdb.WorkerPoolAwareConfiguration;
import io.questdb.cutlass.http.processors.JsonQueryProcessorConfiguration;
import io.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import io.questdb.cutlass.http.processors.TextImportProcessorConfiguration;
import io.questdb.network.IODispatcherConfiguration;
import io.questdb.std.time.MillisecondClock;
......@@ -54,8 +53,6 @@ public interface HttpServerConfiguration extends WorkerPoolAwareConfiguration {
StaticContentProcessorConfiguration getStaticContentProcessorConfiguration();
TextImportProcessorConfiguration getTextImportProcessorConfiguration();
JsonQueryProcessorConfiguration getJsonQueryProcessorConfiguration();
int getSendBufferSize();
......
......@@ -67,16 +67,11 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
atomicityParamMap.put("strict", Atomicity.SKIP_ALL);
}
private final TextImportProcessorConfiguration configuration;
private final CairoEngine engine;
private HttpConnectionContext transientContext;
private TextImportProcessorState transientState;
public TextImportProcessor(
TextImportProcessorConfiguration configuration,
CairoEngine cairoEngine
) {
this.configuration = configuration;
public TextImportProcessor(CairoEngine cairoEngine) {
this.engine = cairoEngine;
}
......@@ -257,7 +252,8 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
}
@Override
public void onChunk(long lo, long hi) throws PeerDisconnectedException, PeerIsSlowToReadException {
public void onChunk(long lo, long hi)
throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
if (hi > lo) {
try {
transientState.textLoader.parse(lo, hi, transientContext.getCairoSecurityContext());
......@@ -381,18 +377,15 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
state.clear();
}
private void handleTextException(TextException e) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (configuration.abortBrokenUploads()) {
sendError(transientContext, e.getMessage(), Chars.equalsNc("json", transientContext.getRequestHeader().getUrlParam("fmt")));
throw PeerDisconnectedException.INSTANCE;
}
transientState.state = TextImportProcessorState.STATE_DATA_ERROR;
transientState.stateMessage = e.getMessage();
private void handleTextException(TextException e)
throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException {
sendError(transientContext, e.getFlyweightMessage(), Chars.equalsNc("json", transientContext.getRequestHeader().getUrlParam("fmt")));
throw ServerDisconnectException.INSTANCE;
}
private void sendError(
HttpConnectionContext context,
String message,
CharSequence message,
boolean json
) throws PeerDisconnectedException, PeerIsSlowToReadException {
final HttpChunkedResponseSocket socket = context.getChunkedResponseSocket();
......
......@@ -88,8 +88,6 @@ public class PropServerConfigurationTest {
Assert.assertEquals("Keep-Alive: timeout=5, max=10000" + Misc.EOL, configuration.getHttpServerConfiguration().getStaticContentProcessorConfiguration().getKeepAliveHeader());
Assert.assertTrue(configuration.getHttpServerConfiguration().getTextImportProcessorConfiguration().abortBrokenUploads());
Assert.assertEquals(256, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getActiveConnectionLimit());
Assert.assertEquals(1024, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getEventCapacity());
Assert.assertEquals(1024, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getIOQueueCapacity());
......@@ -335,8 +333,6 @@ public class PropServerConfigurationTest {
Assert.assertEquals("Keep-Alive: timeout=10, max=50000" + Misc.EOL, configuration.getHttpServerConfiguration().getStaticContentProcessorConfiguration().getKeepAliveHeader());
Assert.assertTrue(configuration.getHttpServerConfiguration().allowDeflateBeforeSend());
Assert.assertFalse(configuration.getHttpServerConfiguration().getTextImportProcessorConfiguration().abortBrokenUploads());
Assert.assertEquals(64, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getActiveConnectionLimit());
Assert.assertEquals(2048, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getEventCapacity());
Assert.assertEquals(64, configuration.getHttpServerConfiguration().getDispatcherConfiguration().getIOQueueCapacity());
......
......@@ -56,6 +56,48 @@ public class IODispatcherTest {
@Rule
public TemporaryFolder temp = new TemporaryFolder();
private static void assertDownloadResponse(long fd, Rnd rnd, long buffer, int len, int nonRepeatedContentLength, String expectedResponseHeader, long expectedResponseLen) {
int expectedHeaderLen = expectedResponseHeader.length();
int headerCheckRemaining = expectedResponseHeader.length();
long downloadedSoFar = 0;
int contentRemaining = 0;
while (downloadedSoFar < expectedResponseLen) {
int contentOffset = 0;
int n = Net.recv(fd, buffer, len);
Assert.assertTrue(n > -1);
if (n > 0) {
if (headerCheckRemaining > 0) {
for (int i = 0; i < n && headerCheckRemaining > 0; i++) {
if (expectedResponseHeader.charAt(expectedHeaderLen - headerCheckRemaining) != (char) Unsafe.getUnsafe().getByte(buffer + i)) {
Assert.fail("at " + (expectedHeaderLen - headerCheckRemaining));
}
headerCheckRemaining--;
contentOffset++;
}
}
if (headerCheckRemaining == 0) {
for (int i = contentOffset; i < n; i++) {
if (contentRemaining == 0) {
contentRemaining = nonRepeatedContentLength;
rnd.reset();
}
Assert.assertEquals(rnd.nextByte(), Unsafe.getUnsafe().getByte(buffer + i));
contentRemaining--;
}
}
downloadedSoFar += n;
}
}
}
private static void sendRequest(String request, long fd, long buffer) {
final int requestLen = request.length();
Chars.strcpy(request, requestLen, buffer);
Assert.assertEquals(requestLen, Net.send(fd, buffer, requestLen));
}
@Before
public void setUp() throws Exception {
temp.create();
......@@ -359,10 +401,7 @@ public class IODispatcherTest {
@Override
public HttpRequestProcessor newInstance() {
return new TextImportProcessor(
httpConfiguration.getTextImportProcessorConfiguration(),
engine
);
return new TextImportProcessor(engine);
}
});
......@@ -630,6 +669,71 @@ public class IODispatcherTest {
);
}
@Test
public void testImportBadJson() throws Exception {
testImport(
"HTTP/1.1 400 Bad request\r\n" +
"Server: questDB/1.0\r\n" +
"Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" +
"Transfer-Encoding: chunked\r\n" +
"Content-Type: application/json; charset=utf-8\r\n" +
"\r\n" +
"1e\r\n" +
"{\"status\":\"Unexpected symbol\"}\r\n" +
"00\r\n" +
"\r\n",
"POST /upload?fmt=json&overwrite=true&forceHeader=true&name=clipboard-157200856 HTTP/1.1\r\n" +
"Host: localhost:9001\r\n" +
"Connection: keep-alive\r\n" +
"Content-Length: 832\r\n" +
"Accept: */*\r\n" +
"Origin: http://localhost:9000\r\n" +
"X-Requested-With: XMLHttpRequest\r\n" +
"User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36\r\n" +
"Sec-Fetch-Mode: cors\r\n" +
"Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryOsOAD9cPKyHuxyBV\r\n" +
"Sec-Fetch-Site: same-origin\r\n" +
"Referer: http://localhost:9000/index.html\r\n" +
"Accept-Encoding: gzip, deflate, br\r\n" +
"Accept-Language: en-GB,en-US;q=0.9,en;q=0.8\r\n" +
"\r\n" +
"------WebKitFormBoundaryOsOAD9cPKyHuxyBV\r\n" +
"Content-Disposition: form-data; name=\"schema\"\r\n" +
"\r\n" +
"[{\"name\":\"timestamp,\"type\":\"DATE\"},{\"name\":\"bid\",\"type\":\"INT\"}]\r\n" +
"------WebKitFormBoundaryOsOAD9cPKyHuxyBV\r\n" +
"Content-Disposition: form-data; name=\"data\"\r\n" +
"\r\n" +
"timestamp,bid\r\n" +
"27/05/2018 00:00:01,100\r\n" +
"27/05/2018 00:00:02,101\r\n" +
"27/05/2018 00:00:03,102\r\n" +
"27/05/2018 00:00:04,103\r\n" +
"27/05/2018 00:00:05,104\r\n" +
"27/05/2018 00:00:06,105\r\n" +
"27/05/2018 00:00:07,106\r\n" +
"27/05/2018 00:00:08,107\r\n" +
"27/05/2018 00:00:09,108\r\n" +
"27/05/2018 00:00:10,109\r\n" +
"27/05/2018 00:00:11,110\r\n" +
"27/05/2018 00:00:12,111\r\n" +
"27/05/2018 00:00:13,112\r\n" +
"27/05/2018 00:00:14,113\r\n" +
"27/05/2018 00:00:15,114\r\n" +
"27/05/2018 00:00:16,115\r\n" +
"27/05/2018 00:00:17,116\r\n" +
"27/05/2018 00:00:18,117\r\n" +
"27/05/2018 00:00:19,118\r\n" +
"27/05/2018 00:00:20,119\r\n" +
"27/05/2018 00:00:21,120\r\n" +
"\r\n" +
"------WebKitFormBoundaryOsOAD9cPKyHuxyBV--",
NetworkFacadeImpl.INSTANCE,
true,
1
);
}
@Test
public void testImportForceUnknownTimestamp() throws Exception {
testImport(
......@@ -1014,10 +1118,7 @@ public class IODispatcherTest {
@Override
public HttpRequestProcessor newInstance() {
return new TextImportProcessor(
httpConfiguration.getTextImportProcessorConfiguration(),
engine
);
return new TextImportProcessor(engine);
}
});
......@@ -3319,10 +3420,7 @@ public class IODispatcherTest {
@Override
public HttpRequestProcessor newInstance() {
return new TextImportProcessor(
httpConfiguration.getTextImportProcessorConfiguration(),
engine
);
return new TextImportProcessor(engine);
}
});
......@@ -3349,48 +3447,6 @@ public class IODispatcherTest {
});
}
private static void assertDownloadResponse(long fd, Rnd rnd, long buffer, int len, int nonRepeatedContentLength, String expectedResponseHeader, long expectedResponseLen) {
int expectedHeaderLen = expectedResponseHeader.length();
int headerCheckRemaining = expectedResponseHeader.length();
long downloadedSoFar = 0;
int contentRemaining = 0;
while (downloadedSoFar < expectedResponseLen) {
int contentOffset = 0;
int n = Net.recv(fd, buffer, len);
Assert.assertTrue(n > -1);
if (n > 0) {
if (headerCheckRemaining > 0) {
for (int i = 0; i < n && headerCheckRemaining > 0; i++) {
if (expectedResponseHeader.charAt(expectedHeaderLen - headerCheckRemaining) != (char) Unsafe.getUnsafe().getByte(buffer + i)) {
Assert.fail("at " + (expectedHeaderLen - headerCheckRemaining));
}
headerCheckRemaining--;
contentOffset++;
}
}
if (headerCheckRemaining == 0) {
for (int i = contentOffset; i < n; i++) {
if (contentRemaining == 0) {
contentRemaining = nonRepeatedContentLength;
rnd.reset();
}
Assert.assertEquals(rnd.nextByte(), Unsafe.getUnsafe().getByte(buffer + i));
contentRemaining--;
}
}
downloadedSoFar += n;
}
}
}
private static void sendRequest(String request, long fd, long buffer) {
final int requestLen = request.length();
Chars.strcpy(request, requestLen, buffer);
Assert.assertEquals(requestLen, Net.send(fd, buffer, requestLen));
}
@NotNull
private DefaultHttpServerConfiguration createHttpServerConfiguration(
String baseDir,
......
......@@ -21,7 +21,6 @@ http.net.listen.backlog=64
http.net.snd.buf.size=4m
http.net.rcv.buf.size=8m
http.text.abort.broken.uploads=false
http.text.adapter.set.config=/loader.json
http.text.date.adapter.pool.capacity=32
http.text.json.cache.limit=64k
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册