未验证 提交 5d6720a7 编写于 作者: V Vlad Ilyushchenko 提交者: GitHub

chore(http): refactored http tests to improve stability (#1008)

上级 1aefed87
......@@ -63,6 +63,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr
private int nCompletedRequests;
private long totalBytesSent;
private int receivedBytes;
private final Runnable onPeerDisconnect;
public HttpConnectionContext(HttpContextConfiguration configuration) {
this.nf = configuration.getNetworkFacade();
......@@ -78,6 +79,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr
this.allowDeflateBeforeSend = configuration.allowDeflateBeforeSend();
cairoSecurityContext = new CairoSecurityContextImpl(!configuration.readOnlySecurityContext());
this.serverKeepAlive = configuration.getServerKeepAlive();
this.onPeerDisconnect = configuration.onPeerDisconnect();
}
@Override
......@@ -300,7 +302,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr
while (true) {
final int n = nf.recv(fd, buf, bufRemaining);
if (n < 0) {
dispatcher.disconnect(this);
handlePeerDisconnect();
break;
}
......@@ -442,7 +444,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr
pendingRetry = true;
return false;
} catch (PeerDisconnectedException ignore) {
dispatcher.disconnect(this);
handlePeerDisconnect();
} catch (PeerIsSlowToReadException e2) {
LOG.info().$("peer is slow on running the rerun [fd=").$(fd).$(", thread=")
.$(Thread.currentThread().getId()).$(']').$();
......@@ -478,7 +480,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr
LOG.debug().$("peer is slow reader").$();
dispatcher.registerChannel(this, IOOperation.WRITE);
} catch (PeerDisconnectedException ignore) {
dispatcher.disconnect(this);
handlePeerDisconnect();
} catch (ServerDisconnectException ignore) {
LOG.info().$("kicked out [fd=").$(fd).$(']').$();
dispatcher.disconnect(this);
......@@ -524,7 +526,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr
.$(", errno=").$(nf.errno())
.$(']').$();
// peer disconnect
dispatcher.disconnect(this);
handlePeerDisconnect();
return false;
}
......@@ -571,7 +573,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr
if (read != 0) {
dumpBuffer(recvBuffer, read);
LOG.info().$("disconnect after request [fd=").$(fd).$(']').$();
dispatcher.disconnect(this);
handlePeerDisconnect();
busyRecv = false;
} else {
processor.onHeadersReady(this);
......@@ -586,7 +588,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr
scheduleRetry(processor, rescheduleContext);
busyRecv = false;
} catch (PeerDisconnectedException e) {
dispatcher.disconnect(this);
handlePeerDisconnect();
busyRecv = false;
} catch (ServerDisconnectException e) {
LOG.info().$("kicked out [fd=").$(fd).$(']').$();
......@@ -626,7 +628,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr
try {
doFail(e, processor);
} catch (PeerDisconnectedException peerDisconnectedException) {
dispatcher.disconnect(this);
handlePeerDisconnect();
} catch (PeerIsSlowToReadException peerIsSlowToReadException) {
LOG.info().$("peer is slow to receive failed to retry response [fd=").$(fd).$(']').$();
processor.parkRequest(this);
......@@ -638,6 +640,11 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr
}
}
private void handlePeerDisconnect() {
dispatcher.disconnect(this);
onPeerDisconnect.run();
}
private void doFail(HttpException e, HttpRequestProcessor processor) throws
PeerIsSlowToReadException, PeerDisconnectedException, ServerDisconnectException {
LOG.info().$("failing client query with: ").$(e.getMessage()).$();
......
......@@ -29,6 +29,9 @@ import io.questdb.std.datetime.millitime.MillisecondClock;
public interface HttpContextConfiguration {
Runnable NONE= () -> {
};
boolean allowDeflateBeforeSend();
MillisecondClock getClock();
......@@ -56,4 +59,8 @@ public interface HttpContextConfiguration {
boolean getServerKeepAlive();
boolean readOnlySecurityContext();
default Runnable onPeerDisconnect(){
return NONE;
}
}
......@@ -50,6 +50,7 @@ public class HttpServerConfigurationBuilder {
private int rerunProcessingQueueSize = 4096;
private int receiveBufferSize = 1024 * 1024;
private long multipartIdleSpinCount = -1;
private Runnable onPeerDisconnect = HttpContextConfiguration.NONE;
public HttpServerConfigurationBuilder withNetwork(NetworkFacade nf) {
this.nf = nf;
......@@ -96,6 +97,11 @@ public class HttpServerConfigurationBuilder {
return this;
}
public HttpServerConfigurationBuilder withOnPeerDisconnect(Runnable runnable) {
this.onPeerDisconnect = runnable;
return this;
}
public HttpServerConfigurationBuilder withReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
return this;
......@@ -258,6 +264,11 @@ public class HttpServerConfigurationBuilder {
public String getHttpVersion() {
return httpProtocolVersion;
}
@Override
public Runnable onPeerDisconnect() {
return onPeerDisconnect;
}
};
}
......
......@@ -26,7 +26,6 @@ package io.questdb.cutlass.http;
import io.questdb.cairo.*;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.sql.Record;
import io.questdb.cutlass.NetUtils;
import io.questdb.cutlass.http.processors.JsonQueryProcessor;
import io.questdb.cutlass.http.processors.QueryCache;
......@@ -57,7 +56,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
......@@ -1269,7 +1267,7 @@ public class IODispatcherTest {
public void testImportMultipleOnSameConnectionSlow() throws Exception {
assertMemoryLeak(() -> {
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false, false);
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false);
final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() {
@Override
public int[] getWorkerAffinity() {
......@@ -2108,7 +2106,7 @@ public class IODispatcherTest {
public void testJsonQueryDataError() throws Exception {
assertMemoryLeak(() -> {
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false, false);
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false);
final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() {
@Override
public int[] getWorkerAffinity() {
......@@ -3243,7 +3241,7 @@ public class IODispatcherTest {
public void testJsonQuerySyntaxError() throws Exception {
assertMemoryLeak(() -> {
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false, false);
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false);
final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() {
@Override
public int[] getWorkerAffinity() {
......@@ -3630,8 +3628,23 @@ public class IODispatcherTest {
assertMemoryLeak(() -> {
final NetworkFacade nf = NetworkFacadeImpl.INSTANCE;
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(nf, baseDir, 256,
false, false);
final int tableRowCount = 300_000;
SOCountDownLatch peerDisconnectLatch = new SOCountDownLatch(1);
DefaultHttpServerConfiguration httpConfiguration = new HttpServerConfigurationBuilder()
.withNetwork(nf)
.withBaseDir(baseDir)
.withSendBufferSize(256)
.withDumpingTraffic(false)
.withAllowDeflateBeforeSend(false)
.withServerKeepAlive(true)
.withHttpProtocolVersion("HTTP/1.1 ")
.withOnPeerDisconnect(peerDisconnectLatch::countDown)
.build();
QueryCache.configure(httpConfiguration);
final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() {
@Override
public int[] getWorkerAffinity() {
......@@ -3648,8 +3661,11 @@ public class IODispatcherTest {
return false;
}
});
try (CairoEngine engine = new CairoEngine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration, workerPool, false)) {
try (
CairoEngine engine = new CairoEngine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration, workerPool, false)
) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public HttpRequestProcessor newInstance() {
......@@ -3676,7 +3692,6 @@ public class IODispatcherTest {
});
final AtomicBoolean clientClosed = new AtomicBoolean(false);
final AtomicBoolean serverClosed = new AtomicBoolean(false);
final int minClientReceivedBytesBeforeDisconnect = 180;
final AtomicLong refClientFd = new AtomicLong(-1);
HttpClientStateListener clientStateListener = new HttpClientStateListener() {
......@@ -3684,7 +3699,6 @@ public class IODispatcherTest {
@Override
public void onClosed() {
clientClosed.set(true);
}
@Override
......@@ -3696,6 +3710,7 @@ public class IODispatcherTest {
if (fd != -1) {
refClientFd.set(-1);
nf.close(fd);
clientClosed.set(true);
}
}
}
......@@ -3704,11 +3719,14 @@ public class IODispatcherTest {
try {
// create table with all column types
CairoTestUtils.createTestTable(engine.getConfiguration(), 10000, new Rnd(),
new TestRecord.ArrayBinarySequence());
CairoTestUtils.createTestTable(
engine.getConfiguration(),
tableRowCount,
new Rnd(),
new TestRecord.ArrayBinarySequence()
);
// send multipart request to server
final String request = "GET /query?query=select+distinct+a+from+x+where+test_latched_counter() HTTP/1.1\r\n"
+ "Host: localhost:9001\r\n" + "Connection: keep-alive\r\n" + "Cache-Control: max-age=0\r\n"
+ "Upgrade-Insecure-Requests: 1\r\n"
......@@ -3716,22 +3734,7 @@ public class IODispatcherTest {
+ "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3\r\n"
+ "Accept-Encoding: gzip, deflate, br\r\n"
+ "Accept-Language: en-GB,en-US;q=0.9,en;q=0.8\r\n" + "\r\n";
TestLatchedCounterFunctionFactory.reset(new TestLatchedCounterFunctionFactory.Callback() {
@Override
public boolean onGet(Record record, int count) {
if (count == 4) {
while (!clientClosed.get()) {
LockSupport.parkNanos(1);
}
}
return true;
}
@Override
public void onClose() {
serverClosed.set(true);
}
});
long fd = nf.socketTcp(true);
try {
long sockAddr = nf.sockaddr("127.0.0.1", 9001);
......@@ -3760,10 +3763,10 @@ public class IODispatcherTest {
} finally {
LOG.info().$("Closing client connection").$();
}
while (!serverClosed.get()) {
LockSupport.parkNanos(1);
}
Assert.assertEquals(6, TestLatchedCounterFunctionFactory.getCount());
peerDisconnectLatch.await();
// depending on how quick the CI hardware is we may end up processing different
// number of rows before query is interrupted
Assert.assertTrue(tableRowCount > TestLatchedCounterFunctionFactory.getCount());
} finally {
workerPool.halt();
}
......@@ -4177,7 +4180,7 @@ public class IODispatcherTest {
public void testSCPConnectDownloadDisconnect() throws Exception {
assertMemoryLeak(() -> {
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false, false);
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false);
final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() {
@Override
public int[] getWorkerAffinity() {
......@@ -4215,7 +4218,7 @@ public class IODispatcherTest {
Rnd rnd = new Rnd();
final int diskBufferLen = 1024 * 1024;
writeRandomFile(path, rnd, 122222212222L, diskBufferLen);
writeRandomFile(path, rnd, 122222212222L);
// httpServer.getStartedLatch().await();
......@@ -4348,7 +4351,7 @@ public class IODispatcherTest {
public void testSCPFullDownload() throws Exception {
assertMemoryLeak(() -> {
final String baseDir = temp.getRoot().getAbsolutePath();
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false, false);
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false);
final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() {
@Override
public int[] getWorkerAffinity() {
......@@ -4386,7 +4389,7 @@ public class IODispatcherTest {
Rnd rnd = new Rnd();
final int diskBufferLen = 1024 * 1024;
writeRandomFile(path, rnd, 122299092L, diskBufferLen);
writeRandomFile(path, rnd, 122299092L);
long fd = Net.socketTcp(true);
try {
......@@ -4543,7 +4546,7 @@ public class IODispatcherTest {
Rnd rnd = new Rnd();
final int diskBufferLen = 1024 * 1024;
writeRandomFile(path, rnd, 122222212222L, diskBufferLen);
writeRandomFile(path, rnd, 122222212222L);
// httpServer.getStartedLatch().await();
......@@ -5297,6 +5300,7 @@ public class IODispatcherTest {
final NetworkFacade nf = NetworkFacadeImpl.INSTANCE;
final AtomicInteger requestsReceived = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
final SOCountDownLatch senderHalt = new SOCountDownLatch(senderCount);
try (IODispatcher<HttpConnectionContext> dispatcher = IODispatchers.create(
new DefaultIODispatcherConfiguration(),
(fd, dispatcher1) -> new HttpConnectionContext(httpServerConfiguration.getHttpContextConfiguration()).of(fd, dispatcher1)
......@@ -5310,7 +5314,7 @@ public class IODispatcherTest {
AtomicBoolean serverRunning = new AtomicBoolean(true);
CountDownLatch serverHaltLatch = new CountDownLatch(serverThreadCount);
SOCountDownLatch serverHaltLatch = new SOCountDownLatch(serverThreadCount);
for (int j = 0; j < serverThreadCount; j++) {
new Thread(() -> {
final StringSink sink = new StringSink();
......@@ -5414,6 +5418,7 @@ public class IODispatcherTest {
} finally {
completedCount.incrementAndGet();
Net.freeSockAddr(sockAddr);
senderHalt.countDown();
}
}).start();
}
......@@ -5439,6 +5444,7 @@ public class IODispatcherTest {
serverHaltLatch.await();
} finally {
finished.set(true);
senderHalt.await();
}
Assert.assertEquals(N * senderCount, requestsReceived.get());
});
......@@ -5542,15 +5548,14 @@ public class IODispatcherTest {
@NotNull
private DefaultHttpServerConfiguration createHttpServerConfiguration(
String baseDir,
boolean dumpTraffic,
boolean allowDeflateBeforeSend
boolean dumpTraffic
) {
return createHttpServerConfiguration(
NetworkFacadeImpl.INSTANCE,
baseDir,
1024 * 1024,
dumpTraffic,
allowDeflateBeforeSend
false
);
}
......@@ -5562,7 +5567,15 @@ public class IODispatcherTest {
boolean dumpTraffic,
boolean allowDeflateBeforeSend
) {
return createHttpServerConfiguration(nf, baseDir, sendBufferSize, dumpTraffic, allowDeflateBeforeSend, true, "HTTP/1.1 ");
return createHttpServerConfiguration(
nf,
baseDir,
sendBufferSize,
dumpTraffic,
allowDeflateBeforeSend,
true,
"HTTP/1.1 "
);
}
@NotNull
......@@ -5633,24 +5646,24 @@ public class IODispatcherTest {
.run(code);
}
private void writeRandomFile(Path path, Rnd rnd, long lastModified, int bufLen) {
private void writeRandomFile(Path path, Rnd rnd, long lastModified) {
if (Files.exists(path)) {
Assert.assertTrue(Files.remove(path));
}
long fd = Files.openAppend(path);
long buf = Unsafe.malloc(bufLen); // 1Mb buffer
for (int i = 0; i < bufLen; i++) {
long buf = Unsafe.malloc(1048576); // 1Mb buffer
for (int i = 0; i < 1048576; i++) {
Unsafe.getUnsafe().putByte(buf + i, rnd.nextByte());
}
for (int i = 0; i < 20; i++) {
Assert.assertEquals(bufLen, Files.append(fd, buf, bufLen));
Assert.assertEquals(1048576, Files.append(fd, buf, 1048576));
}
Files.close(fd);
Files.setLastModified(path, lastModified);
Unsafe.free(buf, bufLen);
Unsafe.free(buf, 1048576);
}
private static class QueryThread extends Thread {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册