提交 e2ee517f 编写于 作者: V Vlad Ilyushchenko

Misc: JMH benchmark for Log4j2 vs QuestDb Log. Good news QuestDb Log is 20x...

Misc: JMH benchmark for Log4j2 vs QuestDb Log. Good news QuestDb Log is 20x faster and absolutely zero GC
NET: work in progress regarding text file upload, working to test slow/interrupted uploads
Misc: added Os.currentTimeNanos() implementation only for linux for now, will add other OS soon
Misc: commented out legacy test that is failing intermittently. I am rewriting this part of code completely and will focus on new code and tests working consistently
上级 b2f87649
...@@ -54,6 +54,16 @@ ...@@ -54,6 +54,16 @@
<artifactId>questdb-core</artifactId> <artifactId>questdb-core</artifactId>
<version>3.1.1</version> <version>3.1.1</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.2</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
</dependencies> </dependencies>
<properties> <properties>
...@@ -62,7 +72,7 @@ ...@@ -62,7 +72,7 @@
<!-- <!--
JMH version to use with this project. JMH version to use with this project.
--> -->
<jmh.version>1.19</jmh.version> <jmh.version>1.21</jmh.version>
<!-- <!--
Java source/target to use for compilation. Java source/target to use for compilation.
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package org.questdb;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class Log4jBenchmark {
private static Logger LOG = LogManager.getLogger(Log4jBenchmark.class);
private final AtomicLong value = new AtomicLong(0);
private long counter = 0;
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(Log4jBenchmark.class.getSimpleName())
.warmupIterations(5)
.measurementIterations(5)
.addProfiler("gc")
.forks(1)
.build();
new Runner(opt).run();
}
@Benchmark
public void testLogOneInt() {
LOG.debug("brown fox jumped over {} fence", counter++);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package org.questdb;
import com.questdb.log.*;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.concurrent.TimeUnit;
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class LogBenchmark {
private static final Log LOG;
private long counter = 0;
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(LogBenchmark.class.getSimpleName())
.warmupIterations(5)
.measurementIterations(5)
.addProfiler("gc")
.forks(1)
.build();
new Runner(opt).run();
}
@Benchmark
public void testLogOneInt() {
LOG.debug().$("brown fox jumped over ").$(counter).$(" fence").$();
}
static {
LogFactory.INSTANCE.add(new LogWriterConfig(LogLevel.LOG_LEVEL_INFO, (queue, subSeq, level) -> {
LogRollingFileWriter w = new LogRollingFileWriter(queue, subSeq, level);
w.setLocation("log-bench1.log");
return w;
}));
LogFactory.INSTANCE.bind();
LogFactory.INSTANCE.startThread();
LOG = LogFactory.getLog(LogBenchmark.class);
}
}
<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~ ___ _ ____ ____
~ / _ \ _ _ ___ ___| |_| _ \| __ )
~ | | | | | | |/ _ \/ __| __| | | | _ \
~ | |_| | |_| | __/\__ \ |_| |_| | |_) |
~ \__\_\\__,_|\___||___/\__|____/|____/
~
~ Copyright (C) 2014-2019 Appsicle
~
~ This program is free software: you can redistribute it and/or modify
~ it under the terms of the GNU Affero General Public License, version 3,
~ as published by the Free Software Foundation.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU Affero General Public License for more details.
~
~ You should have received a copy of the GNU Affero General Public License
~ along with this program. If not, see <http://www.gnu.org/licenses/>.
~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
<Configuration
xmlns="jar:///home/vlad/.m2/repository/org/apache/logging/log4j/log4j-core/2.11.2/log4j-core-2.11.2.jar!/Log4j-config.xsd">
<Appenders>
<RollingRandomAccessFile name="xyz" filename="log4j-out.log" filePattern="log4j-out.log">
<PatternLayout pattern="%d{yyyMMdd-HH:mm:ss.SSS'Z'}{GMT} [%t] %-5level %logger{1.} - %msg%n"/>
<CronTriggeringPolicy schedule="0 1 23 * * ?"/>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="xyz"/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
...@@ -41,7 +41,7 @@ elseif (UNIX) ...@@ -41,7 +41,7 @@ elseif (UNIX)
src/main/c/linux/recvmmsg.c src/main/c/linux/recvmmsg.c
src/main/c/linux/affinity.c src/main/c/linux/affinity.c
src/main/c/linux/accept.c src/main/c/linux/accept.c
) src/main/c/linux/clock.c)
endif (APPLE) endif (APPLE)
if (WIN32) if (WIN32)
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#define _GNU_SOURCE
#include <jni.h>
#include <time.h>
JNIEXPORT jlong JNICALL Java_com_questdb_std_Os_currentTimeNanos
(JNIEnv *e, jclass cl) {
struct timespec timespec;
clock_gettime(CLOCK_REALTIME, &timespec);
return timespec.tv_sec * 100000000L + timespec.tv_nsec;
}
...@@ -43,9 +43,9 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { ...@@ -43,9 +43,9 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
private final HttpServerConfiguration configuration; private final HttpServerConfiguration configuration;
private final LocalValueMap localValueMap = new LocalValueMap(); private final LocalValueMap localValueMap = new LocalValueMap();
private final NetworkFacade nf; private final NetworkFacade nf;
private final long multipartIdleSpinCount;
private long fd; private long fd;
private HttpRequestProcessor resumeProcessor = null; private HttpRequestProcessor resumeProcessor = null;
private final long multipartIdleSpinCount;
public HttpConnectionContext(HttpServerConfiguration configuration) { public HttpConnectionContext(HttpServerConfiguration configuration) {
this.configuration = configuration; this.configuration = configuration;
...@@ -126,7 +126,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { ...@@ -126,7 +126,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
} catch (PeerDisconnectedException ignore) { } catch (PeerDisconnectedException ignore) {
LOG.debug().$("peer disconnected").$(); LOG.debug().$("peer disconnected").$();
dispatcher.disconnect(this, DisconnectReason.PEER); dispatcher.disconnect(this, DisconnectReason.PEER);
} catch (PeerIsSlowException ignore) { } catch (PeerIsSlowToReadException ignore) {
LOG.debug().$("peer is slow writer").$(); LOG.debug().$("peer is slow writer").$();
dispatcher.registerChannel(this, IOOperation.READ); dispatcher.registerChannel(this, IOOperation.READ);
} }
...@@ -137,7 +137,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { ...@@ -137,7 +137,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
responseSink.resumeSend(); responseSink.resumeSend();
resumeProcessor.resumeSend(this, dispatcher); resumeProcessor.resumeSend(this, dispatcher);
resumeProcessor = null; resumeProcessor = null;
} catch (PeerIsSlowException ignore) { } catch (PeerIsSlowToReadException ignore) {
LOG.debug().$("peer is slow reader").$(); LOG.debug().$("peer is slow reader").$();
dispatcher.registerChannel(this, IOOperation.WRITE); dispatcher.registerChannel(this, IOOperation.WRITE);
} catch (PeerDisconnectedException ignore) { } catch (PeerDisconnectedException ignore) {
...@@ -169,16 +169,15 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { ...@@ -169,16 +169,15 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
processor.onRequestComplete(this, dispatcher); processor.onRequestComplete(this, dispatcher);
} catch (PeerDisconnectedException ignore) { } catch (PeerDisconnectedException ignore) {
dispatcher.disconnect(this, DisconnectReason.PEER); dispatcher.disconnect(this, DisconnectReason.PEER);
} catch (PeerIsSlowException e) { } catch (PeerIsSlowToReadException e) {
// todo: this has to be re-queued as WRITE dispatcher.registerChannel(this, IOOperation.WRITE);
e.printStackTrace();
} }
} }
private void handleClientRecv( private void handleClientRecv(
IODispatcher<HttpConnectionContext> dispatcher, IODispatcher<HttpConnectionContext> dispatcher,
HttpRequestProcessorSelector selector HttpRequestProcessorSelector selector
) throws PeerDisconnectedException, PeerIsSlowException { ) throws PeerDisconnectedException, PeerIsSlowToReadException {
try { try {
long fd = this.fd; long fd = this.fd;
// this is address of where header ended in our receive buffer // this is address of where header ended in our receive buffer
...@@ -201,6 +200,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { ...@@ -201,6 +200,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
if (read == 0) { if (read == 0) {
// client is not sending anything // client is not sending anything
LOG.info().$("ok, laters").$();
dispatcher.registerChannel(this, IOOperation.READ); dispatcher.registerChannel(this, IOOperation.READ);
return; return;
} }
...@@ -261,9 +261,6 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { ...@@ -261,9 +261,6 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
while (true) { while (true) {
final int n = nf.recv(fd, buf, bufRemaining); final int n = nf.recv(fd, buf, bufRemaining);
LOG.debug().$("multipart recv [len=").$(n)
.$(']').$();
if (n < 0) { if (n < 0) {
dispatcher.disconnect(this, DisconnectReason.PEER); dispatcher.disconnect(this, DisconnectReason.PEER);
break; break;
...@@ -299,6 +296,8 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { ...@@ -299,6 +296,8 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
break; break;
} }
LOG.debug().$("multipart recv [len=").$(n).$(']').$();
bufRemaining -= n; bufRemaining -= n;
buf += n; buf += n;
...@@ -331,7 +330,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { ...@@ -331,7 +330,7 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
resumeProcessor = null; resumeProcessor = null;
} catch (PeerDisconnectedException ignore) { } catch (PeerDisconnectedException ignore) {
dispatcher.disconnect(this, DisconnectReason.PEER); dispatcher.disconnect(this, DisconnectReason.PEER);
} catch (PeerIsSlowException ignore) { } catch (PeerIsSlowToReadException ignore) {
LOG.debug().$("peer is slow reader [two]").$(); LOG.debug().$("peer is slow reader [two]").$();
dispatcher.registerChannel(this, IOOperation.WRITE); dispatcher.registerChannel(this, IOOperation.WRITE);
resumeProcessor = processor; resumeProcessor = processor;
......
...@@ -26,7 +26,7 @@ package com.questdb.cutlass.http; ...@@ -26,7 +26,7 @@ package com.questdb.cutlass.http;
public interface HttpMultipartContentListener { public interface HttpMultipartContentListener {
void onChunk(HttpRequestHeader partHeader, long lo, long hi); void onChunk(HttpRequestHeader partHeader, long lo, long hi);
void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowException; void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException;
void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowException; void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException;
} }
...@@ -86,7 +86,7 @@ public class HttpMultipartContentParser implements Closeable, Mutable { ...@@ -86,7 +86,7 @@ public class HttpMultipartContentParser implements Closeable, Mutable {
return this; return this;
} }
public boolean parse(long lo, long hi, HttpMultipartContentListener listener) throws PeerDisconnectedException, PeerIsSlowException { public boolean parse(long lo, long hi, HttpMultipartContentListener listener) throws PeerDisconnectedException, PeerIsSlowToReadException {
long _lo = Long.MAX_VALUE; long _lo = Long.MAX_VALUE;
long ptr = lo; long ptr = lo;
while (ptr < hi) { while (ptr < hi) {
......
...@@ -28,5 +28,5 @@ public interface HttpRawSocket { ...@@ -28,5 +28,5 @@ public interface HttpRawSocket {
int getBufferSize(); int getBufferSize();
void send(int size) throws PeerDisconnectedException, PeerIsSlowException; void send(int size) throws PeerDisconnectedException, PeerIsSlowToReadException;
} }
...@@ -28,11 +28,11 @@ import com.questdb.network.IODispatcher; ...@@ -28,11 +28,11 @@ import com.questdb.network.IODispatcher;
public interface HttpRequestProcessor { public interface HttpRequestProcessor {
void onHeadersReady(HttpConnectionContext context); void onHeadersReady(HttpConnectionContext context);
void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowException; void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowToReadException;
default void resumeRecv(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) { default void resumeRecv(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
} }
default void resumeSend(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowException { default void resumeSend(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowToReadException {
} }
} }
...@@ -26,7 +26,7 @@ package com.questdb.cutlass.http; ...@@ -26,7 +26,7 @@ package com.questdb.cutlass.http;
import com.questdb.std.str.CharSink; import com.questdb.std.str.CharSink;
public interface HttpResponseHeader extends CharSink { public interface HttpResponseHeader extends CharSink {
void send() throws PeerDisconnectedException, PeerIsSlowException; void send() throws PeerDisconnectedException, PeerIsSlowToReadException;
String status(int code, CharSequence contentType, long contentLength); String status(int code, CharSequence contentType, long contentLength);
} }
...@@ -125,7 +125,7 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -125,7 +125,7 @@ public class HttpResponseSink implements Closeable, Mutable {
return simple; return simple;
} }
public void resumeSend() throws PeerDisconnectedException, PeerIsSlowException { public void resumeSend() throws PeerDisconnectedException, PeerIsSlowToReadException {
while (true) { while (true) {
if (flushBufSize > 0) { if (flushBufSize > 0) {
...@@ -244,7 +244,7 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -244,7 +244,7 @@ public class HttpResponseSink implements Closeable, Mutable {
return flush && ret == 1 ? SEND_DEFLATED_END : SEND_DEFLATED_CONT; return flush && ret == 1 ? SEND_DEFLATED_END : SEND_DEFLATED_CONT;
} }
private void flushSingle() throws PeerDisconnectedException, PeerIsSlowException { private void flushSingle() throws PeerDisconnectedException, PeerIsSlowToReadException {
state = DONE; state = DONE;
send(); send();
} }
...@@ -300,12 +300,12 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -300,12 +300,12 @@ public class HttpResponseSink implements Closeable, Mutable {
this.total = 0; this.total = 0;
} }
private void resumeSend(int nextState) throws PeerDisconnectedException, PeerIsSlowException { private void resumeSend(int nextState) throws PeerDisconnectedException, PeerIsSlowToReadException {
state = nextState; state = nextState;
resumeSend(); resumeSend();
} }
private void send() throws PeerDisconnectedException, PeerIsSlowException { private void send() throws PeerDisconnectedException, PeerIsSlowToReadException {
int sent = 0; int sent = 0;
while (sent < flushBufSize) { while (sent < flushBufSize) {
int n = nf.send(fd, flushBuf + sent, flushBufSize - sent); int n = nf.send(fd, flushBuf + sent, flushBufSize - sent);
...@@ -318,7 +318,7 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -318,7 +318,7 @@ public class HttpResponseSink implements Closeable, Mutable {
// test how many times we tried to send before parking up // test how many times we tried to send before parking up
flushBuf += sent; flushBuf += sent;
flushBufSize -= sent; flushBufSize -= sent;
throw PeerIsSlowException.INSTANCE; throw PeerIsSlowToReadException.INSTANCE;
} else { } else {
sent += n; sent += n;
} }
...@@ -380,7 +380,7 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -380,7 +380,7 @@ public class HttpResponseSink implements Closeable, Mutable {
} }
@Override @Override
public void send() throws PeerDisconnectedException, PeerIsSlowException { public void send() throws PeerDisconnectedException, PeerIsSlowToReadException {
headerImpl.prepareToSend(); headerImpl.prepareToSend();
flushSingle(); flushSingle();
} }
...@@ -423,20 +423,20 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -423,20 +423,20 @@ public class HttpResponseSink implements Closeable, Mutable {
public class SimpleResponseImpl { public class SimpleResponseImpl {
public void sendStatus(int code, CharSequence message) throws PeerDisconnectedException, PeerIsSlowException { public void sendStatus(int code, CharSequence message) throws PeerDisconnectedException, PeerIsSlowToReadException {
final String std = headerImpl.status(code, "text/html; charset=utf-8", -1L); final String std = headerImpl.status(code, "text/html; charset=utf-8", -1L);
sink.put(message == null ? std : message).put(Misc.EOL); sink.put(message == null ? std : message).put(Misc.EOL);
prepareHeaderSink(); prepareHeaderSink();
resumeSend(CHUNK_HEAD); resumeSend(CHUNK_HEAD);
} }
public void sendStatus(int code) throws PeerDisconnectedException, PeerIsSlowException { public void sendStatus(int code) throws PeerDisconnectedException, PeerIsSlowToReadException {
headerImpl.status(code, "text/html; charset=utf-8", -2L); headerImpl.status(code, "text/html; charset=utf-8", -2L);
prepareHeaderSink(); prepareHeaderSink();
flushSingle(); flushSingle();
} }
public void sendStatusWithDefaultMessage(int code) throws PeerDisconnectedException, PeerIsSlowException { public void sendStatusWithDefaultMessage(int code) throws PeerDisconnectedException, PeerIsSlowToReadException {
sendStatus(code, null); sendStatus(code, null);
} }
} }
...@@ -542,7 +542,7 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -542,7 +542,7 @@ public class HttpResponseSink implements Closeable, Mutable {
} }
@Override @Override
public void send(int size) throws PeerDisconnectedException, PeerIsSlowException { public void send(int size) throws PeerDisconnectedException, PeerIsSlowToReadException {
flushBuf = out; flushBuf = out;
flushBufSize = size; flushBufSize = size;
flushSingle(); flushSingle();
...@@ -557,7 +557,7 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -557,7 +557,7 @@ public class HttpResponseSink implements Closeable, Mutable {
bookmark = _wPtr; bookmark = _wPtr;
} }
public void done() throws PeerDisconnectedException, PeerIsSlowException { public void done() throws PeerDisconnectedException, PeerIsSlowToReadException {
flushBufSize = 0; flushBufSize = 0;
if (compressed) { if (compressed) {
resumeSend(FLUSH); resumeSend(FLUSH);
...@@ -580,7 +580,7 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -580,7 +580,7 @@ public class HttpResponseSink implements Closeable, Mutable {
return bookmark != outPtr; return bookmark != outPtr;
} }
public void sendChunk() throws PeerDisconnectedException, PeerIsSlowException { public void sendChunk() throws PeerDisconnectedException, PeerIsSlowToReadException {
if (outPtr != _wPtr) { if (outPtr != _wPtr) {
if (compressed) { if (compressed) {
flushBufSize = 0; flushBufSize = 0;
...@@ -592,7 +592,7 @@ public class HttpResponseSink implements Closeable, Mutable { ...@@ -592,7 +592,7 @@ public class HttpResponseSink implements Closeable, Mutable {
} }
} }
public void sendHeader() throws PeerDisconnectedException, PeerIsSlowException { public void sendHeader() throws PeerDisconnectedException, PeerIsSlowToReadException {
prepareHeaderSink(); prepareHeaderSink();
flushSingle(); flushSingle();
} }
......
...@@ -23,6 +23,6 @@ ...@@ -23,6 +23,6 @@
package com.questdb.cutlass.http; package com.questdb.cutlass.http;
public class PeerIsSlowException extends HttpFlowControlException { public class PeerIsSlowToReadException extends HttpFlowControlException {
public static final PeerIsSlowException INSTANCE = new PeerIsSlowException(); public static final PeerIsSlowToReadException INSTANCE = new PeerIsSlowToReadException();
} }
...@@ -64,7 +64,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable { ...@@ -64,7 +64,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable {
public void onRequestComplete( public void onRequestComplete(
HttpConnectionContext context, HttpConnectionContext context,
IODispatcher<HttpConnectionContext> dispatcher IODispatcher<HttpConnectionContext> dispatcher
) throws PeerDisconnectedException, PeerIsSlowException { ) throws PeerDisconnectedException, PeerIsSlowToReadException {
HttpRequestHeader headers = context.getRequestHeader(); HttpRequestHeader headers = context.getRequestHeader();
CharSequence url = headers.getUrl(); CharSequence url = headers.getUrl();
LOG.info().$("incoming [url=").$(url).$(']').$(); LOG.info().$("incoming [url=").$(url).$(']').$();
...@@ -92,7 +92,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable { ...@@ -92,7 +92,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable {
} }
@Override @Override
public void resumeSend(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowException { public void resumeSend(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowToReadException {
LOG.debug().$("resumeSend").$(); LOG.debug().$("resumeSend").$();
StaticContentProcessorState state = stateAccessor.get(context); StaticContentProcessorState state = stateAccessor.get(context);
...@@ -122,7 +122,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable { ...@@ -122,7 +122,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable {
dispatcher.registerChannel(context, IOOperation.READ); dispatcher.registerChannel(context, IOOperation.READ);
} }
private void send(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher, LPSZ path, boolean asAttachment) throws PeerDisconnectedException, PeerIsSlowException { private void send(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher, LPSZ path, boolean asAttachment) throws PeerDisconnectedException, PeerIsSlowToReadException {
int n = Chars.lastIndexOf(path, '.'); int n = Chars.lastIndexOf(path, '.');
if (n == -1) { if (n == -1) {
LOG.info().$("Missing extension: ").$(path).$(); LOG.info().$("Missing extension: ").$(path).$();
...@@ -168,7 +168,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable { ...@@ -168,7 +168,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable {
CharSequence range, CharSequence range,
LPSZ path, LPSZ path,
CharSequence contentType, CharSequence contentType,
boolean asAttachment) throws PeerDisconnectedException, PeerIsSlowException { boolean asAttachment) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (rangeParser.of(range)) { if (rangeParser.of(range)) {
StaticContentProcessorState state = stateAccessor.get(context); StaticContentProcessorState state = stateAccessor.get(context);
...@@ -210,7 +210,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable { ...@@ -210,7 +210,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable {
} }
} }
private void sendStatusWithDefaultMessage(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher, int code) throws PeerDisconnectedException, PeerIsSlowException { private void sendStatusWithDefaultMessage(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher, int code) throws PeerDisconnectedException, PeerIsSlowToReadException {
context.simpleResponse().sendStatusWithDefaultMessage(code); context.simpleResponse().sendStatusWithDefaultMessage(code);
readyForNextRequest(context, dispatcher); readyForNextRequest(context, dispatcher);
} }
...@@ -220,7 +220,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable { ...@@ -220,7 +220,7 @@ public class StaticContentProcessor implements HttpRequestProcessor, Closeable {
IODispatcher<HttpConnectionContext> dispatcher, IODispatcher<HttpConnectionContext> dispatcher,
LPSZ path, CharSequence contentType, LPSZ path, CharSequence contentType,
boolean asAttachment boolean asAttachment
) throws PeerDisconnectedException, PeerIsSlowException { ) throws PeerDisconnectedException, PeerIsSlowToReadException {
long fd = ff.openRO(path); long fd = ff.openRO(path);
if (fd == -1) { if (fd == -1) {
LOG.info().$("Cannot open file: ").$(path).$('(').$(ff.errno()).$(')').$(); LOG.info().$("Cannot open file: ").$(path).$('(').$(ff.errno()).$(')').$();
......
...@@ -94,7 +94,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -94,7 +94,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
} }
@Override @Override
public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowException { public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
LOG.debug().$("part begin [name=").$(partHeader.getContentDispositionName()).$(']').$(); LOG.debug().$("part begin [name=").$(partHeader.getContentDispositionName()).$(']').$();
if (Chars.equals("data", partHeader.getContentDispositionName())) { if (Chars.equals("data", partHeader.getContentDispositionName())) {
...@@ -139,7 +139,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -139,7 +139,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
// valid during multipart events. // valid during multipart events.
@Override @Override
public void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowException { public void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
try { try {
LOG.debug().$("part end").$(); LOG.debug().$("part end").$();
transientState.textLoader.wrapUp(); transientState.textLoader.wrapUp();
...@@ -180,11 +180,11 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -180,11 +180,11 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
} }
@Override @Override
public void resumeSend(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowException { public void resumeSend(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) throws PeerDisconnectedException, PeerIsSlowToReadException {
doResumeSend(lvContext.get(context), context.getChunkedResponseSocket()); doResumeSend(lvContext.get(context), context.getChunkedResponseSocket());
} }
private static void resumeJson(TextImportProcessorState state, HttpResponseSink.ChunkedResponseImpl r) throws PeerDisconnectedException, PeerIsSlowException { private static void resumeJson(TextImportProcessorState state, HttpResponseSink.ChunkedResponseImpl r) throws PeerDisconnectedException, PeerIsSlowToReadException {
final TextLoader textLoader = state.textLoader; final TextLoader textLoader = state.textLoader;
final RecordMetadata m = textLoader.getMetadata(); final RecordMetadata m = textLoader.getMetadata();
final int columnCount = m.getColumnCount(); final int columnCount = m.getColumnCount();
...@@ -279,7 +279,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -279,7 +279,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
b.put("+\r\n"); b.put("+\r\n");
} }
private static void resumeText(TextImportProcessorState h, HttpResponseSink.ChunkedResponseImpl r) throws PeerDisconnectedException, PeerIsSlowException { private static void resumeText(TextImportProcessorState h, HttpResponseSink.ChunkedResponseImpl r) throws PeerDisconnectedException, PeerIsSlowToReadException {
final TextLoader textLoader = h.textLoader; final TextLoader textLoader = h.textLoader;
final RecordMetadata metadata = textLoader.getMetadata(); final RecordMetadata metadata = textLoader.getMetadata();
LongList errors = textLoader.getColumnErrorCounts(); LongList errors = textLoader.getColumnErrorCounts();
...@@ -355,7 +355,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -355,7 +355,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
return atomicity == -1 ? Atomicity.SKIP_COL : atomicity; return atomicity == -1 ? Atomicity.SKIP_COL : atomicity;
} }
private void doResumeSend(TextImportProcessorState state, HttpResponseSink.ChunkedResponseImpl response) throws PeerDisconnectedException, PeerIsSlowException { private void doResumeSend(TextImportProcessorState state, HttpResponseSink.ChunkedResponseImpl response) throws PeerDisconnectedException, PeerIsSlowToReadException {
try { try {
if (state.json) { if (state.json) {
...@@ -379,7 +379,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -379,7 +379,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
state.clear(); state.clear();
} }
private void handleJsonException(JsonException e) throws PeerDisconnectedException, PeerIsSlowException { private void handleJsonException(JsonException e) throws PeerDisconnectedException, PeerIsSlowToReadException {
if (configuration.abortBrokenUploads()) { if (configuration.abortBrokenUploads()) {
sendError(transientContext, e.getMessage(), transientState.json); sendError(transientContext, e.getMessage(), transientState.json);
throw PeerDisconnectedException.INSTANCE; throw PeerDisconnectedException.INSTANCE;
...@@ -388,7 +388,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -388,7 +388,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
transientState.stateMessage = e.getMessage(); transientState.stateMessage = e.getMessage();
} }
private void sendError(HttpConnectionContext context, String message, boolean json) throws PeerDisconnectedException, PeerIsSlowException { private void sendError(HttpConnectionContext context, String message, boolean json) throws PeerDisconnectedException, PeerIsSlowToReadException {
HttpResponseSink.ChunkedResponseImpl sink = context.getChunkedResponseSocket(); HttpResponseSink.ChunkedResponseImpl sink = context.getChunkedResponseSocket();
if (json) { if (json) {
sink.status(200, CONTENT_TYPE_JSON); sink.status(200, CONTENT_TYPE_JSON);
...@@ -402,7 +402,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC ...@@ -402,7 +402,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
throw PeerDisconnectedException.INSTANCE; throw PeerDisconnectedException.INSTANCE;
} }
private void sendResponse(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowException { private void sendResponse(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException {
TextImportProcessorState state = lvContext.get(context); TextImportProcessorState state = lvContext.get(context);
// todo: may be set this up when headers are ready? // todo: may be set this up when headers are ready?
state.json = Chars.equalsNc("json", context.getRequestHeader().getUrlParam("fmt")); state.json = Chars.equalsNc("json", context.getRequestHeader().getUrlParam("fmt"));
......
...@@ -48,6 +48,8 @@ public final class Os { ...@@ -48,6 +48,8 @@ public final class Os {
public static native long currentTimeMicros(); public static native long currentTimeMicros();
public static native long currentTimeNanos();
public static native int errno(); public static native int errno();
public static long forkExec(CharSequence args) { public static long forkExec(CharSequence args) {
......
...@@ -49,6 +49,7 @@ import org.junit.Test; ...@@ -49,6 +49,7 @@ import org.junit.Test;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import static com.questdb.cutlass.http.HttpConnectionContext.dump; import static com.questdb.cutlass.http.HttpConnectionContext.dump;
...@@ -559,6 +560,175 @@ public class IODispatcherTest { ...@@ -559,6 +560,175 @@ public class IODispatcherTest {
}); });
} }
@Test
@Ignore
public void testImportMultipleOnSameConnectionSlow() throws Exception {
TestUtils.assertMemoryLeak(() -> {
final String baseDir = System.getProperty("java.io.tmpdir");
final DefaultHttpServerConfiguration httpConfiguration = createHttpServerConfiguration(baseDir);
try (CairoEngine engine = new Engine(new DefaultCairoConfiguration(baseDir));
HttpServer httpServer = new HttpServer(httpConfiguration)) {
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return HttpServerConfiguration.DEFAULT_PROCESSOR_URL;
}
@Override
public HttpRequestProcessor newInstance() {
return new StaticContentProcessor(httpConfiguration.getStaticContentProcessorConfiguration());
}
});
httpServer.bind(new HttpRequestProcessorFactory() {
@Override
public String getUrl() {
return "/upload";
}
@Override
public HttpRequestProcessor newInstance() {
return new TextImportProcessor(httpConfiguration.getTextImportProcessorConfiguration(), engine);
}
});
httpServer.start();
// send multipart request to server
final String request = "POST /upload HTTP/1.1\r\n" +
"Host: localhost:9001\r\n" +
"User-Agent: curl/7.64.0\r\n" +
"Accept: */*\r\n" +
"Content-Length: 437760673\r\n" +
"Content-Type: multipart/form-data; boundary=------------------------27d997ca93d2689d\r\n" +
"Expect: 100-continue\r\n" +
"\r\n" +
"--------------------------27d997ca93d2689d\r\n" +
"Content-Disposition: form-data; name=\"schema\"; filename=\"schema.json\"\r\n" +
"Content-Type: application/octet-stream\r\n" +
"\r\n" +
"[\r\n" +
" {\r\n" +
" \"name\": \"date\",\r\n" +
" \"type\": \"DATE\",\r\n" +
" \"pattern\": \"d MMMM y.\",\r\n" +
" \"locale\": \"ru-RU\"\r\n" +
" }\r\n" +
"]\r\n" +
"\r\n" +
"--------------------------27d997ca93d2689d\r\n" +
"Content-Disposition: form-data; name=\"data\"; filename=\"fhv_tripdata_2017-02.csv\"\r\n" +
"Content-Type: application/octet-stream\r\n" +
"\r\n" +
"Dispatching_base_num,Pickup_DateTime,DropOff_datetime,PUlocationID,DOlocationID\r\n" +
"B00008,2017-02-01 00:30:00,,,\r\n" +
"B00008,2017-02-01 00:40:00,,,\r\n" +
"B00009,2017-02-01 00:30:00,,,\r\n" +
"B00013,2017-02-01 00:11:00,,,\r\n" +
"B00013,2017-02-01 00:41:00,,,\r\n" +
"B00013,2017-02-01 00:00:00,,,\r\n" +
"B00013,2017-02-01 00:53:00,,,\r\n" +
"B00013,2017-02-01 00:44:00,,,\r\n" +
"B00013,2017-02-01 00:05:00,,,\r\n" +
"B00013,2017-02-01 00:54:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"B00014,2017-02-01 00:46:00,,,\r\n" +
"B00014,2017-02-01 00:54:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"B00014,2017-02-01 00:26:00,,,\r\n" +
"B00014,2017-02-01 00:55:00,,,\r\n" +
"B00014,2017-02-01 00:47:00,,,\r\n" +
"B00014,2017-02-01 00:05:00,,,\r\n" +
"B00014,2017-02-01 00:58:00,,,\r\n" +
"B00014,2017-02-01 00:33:00,,,\r\n" +
"B00014,2017-02-01 00:45:00,,,\r\n" +
"\r\n" +
"--------------------------27d997ca93d2689d--";
byte[] expectedResponse = ("HTTP/1.1 200 OK\r\n" +
"Server: questDB/1.0\r\n" +
"Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" +
"Transfer-Encoding: chunked\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
"\r\n" +
"442\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"| Location: | fhv_tripdata_2017-02.csv | Pattern | Locale | Errors |\r\n" +
"| Partition by | NONE | | | |\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"| Rows handled | 24 | | | |\r\n" +
"| Rows imported | 24 | | | |\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"| 0 | 0 |\r\n" +
"| 1 | 0 |\r\n" +
"| 2 | 0 |\r\n" +
"| 3 | 0 |\r\n" +
"| 4 | 0 |\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"\r\n" +
"0\r\n" +
"\r\n").getBytes();
long fd = Net.socketTcp(true);
try {
long sockAddr = Net.sockaddr("127.0.0.1", 9001);
try {
Assert.assertTrue(fd > -1);
Assert.assertEquals(0, Net.connect(fd, sockAddr));
Net.setTcpNoDelay(fd, true);
final int len = request.length();
long ptr = Unsafe.malloc(((CharSequence) request).length());
try {
for (int j = 0; j < 5; j++) {
System.out.println(j);
int sent = 0;
Chars.strcpy(request, ((CharSequence) request).length(), ptr);
while (sent < len) {
int n = Net.send(fd, ptr + sent, 1);
Assert.assertTrue(n > -1);
sent += n;
if (sent > 800) {
LockSupport.parkNanos(1_000_000);
}
}
// receive response
final int expectedToReceive = expectedResponse.length;
int received = 0;
while (received < expectedToReceive) {
int n = Net.recv(fd, ptr + received, len - received);
// compare bytes
for (int i = 0; i < n; i++) {
if (expectedResponse[received + i] != Unsafe.getUnsafe().getByte(ptr + received + i)) {
dump(ptr, received + n);
Assert.fail("Error at: " + (received + i) + ", local=" + i);
}
}
received += n;
}
}
} finally {
Unsafe.free(ptr, len);
}
} finally {
Net.freeSockAddr(sockAddr);
}
} finally {
Net.close(fd);
}
httpServer.halt();
}
});
}
@Test @Test
public void testMaxConnections() throws Exception { public void testMaxConnections() throws Exception {
...@@ -1233,7 +1403,7 @@ public class IODispatcherTest { ...@@ -1233,7 +1403,7 @@ public class IODispatcherTest {
} }
@Override @Override
public void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher1) throws PeerDisconnectedException, PeerIsSlowException { public void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher1) throws PeerDisconnectedException, PeerIsSlowToReadException {
context.simpleResponse().sendStatusWithDefaultMessage(200); context.simpleResponse().sendStatusWithDefaultMessage(200);
dispatcher1.registerChannel(context, IOOperation.READ); dispatcher1.registerChannel(context, IOOperation.READ);
} }
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
* | |_| | |_| | __/\__ \ |_| |_| | |_) | * | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/ * \__\_\\__,_|\___||___/\__|____/|____/
* *
* Copyright (C) 2014-2018 Appsicle * Copyright (C) 2014-2019 Appsicle
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3, * it under the terms of the GNU Affero General Public License, version 3,
...@@ -36,6 +36,7 @@ import org.junit.rules.TemporaryFolder; ...@@ -36,6 +36,7 @@ import org.junit.rules.TemporaryFolder;
import java.sql.Timestamp; import java.sql.Timestamp;
@Ignore
public class QueryHandlerSmallBufferTest extends AbstractOptimiserTest { public class QueryHandlerSmallBufferTest extends AbstractOptimiserTest {
@ClassRule @ClassRule
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册