From 5d6720a71318339ab0f2af74433c65801542e608 Mon Sep 17 00:00:00 2001 From: Vlad Ilyushchenko Date: Wed, 12 May 2021 16:52:08 +0100 Subject: [PATCH] chore(http): refactored http tests to improve stability (#1008) --- .../cutlass/http/HttpConnectionContext.java | 21 ++-- .../http/HttpContextConfiguration.java | 7 ++ .../http/HttpServerConfigurationBuilder.java | 11 ++ .../cutlass/http/IODispatcherTest.java | 109 ++++++++++-------- 4 files changed, 93 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java b/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java index 84b84356b..014f6fb05 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java @@ -63,6 +63,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr private int nCompletedRequests; private long totalBytesSent; private int receivedBytes; + private final Runnable onPeerDisconnect; public HttpConnectionContext(HttpContextConfiguration configuration) { this.nf = configuration.getNetworkFacade(); @@ -78,6 +79,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr this.allowDeflateBeforeSend = configuration.allowDeflateBeforeSend(); cairoSecurityContext = new CairoSecurityContextImpl(!configuration.readOnlySecurityContext()); this.serverKeepAlive = configuration.getServerKeepAlive(); + this.onPeerDisconnect = configuration.onPeerDisconnect(); } @Override @@ -300,7 +302,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr while (true) { final int n = nf.recv(fd, buf, bufRemaining); if (n < 0) { - dispatcher.disconnect(this); + handlePeerDisconnect(); break; } @@ -442,7 +444,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr pendingRetry = true; return false; } catch (PeerDisconnectedException ignore) { - dispatcher.disconnect(this); + handlePeerDisconnect(); } catch (PeerIsSlowToReadException e2) { LOG.info().$("peer is slow on running the rerun [fd=").$(fd).$(", thread=") .$(Thread.currentThread().getId()).$(']').$(); @@ -478,7 +480,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr LOG.debug().$("peer is slow reader").$(); dispatcher.registerChannel(this, IOOperation.WRITE); } catch (PeerDisconnectedException ignore) { - dispatcher.disconnect(this); + handlePeerDisconnect(); } catch (ServerDisconnectException ignore) { LOG.info().$("kicked out [fd=").$(fd).$(']').$(); dispatcher.disconnect(this); @@ -524,7 +526,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr .$(", errno=").$(nf.errno()) .$(']').$(); // peer disconnect - dispatcher.disconnect(this); + handlePeerDisconnect(); return false; } @@ -571,7 +573,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr if (read != 0) { dumpBuffer(recvBuffer, read); LOG.info().$("disconnect after request [fd=").$(fd).$(']').$(); - dispatcher.disconnect(this); + handlePeerDisconnect(); busyRecv = false; } else { processor.onHeadersReady(this); @@ -586,7 +588,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr scheduleRetry(processor, rescheduleContext); busyRecv = false; } catch (PeerDisconnectedException e) { - dispatcher.disconnect(this); + handlePeerDisconnect(); busyRecv = false; } catch (ServerDisconnectException e) { LOG.info().$("kicked out [fd=").$(fd).$(']').$(); @@ -626,7 +628,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr try { doFail(e, processor); } catch (PeerDisconnectedException peerDisconnectedException) { - dispatcher.disconnect(this); + handlePeerDisconnect(); } catch (PeerIsSlowToReadException peerIsSlowToReadException) { LOG.info().$("peer is slow to receive failed to retry response [fd=").$(fd).$(']').$(); processor.parkRequest(this); @@ -638,6 +640,11 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable, Retr } } + private void handlePeerDisconnect() { + dispatcher.disconnect(this); + onPeerDisconnect.run(); + } + private void doFail(HttpException e, HttpRequestProcessor processor) throws PeerIsSlowToReadException, PeerDisconnectedException, ServerDisconnectException { LOG.info().$("failing client query with: ").$(e.getMessage()).$(); diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpContextConfiguration.java b/core/src/main/java/io/questdb/cutlass/http/HttpContextConfiguration.java index 8a3a75b79..411ba0a0c 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpContextConfiguration.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpContextConfiguration.java @@ -29,6 +29,9 @@ import io.questdb.std.datetime.millitime.MillisecondClock; public interface HttpContextConfiguration { + Runnable NONE= () -> { + }; + boolean allowDeflateBeforeSend(); MillisecondClock getClock(); @@ -56,4 +59,8 @@ public interface HttpContextConfiguration { boolean getServerKeepAlive(); boolean readOnlySecurityContext(); + + default Runnable onPeerDisconnect(){ + return NONE; + } } diff --git a/core/src/test/java/io/questdb/cutlass/http/HttpServerConfigurationBuilder.java b/core/src/test/java/io/questdb/cutlass/http/HttpServerConfigurationBuilder.java index de79cf00d..84c725dbe 100644 --- a/core/src/test/java/io/questdb/cutlass/http/HttpServerConfigurationBuilder.java +++ b/core/src/test/java/io/questdb/cutlass/http/HttpServerConfigurationBuilder.java @@ -50,6 +50,7 @@ public class HttpServerConfigurationBuilder { private int rerunProcessingQueueSize = 4096; private int receiveBufferSize = 1024 * 1024; private long multipartIdleSpinCount = -1; + private Runnable onPeerDisconnect = HttpContextConfiguration.NONE; public HttpServerConfigurationBuilder withNetwork(NetworkFacade nf) { this.nf = nf; @@ -96,6 +97,11 @@ public class HttpServerConfigurationBuilder { return this; } + public HttpServerConfigurationBuilder withOnPeerDisconnect(Runnable runnable) { + this.onPeerDisconnect = runnable; + return this; + } + public HttpServerConfigurationBuilder withReceiveBufferSize(int receiveBufferSize) { this.receiveBufferSize = receiveBufferSize; return this; @@ -258,6 +264,11 @@ public class HttpServerConfigurationBuilder { public String getHttpVersion() { return httpProtocolVersion; } + + @Override + public Runnable onPeerDisconnect() { + return onPeerDisconnect; + } }; } diff --git a/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java b/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java index aaa766079..8746e6702 100644 --- a/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java +++ b/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java @@ -26,7 +26,6 @@ package io.questdb.cutlass.http; import io.questdb.cairo.*; import io.questdb.cairo.security.AllowAllCairoSecurityContext; -import io.questdb.cairo.sql.Record; import io.questdb.cutlass.NetUtils; import io.questdb.cutlass.http.processors.JsonQueryProcessor; import io.questdb.cutlass.http.processors.QueryCache; @@ -57,7 +56,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.BufferedInputStream; import java.io.InputStream; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -1269,7 +1267,7 @@ public class IODispatcherTest { public void testImportMultipleOnSameConnectionSlow() throws Exception { assertMemoryLeak(() -> { final String baseDir = temp.getRoot().getAbsolutePath(); - final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false, false); + final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false); final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() { @Override public int[] getWorkerAffinity() { @@ -2108,7 +2106,7 @@ public class IODispatcherTest { public void testJsonQueryDataError() throws Exception { assertMemoryLeak(() -> { final String baseDir = temp.getRoot().getAbsolutePath(); - final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false, false); + final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false); final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() { @Override public int[] getWorkerAffinity() { @@ -3243,7 +3241,7 @@ public class IODispatcherTest { public void testJsonQuerySyntaxError() throws Exception { assertMemoryLeak(() -> { final String baseDir = temp.getRoot().getAbsolutePath(); - final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false, false); + final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false); final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() { @Override public int[] getWorkerAffinity() { @@ -3630,8 +3628,23 @@ public class IODispatcherTest { assertMemoryLeak(() -> { final NetworkFacade nf = NetworkFacadeImpl.INSTANCE; final String baseDir = temp.getRoot().getAbsolutePath(); - final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(nf, baseDir, 256, - false, false); + final int tableRowCount = 300_000; + + SOCountDownLatch peerDisconnectLatch = new SOCountDownLatch(1); + + DefaultHttpServerConfiguration httpConfiguration = new HttpServerConfigurationBuilder() + .withNetwork(nf) + .withBaseDir(baseDir) + .withSendBufferSize(256) + .withDumpingTraffic(false) + .withAllowDeflateBeforeSend(false) + .withServerKeepAlive(true) + .withHttpProtocolVersion("HTTP/1.1 ") + .withOnPeerDisconnect(peerDisconnectLatch::countDown) + .build(); + QueryCache.configure(httpConfiguration); + + final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() { @Override public int[] getWorkerAffinity() { @@ -3648,8 +3661,11 @@ public class IODispatcherTest { return false; } }); - try (CairoEngine engine = new CairoEngine(new DefaultCairoConfiguration(baseDir)); - HttpServer httpServer = new HttpServer(httpConfiguration, workerPool, false)) { + + try ( + CairoEngine engine = new CairoEngine(new DefaultCairoConfiguration(baseDir)); + HttpServer httpServer = new HttpServer(httpConfiguration, workerPool, false) + ) { httpServer.bind(new HttpRequestProcessorFactory() { @Override public HttpRequestProcessor newInstance() { @@ -3676,7 +3692,6 @@ public class IODispatcherTest { }); final AtomicBoolean clientClosed = new AtomicBoolean(false); - final AtomicBoolean serverClosed = new AtomicBoolean(false); final int minClientReceivedBytesBeforeDisconnect = 180; final AtomicLong refClientFd = new AtomicLong(-1); HttpClientStateListener clientStateListener = new HttpClientStateListener() { @@ -3684,7 +3699,6 @@ public class IODispatcherTest { @Override public void onClosed() { - clientClosed.set(true); } @Override @@ -3696,6 +3710,7 @@ public class IODispatcherTest { if (fd != -1) { refClientFd.set(-1); nf.close(fd); + clientClosed.set(true); } } } @@ -3704,11 +3719,14 @@ public class IODispatcherTest { try { // create table with all column types - CairoTestUtils.createTestTable(engine.getConfiguration(), 10000, new Rnd(), - new TestRecord.ArrayBinarySequence()); + CairoTestUtils.createTestTable( + engine.getConfiguration(), + tableRowCount, + new Rnd(), + new TestRecord.ArrayBinarySequence() + ); // send multipart request to server - final String request = "GET /query?query=select+distinct+a+from+x+where+test_latched_counter() HTTP/1.1\r\n" + "Host: localhost:9001\r\n" + "Connection: keep-alive\r\n" + "Cache-Control: max-age=0\r\n" + "Upgrade-Insecure-Requests: 1\r\n" @@ -3716,22 +3734,7 @@ public class IODispatcherTest { + "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3\r\n" + "Accept-Encoding: gzip, deflate, br\r\n" + "Accept-Language: en-GB,en-US;q=0.9,en;q=0.8\r\n" + "\r\n"; - TestLatchedCounterFunctionFactory.reset(new TestLatchedCounterFunctionFactory.Callback() { - @Override - public boolean onGet(Record record, int count) { - if (count == 4) { - while (!clientClosed.get()) { - LockSupport.parkNanos(1); - } - } - return true; - } - @Override - public void onClose() { - serverClosed.set(true); - } - }); long fd = nf.socketTcp(true); try { long sockAddr = nf.sockaddr("127.0.0.1", 9001); @@ -3760,10 +3763,10 @@ public class IODispatcherTest { } finally { LOG.info().$("Closing client connection").$(); } - while (!serverClosed.get()) { - LockSupport.parkNanos(1); - } - Assert.assertEquals(6, TestLatchedCounterFunctionFactory.getCount()); + peerDisconnectLatch.await(); + // depending on how quick the CI hardware is we may end up processing different + // number of rows before query is interrupted + Assert.assertTrue(tableRowCount > TestLatchedCounterFunctionFactory.getCount()); } finally { workerPool.halt(); } @@ -4177,7 +4180,7 @@ public class IODispatcherTest { public void testSCPConnectDownloadDisconnect() throws Exception { assertMemoryLeak(() -> { final String baseDir = temp.getRoot().getAbsolutePath(); - final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false, false); + final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false); final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() { @Override public int[] getWorkerAffinity() { @@ -4215,7 +4218,7 @@ public class IODispatcherTest { Rnd rnd = new Rnd(); final int diskBufferLen = 1024 * 1024; - writeRandomFile(path, rnd, 122222212222L, diskBufferLen); + writeRandomFile(path, rnd, 122222212222L); // httpServer.getStartedLatch().await(); @@ -4348,7 +4351,7 @@ public class IODispatcherTest { public void testSCPFullDownload() throws Exception { assertMemoryLeak(() -> { final String baseDir = temp.getRoot().getAbsolutePath(); - final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false, false); + final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir, false); final WorkerPool workerPool = new WorkerPool(new WorkerPoolConfiguration() { @Override public int[] getWorkerAffinity() { @@ -4386,7 +4389,7 @@ public class IODispatcherTest { Rnd rnd = new Rnd(); final int diskBufferLen = 1024 * 1024; - writeRandomFile(path, rnd, 122299092L, diskBufferLen); + writeRandomFile(path, rnd, 122299092L); long fd = Net.socketTcp(true); try { @@ -4543,7 +4546,7 @@ public class IODispatcherTest { Rnd rnd = new Rnd(); final int diskBufferLen = 1024 * 1024; - writeRandomFile(path, rnd, 122222212222L, diskBufferLen); + writeRandomFile(path, rnd, 122222212222L); // httpServer.getStartedLatch().await(); @@ -5297,6 +5300,7 @@ public class IODispatcherTest { final NetworkFacade nf = NetworkFacadeImpl.INSTANCE; final AtomicInteger requestsReceived = new AtomicInteger(); final AtomicBoolean finished = new AtomicBoolean(false); + final SOCountDownLatch senderHalt = new SOCountDownLatch(senderCount); try (IODispatcher dispatcher = IODispatchers.create( new DefaultIODispatcherConfiguration(), (fd, dispatcher1) -> new HttpConnectionContext(httpServerConfiguration.getHttpContextConfiguration()).of(fd, dispatcher1) @@ -5310,7 +5314,7 @@ public class IODispatcherTest { AtomicBoolean serverRunning = new AtomicBoolean(true); - CountDownLatch serverHaltLatch = new CountDownLatch(serverThreadCount); + SOCountDownLatch serverHaltLatch = new SOCountDownLatch(serverThreadCount); for (int j = 0; j < serverThreadCount; j++) { new Thread(() -> { final StringSink sink = new StringSink(); @@ -5414,6 +5418,7 @@ public class IODispatcherTest { } finally { completedCount.incrementAndGet(); Net.freeSockAddr(sockAddr); + senderHalt.countDown(); } }).start(); } @@ -5439,6 +5444,7 @@ public class IODispatcherTest { serverHaltLatch.await(); } finally { finished.set(true); + senderHalt.await(); } Assert.assertEquals(N * senderCount, requestsReceived.get()); }); @@ -5542,15 +5548,14 @@ public class IODispatcherTest { @NotNull private DefaultHttpServerConfiguration createHttpServerConfiguration( String baseDir, - boolean dumpTraffic, - boolean allowDeflateBeforeSend + boolean dumpTraffic ) { return createHttpServerConfiguration( NetworkFacadeImpl.INSTANCE, baseDir, 1024 * 1024, dumpTraffic, - allowDeflateBeforeSend + false ); } @@ -5562,7 +5567,15 @@ public class IODispatcherTest { boolean dumpTraffic, boolean allowDeflateBeforeSend ) { - return createHttpServerConfiguration(nf, baseDir, sendBufferSize, dumpTraffic, allowDeflateBeforeSend, true, "HTTP/1.1 "); + return createHttpServerConfiguration( + nf, + baseDir, + sendBufferSize, + dumpTraffic, + allowDeflateBeforeSend, + true, + "HTTP/1.1 " + ); } @NotNull @@ -5633,24 +5646,24 @@ public class IODispatcherTest { .run(code); } - private void writeRandomFile(Path path, Rnd rnd, long lastModified, int bufLen) { + private void writeRandomFile(Path path, Rnd rnd, long lastModified) { if (Files.exists(path)) { Assert.assertTrue(Files.remove(path)); } long fd = Files.openAppend(path); - long buf = Unsafe.malloc(bufLen); // 1Mb buffer - for (int i = 0; i < bufLen; i++) { + long buf = Unsafe.malloc(1048576); // 1Mb buffer + for (int i = 0; i < 1048576; i++) { Unsafe.getUnsafe().putByte(buf + i, rnd.nextByte()); } for (int i = 0; i < 20; i++) { - Assert.assertEquals(bufLen, Files.append(fd, buf, bufLen)); + Assert.assertEquals(1048576, Files.append(fd, buf, 1048576)); } Files.close(fd); Files.setLastModified(path, lastModified); - Unsafe.free(buf, bufLen); + Unsafe.free(buf, 1048576); } private static class QueryThread extends Thread { -- GitLab