提交 8b04a7cf 编写于 作者: V Vlad Ilyushchenko

NET: bugfixes for MimeTypes, which didn't read file correctly on Windows....

NET: bugfixes for MimeTypes, which didn't read file correctly on Windows. HttpServer did not stop properly, which is also fixed. StaticContentProcessor test validates downloaded content and headers. Commented out relic unit test - i will be deleting those as soon as I ported all the features.
上级 5d4f5aee
......@@ -28,6 +28,7 @@ import com.questdb.network.DefaultIODispatcherConfiguration;
import com.questdb.network.IODispatcherConfiguration;
import com.questdb.std.FilesFacade;
import com.questdb.std.FilesFacadeImpl;
import com.questdb.std.Os;
import com.questdb.std.str.Path;
import com.questdb.std.time.MillisecondClock;
import com.questdb.std.time.MillisecondClockImpl;
......@@ -58,7 +59,13 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
};
public DefaultHttpServerConfiguration() {
try (Path path = new Path().of(this.getClass().getResource("/site/conf/mime.types").getFile()).$()) {
String defaultFilePath = this.getClass().getResource("/site/conf/mime.types").getFile();
if (Os.type == Os.WINDOWS) {
// on Windows Java returns "/C:/dir/file". This leading slash is Java specific and doesn't bode well
// with OS file open methods.
defaultFilePath = defaultFilePath.substring(1);
}
try (Path path = new Path().of(defaultFilePath).$()) {
this.mimeTypesCache = new MimeTypesCache(FilesFacadeImpl.INSTANCE, path);
}
}
......
......@@ -44,9 +44,11 @@ public class HttpConnectionContext implements IOContext, Locality {
private final long fd;
private final LocalValueMap localValueMap = new LocalValueMap();
private HttpRequestProcessor resumeProcessor = null;
private final NetworkFacade nf;
public HttpConnectionContext(HttpServerConfiguration configuration, long fd) {
this.configuration = configuration;
this.nf = configuration.getDispatcherConfiguration().getNetworkFacade();
this.csPool = new ObjectPool<>(DirectByteCharSequence.FACTORY, configuration.getConnectionWrapperObjPoolSize());
this.headerParser = new HttpHeaderParser(configuration.getConnectionHeaderBufferSize(), csPool);
this.multipartContentHeaderParser = new HttpHeaderParser(configuration.getConnectionMultipartHeaderBufferSize(), csPool);
......@@ -98,18 +100,21 @@ public class HttpConnectionContext implements IOContext, Locality {
return localValueMap;
}
public void handleClientOperation(int operation, NetworkFacade nf, IODispatcher<HttpConnectionContext> dispatcher, HttpRequestProcessorSelector selector) {
public void handleClientOperation(int operation, IODispatcher<HttpConnectionContext> dispatcher, HttpRequestProcessorSelector selector) {
switch (operation) {
case IOOperation.READ:
handleClientRecv(nf, dispatcher, selector);
handleClientRecv(dispatcher, selector);
break;
case IOOperation.WRITE:
if (resumeProcessor != null) {
try {
responseSink.resume();
resumeProcessor.resume(this);
resumeProcessor = null;
} catch (PeerIsSlowException ignore) {
dispatcher.registerChannel(this, IOOperation.WRITE);
} catch (PeerDisconnectedException ignore) {
dispatcher.disconnect(this, DisconnectReason.PEER);
}
} else {
assert false;
......@@ -125,7 +130,7 @@ public class HttpConnectionContext implements IOContext, Locality {
return responseSink.getSimple();
}
private void checkRemainingInputAndCompleteRequest(NetworkFacade nf, IODispatcher<HttpConnectionContext> dispatcher, long fd, HttpRequestProcessor processor) {
private void checkRemainingInputAndCompleteRequest(IODispatcher<HttpConnectionContext> dispatcher, long fd, HttpRequestProcessor processor) {
int read;// consume and throw away the remainder of TCP input
read = nf.recv(fd, recvBuffer, 1);
if (read != 0) {
......@@ -142,7 +147,6 @@ public class HttpConnectionContext implements IOContext, Locality {
}
private void handleClientRecv(
NetworkFacade nf,
IODispatcher<HttpConnectionContext> dispatcher,
HttpRequestProcessorSelector selector
) {
......@@ -212,7 +216,7 @@ public class HttpConnectionContext implements IOContext, Locality {
}
} while (!multipartContentParser.parse(recvBuffer, recvBuffer + read, multipartListener));
}
checkRemainingInputAndCompleteRequest(nf, dispatcher, fd, processor);
checkRemainingInputAndCompleteRequest(dispatcher, fd, processor);
} else {
// Do not expect any more bytes to be sent to us before
......
......@@ -23,7 +23,9 @@
package com.questdb.cutlass.http;
import com.questdb.network.Net;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.NetworkFacade;
import com.questdb.std.*;
import com.questdb.std.ex.ZLibException;
import com.questdb.std.str.AbstractCharSink;
......@@ -34,6 +36,8 @@ import java.io.Closeable;
import java.nio.ByteBuffer;
public class HttpResponseSink implements Closeable, Mutable {
private final static Log LOG = LogFactory.getLog(HttpResponseSink.class);
private static final int CHUNK_HEAD = 1;
private static final int CHUNK_DATA = 2;
private static final int FIN = 3;
......@@ -56,6 +60,7 @@ public class HttpResponseSink implements Closeable, Mutable {
private final FixedSizeResponseImpl fixedSize = new FixedSizeResponseImpl();
private final ChunkedResponseImpl chunkedResponse = new ChunkedResponseImpl();
private final DirectBufferResponse directBufferResponse = new DirectBufferResponse();
private final NetworkFacade nf;
private final int responseBufferSize;
private final long fd;
private final HeaderOnlyResponse headerOnlyResponse = new HeaderOnlyResponse();
......@@ -73,6 +78,7 @@ public class HttpResponseSink implements Closeable, Mutable {
public HttpResponseSink(HttpServerConfiguration configuration, long fd) {
this.responseBufferSize = Numbers.ceilPow2(configuration.getConnectionSendBufferSize());
this.nf = configuration.getDispatcherConfiguration().getNetworkFacade();
this.out = Unsafe.calloc(responseBufferSize);
this.headerSink = new HttpResponseHeaderSink(1024, configuration.getClock());
// size is 32bit int, as hex string max 8 bytes
......@@ -187,9 +193,10 @@ public class HttpResponseSink implements Closeable, Mutable {
private void flush() {
int sent = 0;
while (sent < flushBufSize) {
int n = Net.send(fd, flushBuf + sent, flushBufSize - sent);
int n = nf.send(fd, flushBuf + sent, flushBufSize - sent);
if (n < 0) {
// disconnected
LOG.info().$("disconnected [errno=").$(Os.errno()).$(']').$();
throw PeerDisconnectedException.INSTANCE;
}
if (n == 0) {
......
......@@ -26,6 +26,7 @@ package com.questdb.cutlass.http;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.Job;
import com.questdb.mp.SOCountDownLatch;
import com.questdb.mp.Worker;
import com.questdb.network.IOContextFactory;
import com.questdb.network.IODispatcher;
......@@ -37,7 +38,6 @@ import com.questdb.std.ObjHashSet;
import com.questdb.std.ObjList;
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class HttpServer implements Closeable {
......@@ -45,8 +45,8 @@ public class HttpServer implements Closeable {
private final HttpServerConfiguration configuration;
private final HttpContextFactory httpContextFactory;
private final ObjList<HttpRequestProcessorSelectorImpl> selectors;
private final CountDownLatch workerHaltLatch;
private final CountDownLatch started = new CountDownLatch(1);
private final SOCountDownLatch workerHaltLatch;
private final SOCountDownLatch started = new SOCountDownLatch(1);
private final int workerCount;
private final AtomicBoolean running = new AtomicBoolean();
private final ObjList<Worker> workers;
......@@ -71,7 +71,7 @@ public class HttpServer implements Closeable {
// halt latch that each worker will count down to let main
// thread know that server is done and allow server shutdown
// gracefully
this.workerHaltLatch = new CountDownLatch(workerCount);
this.workerHaltLatch = new SOCountDownLatch(workerCount);
}
public void bind(HttpRequestProcessorFactory factory) {
......@@ -89,20 +89,23 @@ public class HttpServer implements Closeable {
@Override
public void close() {
halt();
Misc.free(dispatcher);
}
public CountDownLatch getStartedLatch() {
public SOCountDownLatch getStartedLatch() {
return started;
}
public void halt() throws InterruptedException {
public void halt() {
if (running.compareAndSet(true, false)) {
LOG.info().$("stopping").$();
started.await();
for (int i = 0; i < workerCount; i++) {
workers.getQuick(i).halt();
}
workerHaltLatch.await();
LOG.info().$("stopped").$();
}
}
......@@ -126,7 +129,7 @@ public class HttpServer implements Closeable {
public boolean run() {
return dispatcher.processIOQueue(
(operation, context, dispatcher1)
-> context.handleClientOperation(operation, nf, dispatcher1, selector)
-> context.handleClientOperation(operation, dispatcher1, selector)
);
}
});
......
......@@ -33,7 +33,6 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.Comparator;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class LogFactory implements Closeable {
......@@ -50,7 +49,7 @@ public class LogFactory implements Closeable {
private final CharSequenceObjHashMap<ScopeConfiguration> scopeConfigMap = new CharSequenceObjHashMap<>();
private final ObjList<ScopeConfiguration> scopeConfigs = new ObjList<>();
private final ObjHashSet<LogWriter> jobs = new ObjHashSet<>();
private final CountDownLatch workerHaltLatch = new CountDownLatch(1);
private final SOCountDownLatch workerHaltLatch = new SOCountDownLatch(1);
private final MicrosecondClock clock;
private Worker worker = null;
private boolean configured = false;
......@@ -165,10 +164,7 @@ public class LogFactory implements Closeable {
public void haltThread() {
if (worker != null) {
worker.halt();
try {
workerHaltLatch.await();
} catch (InterruptedException ignore) {
}
worker = null;
}
}
......
......@@ -28,7 +28,6 @@ import com.questdb.std.ObjHashSet;
import com.questdb.std.Os;
import com.questdb.std.Unsafe;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
......@@ -38,18 +37,18 @@ public class Worker extends Thread {
private static final long SLEEP_THRESHOLD = 10000000L;
private final static AtomicInteger COUNTER = new AtomicInteger();
private final ObjHashSet<? extends Job> jobs;
private final CountDownLatch haltLatch;
private final SOCountDownLatch haltLatch;
private final int affinity;
private final Log log;
@SuppressWarnings("FieldCanBeLocal")
private volatile int running = 0;
private volatile int fence;
public Worker(ObjHashSet<? extends Job> jobs, CountDownLatch haltLatch) {
public Worker(ObjHashSet<? extends Job> jobs, SOCountDownLatch haltLatch) {
this(jobs, haltLatch, -1, null);
}
public Worker(ObjHashSet<? extends Job> jobs, CountDownLatch haltLatch, int affinity, Log log) {
public Worker(ObjHashSet<? extends Job> jobs, SOCountDownLatch haltLatch, int affinity, Log log) {
this.log = log;
this.jobs = jobs;
this.haltLatch = haltLatch;
......
......@@ -44,7 +44,7 @@ public class HttpServer {
private static final ObjectFactory<Event<IOContext>> EVENT_FACTORY = Event::new;
private final InetSocketAddress address;
private final ObjList<Worker> workers;
private final CountDownLatch haltLatch;
private final SOCountDownLatch haltLatch;
private final int workerCount;
private final CountDownLatch startComplete = new CountDownLatch(1);
private final UrlMatcher urlMatcher;
......@@ -61,7 +61,7 @@ public class HttpServer {
this.address = new InetSocketAddress(configuration.getHttpIP(), configuration.getHttpPort());
this.urlMatcher = env.matcher;
this.workerCount = configuration.getHttpThreads();
this.haltLatch = new CountDownLatch(workerCount);
this.haltLatch = new SOCountDownLatch(workerCount);
this.workers = new ObjList<>(workerCount);
this.contextFactory = (fd, clock) -> new IOContext(new NetworkChannelImpl(fd), configuration, clock);
}
......
......@@ -50,6 +50,13 @@ public final class Files {
static final AtomicLong OPEN_FILE_COUNT = new AtomicLong();
static {
Os.init();
UTF_8 = Charset.forName("UTF-8");
PAGE_SIZE = getPageSize();
SEPARATOR = Os.type == Os.WINDOWS ? '\\' : '/';
}
private Files() {
} // Prevent construction.
......@@ -292,11 +299,4 @@ public final class Files {
private native static boolean setLastModified(long lpszName, long millis);
private static native boolean rename(long lpszOld, long lpszNew);
static {
Os.init();
UTF_8 = Charset.forName("UTF-8");
PAGE_SIZE = getPageSize();
SEPARATOR = Os.type == Os.WINDOWS ? '\\' : '/';
}
}
......@@ -34,7 +34,7 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class FullFwdDataFrameCursorTest extends AbstractCairoTest {
......@@ -2229,13 +2229,13 @@ public class FullFwdDataFrameCursorTest extends AbstractCairoTest {
final static class MyWorkScheduler implements CairoWorkScheduler {
private final int nWorkers = 2;
private final CountDownLatch workerHaltLatch = new CountDownLatch(nWorkers);
private final SOCountDownLatch workerHaltLatch = new SOCountDownLatch(nWorkers);
private final Worker[] workers = new Worker[nWorkers];
private final RingQueue<ColumnIndexerEntry> queue = new RingQueue<>(ColumnIndexerEntry::new, 1024);
private final Sequence pubSeq;
private final Sequence subSeq;
private final ObjHashSet<Job> jobs = new ObjHashSet<>();
private boolean active = false;
private final AtomicBoolean active = new AtomicBoolean(false);
public MyWorkScheduler(Sequence pubSequence, Sequence subSequence) {
this.pubSeq = pubSequence;
......@@ -2266,8 +2266,8 @@ public class FullFwdDataFrameCursorTest extends AbstractCairoTest {
return subSeq;
}
void halt() throws InterruptedException {
if (active) {
void halt() {
if (active.compareAndSet(true, false)) {
for (int i = 0; i < nWorkers; i++) {
workers[i].halt();
}
......@@ -2276,12 +2276,13 @@ public class FullFwdDataFrameCursorTest extends AbstractCairoTest {
}
void start() {
pubSeq.then(subSeq).then(pubSeq);
for (int i = 0; i < nWorkers; i++) {
workers[i] = new Worker(jobs, workerHaltLatch);
workers[i].start();
if (active.compareAndSet(false, true)) {
pubSeq.then(subSeq).then(pubSeq);
for (int i = 0; i < nWorkers; i++) {
workers[i] = new Worker(jobs, workerHaltLatch);
workers[i].start();
}
}
active = true;
}
}
}
\ No newline at end of file
......@@ -132,7 +132,6 @@ public class IODispatcherTest {
TestUtils.assertMemoryLeak(() -> {
HttpServerConfiguration httpServerConfiguration = new DefaultHttpServerConfiguration();
NetworkFacade nf = NetworkFacadeImpl.INSTANCE;
SOCountDownLatch connectLatch = new SOCountDownLatch(1);
SOCountDownLatch contextClosedLatch = new SOCountDownLatch(1);
AtomicInteger closeCount = new AtomicInteger(0);
......@@ -187,7 +186,7 @@ public class IODispatcherTest {
while (serverRunning.get()) {
dispatcher.run();
dispatcher.processIOQueue(
(operation, context, disp) -> context.handleClientOperation(operation, nf, disp, selector)
(operation, context, disp) -> context.handleClientOperation(operation, disp, selector)
);
}
serverHaltLatch.countDown();
......@@ -234,7 +233,6 @@ public class IODispatcherTest {
int N = 200;
NetworkFacade nf = NetworkFacadeImpl.INSTANCE;
AtomicInteger openCount = new AtomicInteger(0);
AtomicInteger closeCount = new AtomicInteger(0);
......@@ -289,7 +287,7 @@ public class IODispatcherTest {
do {
dispatcher.run();
dispatcher.processIOQueue(
(operation, context, disp) -> context.handleClientOperation(operation, nf, disp, selector)
(operation, context, disp) -> context.handleClientOperation(operation, disp, selector)
);
} while (serverRunning.get());
serverHaltLatch.countDown();
......@@ -349,7 +347,6 @@ public class IODispatcherTest {
TestUtils.assertMemoryLeak(() -> {
HttpServerConfiguration httpServerConfiguration = new DefaultHttpServerConfiguration();
NetworkFacade nf = NetworkFacadeImpl.INSTANCE;
SOCountDownLatch connectLatch = new SOCountDownLatch(1);
SOCountDownLatch contextClosedLatch = new SOCountDownLatch(1);
SOCountDownLatch requestReceivedLatch = new SOCountDownLatch(1);
......@@ -419,7 +416,7 @@ public class IODispatcherTest {
while (serverRunning.get()) {
dispatcher.run();
dispatcher.processIOQueue(
(operation, context, disp) -> context.handleClientOperation(operation, nf, disp, selector)
(operation, context, disp) -> context.handleClientOperation(operation, disp, selector)
);
}
serverHaltLatch.countDown();
......@@ -516,7 +513,6 @@ public class IODispatcherTest {
}
};
NetworkFacade nf = NetworkFacadeImpl.INSTANCE;
SOCountDownLatch connectLatch = new SOCountDownLatch(1);
SOCountDownLatch contextClosedLatch = new SOCountDownLatch(1);
AtomicInteger closeCount = new AtomicInteger(0);
......@@ -586,7 +582,7 @@ public class IODispatcherTest {
while (serverRunning.get()) {
dispatcher.run();
dispatcher.processIOQueue(
(operation, context, disp) -> context.handleClientOperation(operation, nf, disp, selector)
(operation, context, disp) -> context.handleClientOperation(operation, disp, selector)
);
}
serverHaltLatch.countDown();
......@@ -668,7 +664,6 @@ public class IODispatcherTest {
TestUtils.assertMemoryLeak(() -> {
HttpServerConfiguration httpServerConfiguration = new DefaultHttpServerConfiguration();
NetworkFacade nf = NetworkFacadeImpl.INSTANCE;
SOCountDownLatch connectLatch = new SOCountDownLatch(1);
SOCountDownLatch contextClosedLatch = new SOCountDownLatch(1);
AtomicInteger closeCount = new AtomicInteger(0);
......@@ -740,7 +735,7 @@ public class IODispatcherTest {
while (serverRunning.get()) {
dispatcher.run();
dispatcher.processIOQueue(
(operation, context, disp) -> context.handleClientOperation(operation, nf, disp, selector)
(operation, context, disp) -> context.handleClientOperation(operation, disp, selector)
);
}
serverHaltLatch.countDown();
......@@ -795,7 +790,7 @@ public class IODispatcherTest {
}
@Test
public void testStaticContentHandlerSimple() throws InterruptedException {
public void testStaticContentHandlerSimple() {
String baseDir = System.getProperty("java.io.tmpdir");
final DefaultHttpServerConfiguration httpConfiguration = new DefaultHttpServerConfiguration() {
......@@ -825,7 +820,13 @@ public class IODispatcherTest {
public StaticContentProcessorConfiguration getStaticContentProcessorConfiguration() {
return staticContentProcessorConfiguration;
}
@Override
public MillisecondClock getClock() {
return () -> 0;
}
};
try (HttpServer httpServer = new HttpServer(httpConfiguration)) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
......@@ -843,76 +844,115 @@ public class IODispatcherTest {
// create 100Mb file in /tmp directory
try (Path path = new Path().of(baseDir).concat("questdb-temp.txt").$()) {
long fd = Files.openAppend(path);
Files.truncate(fd, 0);
final int bufLen = 1024 * 1024;
long buf = Unsafe.malloc(bufLen); // 1Mb buffer
Rnd rnd = new Rnd();
for (int i = 0; i < bufLen / 8; i++) {
Unsafe.getUnsafe().putLong(buf + i * 8, rnd.nextLong());
}
try {
if (Files.exists(path)) {
Assert.assertTrue(Files.remove(path));
}
long fd = Files.openAppend(path);
for (int i = 0; i < 20; i++) {
Assert.assertEquals(bufLen, Files.append(fd, buf, bufLen));
}
final int bufLen = 1024 * 1024;
long buf = Unsafe.malloc(bufLen); // 1Mb buffer
Rnd rnd = new Rnd();
for (int i = 0; i < bufLen; i++) {
Unsafe.getUnsafe().putLong(buf + i, rnd.nextByte());
}
Files.close(fd);
Unsafe.free(buf, bufLen);
}
for (int i = 0; i < 20; i++) {
Assert.assertEquals(bufLen, Files.append(fd, buf, bufLen));
}
httpServer.getStartedLatch().await();
Files.close(fd);
Files.setLastModified(path, 122222212222L);
Unsafe.free(buf, bufLen);
// send request to server to download file we just created
final String request = "GET /questdb-temp.txt HTTP/1.1\r\n" +
"Host: localhost:9000\r\n" +
"Connection: keep-alive\r\n" +
"Cache-Control: max-age=0\r\n" +
"Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\r\n" +
"User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.48 Safari/537.36\r\n" +
"Accept-Encoding: gzip,deflate,sdch\r\n" +
"Accept-Language: en-US,en;q=0.8\r\n" +
"Cookie: textwrapon=false; textautoformat=false; wysiwyg=textarea\r\n" +
"\r\n";
httpServer.getStartedLatch().await();
long fd = Net.socketTcp(true);
try {
long sockAddr = Net.sockaddr("127.0.0.1", 9001);
try {
Assert.assertTrue(fd > -1);
Assert.assertEquals(0, Net.connect(fd, sockAddr));
// send request to server to download file we just created
final String request = "GET /questdb-temp.txt HTTP/1.1\r\n" +
"Host: localhost:9000\r\n" +
"Connection: keep-alive\r\n" +
"Cache-Control: max-age=0\r\n" +
"Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\r\n" +
"User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.48 Safari/537.36\r\n" +
"Accept-Encoding: gzip,deflate,sdch\r\n" +
"Accept-Language: en-US,en;q=0.8\r\n" +
"Cookie: textwrapon=false; textautoformat=false; wysiwyg=textarea\r\n" +
"\r\n";
String expectedResponseHeader = "HTTP/1.1 200 OK\r\n" +
"Server: questDB/1.0\r\n" +
"Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" +
"Content-Length: 20971520\r\n" +
"Content-Type: text/plain\r\n" +
"ETag: \"122225812000\"\r\n" + // this is last modified timestamp on the file, we set this value when we created file
"\r\n";
int headerLen = expectedResponseHeader.length();
int headerCheckRemaining = expectedResponseHeader.length();
int len = request.length();
long buffer = TestUtils.toMemory(request);
// prepare random generator to validate the downloaded content
rnd.reset();
fd = Net.socketTcp(true);
try {
int part1 = len / 2;
Assert.assertEquals(part1, Net.send(fd, buffer, part1));
Assert.assertEquals(len - part1, Net.send(fd, buffer + part1, len - part1));
// download
long downloadedSoFar = 0;
while (downloadedSoFar < 20971672) {
int n = Net.recv(fd, buffer, len);
if (n > 0) {
downloadedSoFar += n;
long sockAddr = Net.sockaddr("127.0.0.1", 9001);
try {
Assert.assertTrue(fd > -1);
Assert.assertEquals(0, Net.connect(fd, sockAddr));
int len = request.length();
long buffer = TestUtils.toMemory(request);
try {
int part1 = len / 2;
Assert.assertEquals(part1, Net.send(fd, buffer, part1));
Assert.assertEquals(len - part1, Net.send(fd, buffer + part1, len - part1));
// download
long downloadedSoFar = 0;
int contentRemaining = 0;
while (downloadedSoFar < 20971670) {
long contentOffset = 0;
int n = Net.recv(fd, buffer, len);
if (n > 0) {
if (headerCheckRemaining > 0) {
for (int i = 0; i < n; i++) {
if (expectedResponseHeader.charAt(headerLen - headerCheckRemaining) != (char) Unsafe.getUnsafe().getByte(buffer + i)) {
Assert.fail("at " + (headerLen - headerCheckRemaining));
}
headerCheckRemaining--;
contentOffset++;
}
} else {
for (int i = 0; i < n; i++) {
if (contentRemaining == 0) {
contentRemaining = bufLen;
rnd.reset();
}
Assert.assertEquals(rnd.nextByte(), Unsafe.getUnsafe().getByte(buffer + contentOffset + i));
contentRemaining--;
}
}
downloadedSoFar += n;
}
}
} finally {
Unsafe.free(buffer, len);
}
} finally {
Net.freeSockAddr(sockAddr);
}
} finally {
Unsafe.free(buffer, len);
Net.close(fd);
LOG.info().$("closed [fd=").$(fd).$(']').$();
}
httpServer.halt();
} finally {
Net.freeSockAddr(sockAddr);
Files.remove(path);
}
} finally {
Net.close(fd);
LOG.info().$("closed [fd=").$(fd).$(']').$();
}
httpServer.halt();
}
}
......@@ -1042,7 +1082,7 @@ public class IODispatcherTest {
while (serverRunning.get()) {
dispatcher.run();
dispatcher.processIOQueue(
(operation, context, disp) -> context.handleClientOperation(operation, nf, disp, selector)
(operation, context, disp) -> context.handleClientOperation(operation, disp, selector)
);
}
......
......@@ -26,12 +26,15 @@ package com.questdb.cutlass.http;
import com.questdb.std.Chars;
import com.questdb.std.FilesFacade;
import com.questdb.std.FilesFacadeImpl;
import com.questdb.std.Os;
import com.questdb.std.str.LPSZ;
import com.questdb.std.str.Path;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
public class MimeTypesCacheTest {
@Test()
public void testCannotOpen() throws Exception {
......@@ -50,7 +53,14 @@ public class MimeTypesCacheTest {
@Test()
public void testCannotRead1() throws Exception {
AtomicInteger closeCount = new AtomicInteger(0);
testFailure(new FilesFacadeImpl() {
@Override
public boolean close(long fd) {
closeCount.incrementAndGet();
return true;
}
@Override
public long length(long fd) {
return 1024;
......@@ -66,11 +76,21 @@ public class MimeTypesCacheTest {
return -1;
}
}, "could not read");
Assert.assertEquals(1, closeCount.get());
}
@Test()
public void testCannotRead2() throws Exception {
AtomicInteger closeCount = new AtomicInteger(0);
testFailure(new FilesFacadeImpl() {
@Override
public boolean close(long fd) {
closeCount.incrementAndGet();
return true;
}
@Override
public long length(long fd) {
return 1024;
......@@ -86,6 +106,8 @@ public class MimeTypesCacheTest {
return 128;
}
}, "could not read");
Assert.assertEquals(1, closeCount.get());
}
@Test
......@@ -95,7 +117,13 @@ public class MimeTypesCacheTest {
@Override
public void run() {
try (Path path = new Path()) {
path.of(this.getClass().getResource("/mime_test.types").getPath()).$();
String filePath;
if (Os.type == Os.WINDOWS) {
filePath = this.getClass().getResource("/mime_test.types").getFile().substring(1);
} else {
filePath = this.getClass().getResource("/mime_test.types").getFile();
}
path.of(filePath).$();
MimeTypesCache mimeTypes = new MimeTypesCache(FilesFacadeImpl.INSTANCE, path);
Assert.assertEquals(6, mimeTypes.size());
TestUtils.assertEquals("application/andrew-inset", mimeTypes.get("ez"));
......@@ -111,6 +139,7 @@ public class MimeTypesCacheTest {
@Test()
public void testWrongFileSize() throws Exception {
AtomicInteger closeCount = new AtomicInteger();
testFailure(new FilesFacadeImpl() {
@Override
public long length(long fd) {
......@@ -121,11 +150,20 @@ public class MimeTypesCacheTest {
public long openRO(LPSZ name) {
return 123L;
}
@Override
public boolean close(long fd) {
closeCount.incrementAndGet();
return true;
}
}, "wrong file size");
Assert.assertEquals(1, closeCount.get());
}
@Test()
public void testWrongFileSize2() throws Exception {
AtomicInteger closeCount = new AtomicInteger(0);
testFailure(new FilesFacadeImpl() {
@Override
public long length(long fd) {
......@@ -136,11 +174,22 @@ public class MimeTypesCacheTest {
public long openRO(LPSZ name) {
return 123L;
}
@Override
public boolean close(long fd) {
closeCount.incrementAndGet();
return true;
}
}, "wrong file size");
Assert.assertEquals(1, closeCount.get());
}
@Test()
public void testWrongFileSize4() throws Exception {
AtomicInteger closeCount = new AtomicInteger();
testFailure(new FilesFacadeImpl() {
@Override
public long length(long fd) {
......@@ -151,7 +200,15 @@ public class MimeTypesCacheTest {
public long openRO(LPSZ name) {
return 123L;
}
@Override
public boolean close(long fd) {
closeCount.incrementAndGet();
return true;
}
}, "wrong file size");
Assert.assertEquals(1, closeCount.get());
}
private void testFailure(FilesFacade ff, CharSequence startsWith) throws Exception {
......
......@@ -26,6 +26,7 @@ package com.questdb.cutlass.line.udp;
import com.questdb.cairo.*;
import com.questdb.cairo.pool.WriterPool;
import com.questdb.mp.Job;
import com.questdb.mp.SOCountDownLatch;
import com.questdb.mp.Worker;
import com.questdb.network.Net;
import com.questdb.network.NetworkFacadeImpl;
......@@ -35,8 +36,6 @@ import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
......@@ -242,7 +241,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
try {
CountDownLatch workerHaltLatch = new CountDownLatch(1);
SOCountDownLatch workerHaltLatch = new SOCountDownLatch(1);
// create table
......@@ -284,7 +283,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
Assert.assertTrue(count > 0);
worker.halt();
Assert.assertTrue(workerHaltLatch.await(3, TimeUnit.SECONDS));
workerHaltLatch.await();
StringSink sink = new StringSink();
RecordCursorPrinter printer = new RecordCursorPrinter(sink);
......
......@@ -342,6 +342,7 @@ public class HttpServerTest extends AbstractJournalTest {
}
@Test
@Ignore
public void testImportAppend() throws Exception {
final AtomicInteger errors = new AtomicInteger();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册