提交 5d4f5aee 编写于 作者: V Vlad Ilyushchenko

NET: first cut of all new HttpServer. Skeleton is ok, static data processor...

NET: first cut of all new HttpServer. Skeleton is ok, static data processor delivers its first file (in unit test)
上级 40fa82b3
...@@ -23,7 +23,46 @@ ...@@ -23,7 +23,46 @@
package com.questdb.cutlass.http; package com.questdb.cutlass.http;
import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import com.questdb.network.DefaultIODispatcherConfiguration;
import com.questdb.network.IODispatcherConfiguration;
import com.questdb.std.FilesFacade;
import com.questdb.std.FilesFacadeImpl;
import com.questdb.std.str.Path;
import com.questdb.std.time.MillisecondClock;
import com.questdb.std.time.MillisecondClockImpl;
class DefaultHttpServerConfiguration implements HttpServerConfiguration { class DefaultHttpServerConfiguration implements HttpServerConfiguration {
protected final MimeTypesCache mimeTypesCache;
private final IODispatcherConfiguration dispatcherConfiguration = new DefaultIODispatcherConfiguration();
private final StaticContentProcessorConfiguration staticContentProcessorConfiguration = new StaticContentProcessorConfiguration() {
@Override
public FilesFacade getFilesFacade() {
return FilesFacadeImpl.INSTANCE;
}
@Override
public CharSequence getIndexFileName() {
return "index.html";
}
@Override
public MimeTypesCache getMimeTypesCache() {
return mimeTypesCache;
}
@Override
public CharSequence getPublicDirectory() {
return ".";
}
};
public DefaultHttpServerConfiguration() {
try (Path path = new Path().of(this.getClass().getResource("/site/conf/mime.types").getFile()).$()) {
this.mimeTypesCache = new MimeTypesCache(FilesFacadeImpl.INSTANCE, path);
}
}
@Override @Override
public int getConnectionHeaderBufferSize() { public int getConnectionHeaderBufferSize() {
return 1024; return 1024;
...@@ -48,4 +87,24 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration { ...@@ -48,4 +87,24 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
public int getConnectionWrapperObjPoolSize() { public int getConnectionWrapperObjPoolSize() {
return 128; return 128;
} }
@Override
public MillisecondClock getClock() {
return MillisecondClockImpl.INSTANCE;
}
@Override
public IODispatcherConfiguration getDispatcherConfiguration() {
return dispatcherConfiguration;
}
@Override
public StaticContentProcessorConfiguration getStaticContentProcessorConfiguration() {
return staticContentProcessorConfiguration;
}
@Override
public int getWorkerCount() {
return 2;
}
} }
...@@ -26,12 +26,10 @@ package com.questdb.cutlass.http; ...@@ -26,12 +26,10 @@ package com.questdb.cutlass.http;
import com.questdb.log.Log; import com.questdb.log.Log;
import com.questdb.log.LogFactory; import com.questdb.log.LogFactory;
import com.questdb.network.*; import com.questdb.network.*;
import com.questdb.std.Chars; import com.questdb.std.*;
import com.questdb.std.ObjectPool;
import com.questdb.std.Unsafe;
import com.questdb.std.str.DirectByteCharSequence; import com.questdb.std.str.DirectByteCharSequence;
public class HttpConnectionContext implements IOContext { public class HttpConnectionContext implements IOContext, Locality {
private static final Log LOG = LogFactory.getLog(HttpConnectionContext.class); private static final Log LOG = LogFactory.getLog(HttpConnectionContext.class);
private final HttpHeaderParser headerParser; private final HttpHeaderParser headerParser;
...@@ -39,10 +37,13 @@ public class HttpConnectionContext implements IOContext { ...@@ -39,10 +37,13 @@ public class HttpConnectionContext implements IOContext {
private final int recvBufferSize; private final int recvBufferSize;
private final HttpMultipartContentParser multipartContentParser; private final HttpMultipartContentParser multipartContentParser;
private final HttpHeaderParser multipartContentHeaderParser; private final HttpHeaderParser multipartContentHeaderParser;
private final HttpResponseSink responseSink;
private final ObjectPool<DirectByteCharSequence> csPool; private final ObjectPool<DirectByteCharSequence> csPool;
private final long sendBuffer; private final long sendBuffer;
private final HttpServerConfiguration configuration; private final HttpServerConfiguration configuration;
private final long fd; private final long fd;
private final LocalValueMap localValueMap = new LocalValueMap();
private HttpRequestProcessor resumeProcessor = null;
public HttpConnectionContext(HttpServerConfiguration configuration, long fd) { public HttpConnectionContext(HttpServerConfiguration configuration, long fd) {
this.configuration = configuration; this.configuration = configuration;
...@@ -54,6 +55,14 @@ public class HttpConnectionContext implements IOContext { ...@@ -54,6 +55,14 @@ public class HttpConnectionContext implements IOContext {
this.recvBuffer = Unsafe.malloc(recvBufferSize); this.recvBuffer = Unsafe.malloc(recvBufferSize);
this.sendBuffer = Unsafe.malloc(configuration.getConnectionSendBufferSize()); this.sendBuffer = Unsafe.malloc(configuration.getConnectionSendBufferSize());
this.fd = fd; this.fd = fd;
this.responseSink = new HttpResponseSink(configuration, fd);
}
public void clear() {
this.headerParser.clear();
this.multipartContentParser.clear();
this.multipartContentParser.clear();
this.csPool.clear();
} }
@Override @Override
...@@ -61,6 +70,7 @@ public class HttpConnectionContext implements IOContext { ...@@ -61,6 +70,7 @@ public class HttpConnectionContext implements IOContext {
csPool.clear(); csPool.clear();
multipartContentParser.close(); multipartContentParser.close();
multipartContentHeaderParser.close(); multipartContentHeaderParser.close();
responseSink.close();
headerParser.close(); headerParser.close();
Unsafe.free(recvBuffer, recvBufferSize); Unsafe.free(recvBuffer, recvBufferSize);
Unsafe.free(sendBuffer, configuration.getConnectionSendBufferSize()); Unsafe.free(sendBuffer, configuration.getConnectionSendBufferSize());
...@@ -71,15 +81,21 @@ public class HttpConnectionContext implements IOContext { ...@@ -71,15 +81,21 @@ public class HttpConnectionContext implements IOContext {
return fd; return fd;
} }
public HttpResponseSink.DirectBufferResponse getDirectBufferResponse() {
return responseSink.getDirectBufferResponse();
}
public HttpResponseSink.HeaderOnlyResponse getHeaderOnlyResponse() {
return responseSink.getHeaderOnlyResponse();
}
public HttpHeaders getHeaders() { public HttpHeaders getHeaders() {
return headerParser; return headerParser;
} }
public void clear() { @Override
this.headerParser.clear(); public LocalValueMap getMap() {
this.multipartContentParser.clear(); return localValueMap;
this.multipartContentParser.clear();
this.csPool.clear();
} }
public void handleClientOperation(int operation, NetworkFacade nf, IODispatcher<HttpConnectionContext> dispatcher, HttpRequestProcessorSelector selector) { public void handleClientOperation(int operation, NetworkFacade nf, IODispatcher<HttpConnectionContext> dispatcher, HttpRequestProcessorSelector selector) {
...@@ -87,12 +103,28 @@ public class HttpConnectionContext implements IOContext { ...@@ -87,12 +103,28 @@ public class HttpConnectionContext implements IOContext {
case IOOperation.READ: case IOOperation.READ:
handleClientRecv(nf, dispatcher, selector); handleClientRecv(nf, dispatcher, selector);
break; break;
case IOOperation.WRITE:
if (resumeProcessor != null) {
try {
responseSink.resume();
resumeProcessor.resume(this);
} catch (PeerIsSlowException ignore) {
dispatcher.registerChannel(this, IOOperation.WRITE);
}
} else {
assert false;
}
break;
default: default:
dispatcher.disconnect(this, DisconnectReason.SILLY); dispatcher.disconnect(this, DisconnectReason.SILLY);
break; break;
} }
} }
public HttpResponseSink.SimpleResponseImpl simpleResponse() {
return responseSink.getSimple();
}
private void checkRemainingInputAndCompleteRequest(NetworkFacade nf, IODispatcher<HttpConnectionContext> dispatcher, long fd, HttpRequestProcessor processor) { private void checkRemainingInputAndCompleteRequest(NetworkFacade nf, IODispatcher<HttpConnectionContext> dispatcher, long fd, HttpRequestProcessor processor) {
int read;// consume and throw away the remainder of TCP input int read;// consume and throw away the remainder of TCP input
read = nf.recv(fd, recvBuffer, 1); read = nf.recv(fd, recvBuffer, 1);
...@@ -101,7 +133,11 @@ public class HttpConnectionContext implements IOContext { ...@@ -101,7 +133,11 @@ public class HttpConnectionContext implements IOContext {
dispatcher.disconnect(this, DisconnectReason.PEER); dispatcher.disconnect(this, DisconnectReason.PEER);
} else { } else {
LOG.debug().$("good [fd=").$(fd).$(']').$(); LOG.debug().$("good [fd=").$(fd).$(']').$();
processor.onRequestComplete(this, dispatcher); try {
processor.onRequestComplete(this, dispatcher);
} catch (PeerDisconnectedException ignore) {
dispatcher.disconnect(this, DisconnectReason.PEER);
}
} }
} }
...@@ -136,7 +172,12 @@ public class HttpConnectionContext implements IOContext { ...@@ -136,7 +172,12 @@ public class HttpConnectionContext implements IOContext {
headerEnd = headerParser.parse(recvBuffer, recvBuffer + read, true); headerEnd = headerParser.parse(recvBuffer, recvBuffer + read, true);
} }
final HttpRequestProcessor processor = selector.select(headerParser.getUrl()); HttpRequestProcessor processor = selector.select(headerParser.getUrl());
if (processor == null) {
processor = selector.getDefaultProcessor();
}
final boolean multipartRequest = Chars.equalsNc("multipart/form-data", headerParser.getContentType()); final boolean multipartRequest = Chars.equalsNc("multipart/form-data", headerParser.getContentType());
final boolean multipartProcessor = processor instanceof HttpMultipartContentListener; final boolean multipartProcessor = processor instanceof HttpMultipartContentListener;
...@@ -173,8 +214,28 @@ public class HttpConnectionContext implements IOContext { ...@@ -173,8 +214,28 @@ public class HttpConnectionContext implements IOContext {
} }
checkRemainingInputAndCompleteRequest(nf, dispatcher, fd, processor); checkRemainingInputAndCompleteRequest(nf, dispatcher, fd, processor);
} else { } else {
processor.onHeadersReady(this);
checkRemainingInputAndCompleteRequest(nf, dispatcher, fd, processor); // Do not expect any more bytes to be sent to us before
// we respond back to client. We will disconnect the client when
// they abuse protocol. In addition, we will not call processor
// if client has disconnected before we had a chance to reply.
read = nf.recv(fd, recvBuffer, 1);
if (read != 0) {
LOG.debug().$("disconnect after request [fd=").$(fd).$(']').$();
dispatcher.disconnect(this, DisconnectReason.PEER);
} else {
processor.onHeadersReady(this);
LOG.debug().$("good [fd=").$(fd).$(']').$();
try {
processor.onRequestComplete(this, dispatcher);
resumeProcessor = null;
} catch (PeerDisconnectedException ignore) {
dispatcher.disconnect(this, DisconnectReason.PEER);
} catch (PeerIsSlowException ignore) {
dispatcher.registerChannel(this, IOOperation.WRITE);
resumeProcessor = processor;
}
}
} }
} catch (HttpException e) { } catch (HttpException e) {
e.printStackTrace(); e.printStackTrace();
......
...@@ -54,6 +54,16 @@ public class HttpException extends RuntimeException implements Sinkable { ...@@ -54,6 +54,16 @@ public class HttpException extends RuntimeException implements Sinkable {
return this; return this;
} }
public HttpException put(int value) {
message.put(value);
return this;
}
public HttpException put(long value) {
message.put(value);
return this;
}
@Override @Override
public void toSink(CharSink sink) { public void toSink(CharSink sink) {
sink.put(message); sink.put(message);
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
import com.questdb.std.Chars;
import com.questdb.std.Numbers;
import com.questdb.std.NumericException;
public class HttpRangeParser {
private static final String BYTES = "bytes=";
private long lo;
private long hi;
public long getHi() {
return hi;
}
public long getLo() {
return lo;
}
public boolean of(CharSequence range) {
if (!Chars.startsWith(range, BYTES)) {
return false;
}
int n = Chars.indexOf(range, BYTES.length(), '-');
if (n == -1) {
return false;
}
try {
this.lo = Numbers.parseLong(range, BYTES.length(), n);
if (n == range.length() - 1) {
this.hi = Long.MAX_VALUE;
} else {
this.hi = Numbers.parseLong(range, n + 1, range.length());
}
return true;
} catch (NumericException e) {
return false;
}
}
}
...@@ -26,7 +26,10 @@ package com.questdb.cutlass.http; ...@@ -26,7 +26,10 @@ package com.questdb.cutlass.http;
import com.questdb.network.IODispatcher; import com.questdb.network.IODispatcher;
public interface HttpRequestProcessor { public interface HttpRequestProcessor {
void onHeadersReady(HttpConnectionContext connectionContext); void onHeadersReady(HttpConnectionContext context);
void onRequestComplete(HttpConnectionContext connectionContext, IODispatcher<HttpConnectionContext> dispatcher); void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher);
default void resume(HttpConnectionContext context) {
}
} }
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
import com.questdb.std.ObjectFactory;
public interface HttpRequestProcessorFactory extends ObjectFactory<HttpRequestProcessor> {
String getUrl();
}
...@@ -23,7 +23,8 @@ ...@@ -23,7 +23,8 @@
package com.questdb.cutlass.http; package com.questdb.cutlass.http;
@FunctionalInterface
public interface HttpRequestProcessorSelector { public interface HttpRequestProcessorSelector {
HttpRequestProcessor select(CharSequence url); HttpRequestProcessor select(CharSequence url);
HttpRequestProcessor getDefaultProcessor();
} }
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
import com.questdb.std.*;
import com.questdb.std.str.AbstractCharSink;
import com.questdb.std.str.CharSink;
import com.questdb.std.time.DateFormatUtils;
import com.questdb.std.time.MillisecondClock;
import java.io.Closeable;
import java.nio.ByteBuffer;
public class HttpResponseHeaderSink extends AbstractCharSink implements Closeable, Mutable {
private static final IntObjHashMap<String> httpStatusMap = new IntObjHashMap<>();
private final long headerPtr;
private final long limit;
private final ByteBuffer headers;
private final MillisecondClock clock;
private long _wptr;
private boolean chunky;
private int code;
public HttpResponseHeaderSink(int bufferSize, MillisecondClock clock) {
this.clock = clock;
int sz = Numbers.ceilPow2(bufferSize);
this.headers = ByteBuffer.allocateDirect(sz);
this.headerPtr = _wptr = ByteBuffers.getAddress(headers);
this.limit = headerPtr + sz;
}
@Override
public void clear() {
headers.clear();
_wptr = headerPtr;
chunky = false;
}
@Override
public void close() {
ByteBuffers.release(headers);
}
public int getCode() {
return code;
}
@Override
public CharSink put(CharSequence cs) {
int len = cs.length();
long p = _wptr;
if (p + len < limit) {
Chars.strcpy(cs, len, p);
_wptr += len;
} else {
throw NoSpaceLeftInResponseBufferException.INSTANCE;
}
return this;
}
@Override
public CharSink put(char c) {
if (_wptr < limit) {
Unsafe.getUnsafe().putByte(_wptr++, (byte) c);
return this;
}
throw NoSpaceLeftInResponseBufferException.INSTANCE;
}
public String status(int code, CharSequence contentType, long contentLength) {
this.code = code;
String status = httpStatusMap.get(code);
if (status == null) {
throw new IllegalArgumentException("Illegal status code: " + code);
}
put("HTTP/1.1 ").put(code).put(' ').put(status).put(Misc.EOL);
put("Server: ").put("questDB/1.0").put(Misc.EOL);
put("Date: ");
DateFormatUtils.formatHTTP(this, clock.getTicks());
put(Misc.EOL);
if (contentLength > -2) {
if (this.chunky = (contentLength == -1)) {
put("Transfer-Encoding: ").put("chunked").put(Misc.EOL);
} else {
put("Content-Length: ").put(contentLength).put(Misc.EOL);
}
}
if (contentType != null) {
put("Content-Type: ").put(contentType).put(Misc.EOL);
}
return status;
}
ByteBuffer prepareBuffer() {
if (!chunky) {
put(Misc.EOL);
}
headers.limit((int) (_wptr - headerPtr));
return headers;
}
static {
httpStatusMap.put(200, "OK");
httpStatusMap.put(206, "Partial content");
httpStatusMap.put(304, "Not Modified");
httpStatusMap.put(400, "Bad request");
httpStatusMap.put(404, "Not Found");
httpStatusMap.put(416, "Request range not satisfiable");
httpStatusMap.put(431, "Headers too large");
httpStatusMap.put(500, "Internal server error");
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
import com.questdb.network.Net;
import com.questdb.std.*;
import com.questdb.std.ex.ZLibException;
import com.questdb.std.str.AbstractCharSink;
import com.questdb.std.str.CharSink;
import com.questdb.std.str.DirectUnboundedByteSink;
import java.io.Closeable;
import java.nio.ByteBuffer;
public class HttpResponseSink implements Closeable, Mutable {
private static final int CHUNK_HEAD = 1;
private static final int CHUNK_DATA = 2;
private static final int FIN = 3;
private static final int MULTI_CHUNK = 4;
private static final int DEFLATE = 5;
private static final int MULTI_BUF_CHUNK = 6;
private static final int END_CHUNK = 7;
private static final int DONE = 8;
private static final int FLUSH = 9;
private static final int SEND_DEFLATED_CONT = 10;
private static final int SEND_DEFLATED_END = 11;
private final long out;
private final long outPtr;
private final long limit;
private final long chunkHeaderBuf;
private final DirectUnboundedByteSink chunkSink;
private final HttpResponseHeaderSink headerSink;
private final SimpleResponseImpl simple = new SimpleResponseImpl();
private final ResponseSinkImpl sink = new ResponseSinkImpl();
private final FixedSizeResponseImpl fixedSize = new FixedSizeResponseImpl();
private final ChunkedResponseImpl chunkedResponse = new ChunkedResponseImpl();
private final DirectBufferResponse directBufferResponse = new DirectBufferResponse();
private final int responseBufferSize;
private final long fd;
private final HeaderOnlyResponse headerOnlyResponse = new HeaderOnlyResponse();
private long _wPtr;
private ByteBuffer zout;
private long flushBuf;
private int flushBufSize;
private int state;
private long z_streamp = 0;
private boolean compressed = false;
private long pzout;
private int crc = 0;
private long total = 0;
private boolean header = true;
public HttpResponseSink(HttpServerConfiguration configuration, long fd) {
this.responseBufferSize = Numbers.ceilPow2(configuration.getConnectionSendBufferSize());
this.out = Unsafe.calloc(responseBufferSize);
this.headerSink = new HttpResponseHeaderSink(1024, configuration.getClock());
// size is 32bit int, as hex string max 8 bytes
this.chunkHeaderBuf = Unsafe.calloc(8 + 2 * Misc.EOL.length());
this.chunkSink = new DirectUnboundedByteSink(chunkHeaderBuf);
this.chunkSink.put(Misc.EOL);
this.outPtr = this._wPtr = out;
this.limit = outPtr + responseBufferSize;
this.fd = fd;
}
@Override
public void clear() {
Unsafe.getUnsafe().setMemory(out, responseBufferSize, (byte) 0);
headerSink.clear();
this._wPtr = outPtr;
if (zout != null) {
zout.clear();
}
resetZip();
}
@Override
public void close() {
Unsafe.free(out, responseBufferSize);
Unsafe.free(chunkHeaderBuf, 8 + 2 * Misc.EOL.length());
headerSink.close();
ByteBuffers.release(zout);
if (z_streamp != 0) {
Zip.deflateEnd(z_streamp);
z_streamp = 0;
}
}
public int getCode() {
return headerSink.getCode();
}
public DirectBufferResponse getDirectBufferResponse() {
return directBufferResponse;
}
public HeaderOnlyResponse getHeaderOnlyResponse() {
return headerOnlyResponse;
}
public SimpleResponseImpl getSimple() {
return simple;
}
public void resume() {
machine0();
}
private int deflate(boolean flush) {
final int sz = this.responseBufferSize - 8;
long p = pzout + Zip.gzipHeaderLen;
int ret;
int len;
int availIn;
// compress input until we run out of either input or output
do {
ret = Zip.deflate(z_streamp, p, sz, flush);
if (ret < 0) {
throw HttpException.instance("could not deflate [ret=").put(ret);
}
len = sz - Zip.availOut(z_streamp);
availIn = Zip.availIn(z_streamp);
} while (len == 0 && availIn > 0);
// zip did not write anything out, nothing to send
// we assume that input was too small and needs flushing
if (len == 0) {
return DONE;
}
// this is ZLib error, can't continue
if (len < 0) {
throw ZLibException.INSTANCE;
}
// augment zout with header and trailer and prepare for flush
// header
if (header) {
Unsafe.getUnsafe().copyMemory(Zip.gzipHeader, pzout, Zip.gzipHeaderLen);
header = false;
zout.position(0);
} else {
zout.position(Zip.gzipHeaderLen);
}
// trailer
if (flush && ret == 1) {
Unsafe.getUnsafe().putInt(p + len, crc); // crc
Unsafe.getUnsafe().putInt(p + len + 4, (int) total); // total
zout.limit(Zip.gzipHeaderLen + len + 8);
} else {
zout.limit(Zip.gzipHeaderLen + len);
}
// first we need to flush chunk header
prepareChunk(zout.remaining());
// if there is input remaining, don't change
return flush && ret == 1 ? SEND_DEFLATED_END : SEND_DEFLATED_CONT;
}
private void flush() {
int sent = 0;
while (sent < flushBufSize) {
int n = Net.send(fd, flushBuf + sent, flushBufSize - sent);
if (n < 0) {
// disconnected
throw PeerDisconnectedException.INSTANCE;
}
if (n == 0) {
// test how many times we tried to send before parking up
flushBuf += sent;
flushBufSize -= sent;
throw PeerIsSlowException.INSTANCE;
} else {
sent += n;
}
}
}
private void flushSingle() {
state = DONE;
flush();
}
private void machine(int nextState) {
state = nextState;
machine0();
}
private void machine0() {
while (true) {
if (flushBufSize > 0) {
flush();
}
switch (state) {
case MULTI_CHUNK:
if (compressed) {
prepareCompressedBody();
state = DEFLATE;
} else {
prepareBody();
state = DONE;
}
break;
case DEFLATE:
state = deflate(false);
break;
case SEND_DEFLATED_END:
flushBuf = ByteBuffers.getAddress(zout) + zout.position();
flushBufSize = zout.limit() - zout.position();
state = END_CHUNK;
break;
case SEND_DEFLATED_CONT:
flushBuf = ByteBuffers.getAddress(zout) + zout.position();
flushBufSize = zout.limit() - zout.position();
state = DONE;
break;
case MULTI_BUF_CHUNK:
flushBuf = out;
state = DONE;
break;
case CHUNK_HEAD:
prepareChunk((int) (_wPtr - outPtr));
state = CHUNK_DATA;
break;
case CHUNK_DATA:
prepareBody();
state = END_CHUNK;
break;
case END_CHUNK:
prepareChunk(0);
state = FIN;
break;
case FIN:
sink.put(Misc.EOL);
prepareBody();
state = DONE;
break;
case FLUSH:
state = deflate(true);
break;
case DONE:
return;
default:
break;
}
}
}
private void prepareBody() {
flushBuf = out;
flushBufSize = (int) (_wPtr - outPtr);
_wPtr = outPtr;
}
private void prepareChunk(int len) {
chunkSink.clear(Misc.EOL.length());
Numbers.appendHex(chunkSink, len);
chunkSink.put(Misc.EOL);
flushBuf = chunkHeaderBuf;
flushBufSize = chunkSink.length();
}
private void prepareCompressedBody() {
if (z_streamp == 0) {
z_streamp = Zip.deflateInit();
zout = ByteBuffer.allocateDirect(responseBufferSize);
pzout = ByteBuffers.getAddress(zout);
}
int r = (int) (_wPtr - outPtr);
Zip.setInput(z_streamp, outPtr, r);
this.crc = Zip.crc32(this.crc, outPtr, r);
this.total += r;
_wPtr = outPtr;
}
private void prepareHeaderSink() {
ByteBuffer that = headerSink.prepareBuffer();
flushBuf = ByteBuffers.getAddress(that);
flushBufSize = that.limit();
}
private void resetZip() {
if (z_streamp != 0) {
Zip.deflateReset(z_streamp);
}
this.crc = 0;
this.total = 0;
}
public class SimpleResponseImpl {
public void send(int code) {
send(code, null);
}
public void send(int code, CharSequence message) {
final String std = headerSink.status(code, "text/html; charset=utf-8", -1L);
sink.put(message == null ? std : message).put(Misc.EOL);
prepareHeaderSink();
machine(CHUNK_HEAD);
}
public void sendEmptyBody(int code) {
headerSink.status(code, "text/html; charset=utf-8", -2);
prepareHeaderSink();
flushSingle();
}
}
private class ResponseSinkImpl extends AbstractCharSink {
@Override
public void flush() {
prepareHeaderSink();
machine(CHUNK_HEAD);
}
@Override
public CharSink put(CharSequence seq) {
int len = seq.length();
long p = _wPtr;
if (p + len < limit) {
Chars.strcpy(seq, len, p);
_wPtr = p + len;
} else {
throw NoSpaceLeftInResponseBufferException.INSTANCE;
}
return this;
}
@Override
public CharSink put(char c) {
if (_wPtr < limit) {
Unsafe.getUnsafe().putByte(_wPtr++, (byte) c);
return this;
}
throw NoSpaceLeftInResponseBufferException.INSTANCE;
}
@Override
public CharSink put(float value, int scale) {
if (value == value) {
return super.put(value, scale);
}
put("null");
return this;
}
@Override
public CharSink put(double value, int scale) {
if (value == value) {
return super.put(value, scale);
}
put("null");
return this;
}
@Override
protected void putUtf8Special(char c) {
if (c < 32) {
escapeSpace(c);
} else {
switch (c) {
case '/':
case '\"':
case '\\':
put('\\');
// intentional fall through
default:
put(c);
break;
}
}
}
public void status(int status, CharSequence contentType) {
headerSink.status(status, contentType, -1);
}
private void escapeSpace(char c) {
switch (c) {
case '\b':
put("\\b");
break;
case '\f':
put("\\f");
break;
case '\n':
put("\\n");
break;
case '\r':
put("\\r");
break;
case '\t':
put("\\t");
break;
default:
put(c);
break;
}
}
}
public class FixedSizeResponseImpl {
public void done() {
}
public CharSink headers() {
return headerSink;
}
public void sendChunk() {
flushBuf = out;
flushBufSize = 0;
flushSingle();
}
public void sendHeader() {
ByteBuffer that = headerSink.prepareBuffer();
flushBuf = ByteBuffers.getAddress(that);
flushBufSize = that.limit();
flushSingle();
}
public void status(int status, CharSequence contentType, long len) {
headerSink.status(status, contentType, len);
}
}
public class DirectBufferResponse {
public long getBuffer() {
return out;
}
public int getBufferSize() {
return responseBufferSize;
}
public CharSink headerSink() {
return headerSink;
}
public void send(int size) {
flushBuf = out;
flushBufSize = size;
flushSingle();
}
public void sendHeader() {
ByteBuffer that = headerSink.prepareBuffer();
flushBuf = ByteBuffers.getAddress(that);
flushBufSize = that.limit();
flushSingle();
}
public void status(int status, CharSequence contentType, long len) {
headerSink.status(status, contentType, len);
}
}
public class HeaderOnlyResponse {
public HttpResponseHeaderSink getHeaderSink() {
return headerSink;
}
public void send() {
ByteBuffer that = headerSink.prepareBuffer();
flushBuf = ByteBuffers.getAddress(that);
flushBufSize = that.limit();
flushSingle();
}
}
private class ChunkedResponseImpl extends ResponseSinkImpl {
private long bookmark = outPtr;
public void bookmark() {
bookmark = _wPtr;
}
public void done() {
flushBufSize = 0;
if (compressed) {
machine(FLUSH);
} else {
machine(END_CHUNK);
}
}
@Override
public void flush() {
sendChunk();
}
@Override
public void status(int status, CharSequence contentType) {
super.status(status, contentType);
if (compressed) {
headerSink.put("Content-Encoding: gzip").put(Misc.EOL);
}
}
public CharSink headers() {
return headerSink;
}
public boolean resetToBookmark() {
_wPtr = bookmark;
return bookmark != outPtr;
}
public void sendChunk() {
if (outPtr != _wPtr) {
if (compressed) {
flushBufSize = 0;
machine(MULTI_CHUNK);
} else {
prepareChunk((int) (_wPtr - outPtr));
machine(MULTI_CHUNK);
}
}
}
public void sendHeader() {
ByteBuffer that = headerSink.prepareBuffer();
flushBuf = ByteBuffers.getAddress(that);
flushBufSize = that.limit();
flushSingle();
}
public void setCompressed(boolean compressed) {
HttpResponseSink.this.compressed = compressed;
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.Job;
import com.questdb.mp.Worker;
import com.questdb.network.IOContextFactory;
import com.questdb.network.IODispatcher;
import com.questdb.network.IODispatchers;
import com.questdb.network.NetworkFacade;
import com.questdb.std.CharSequenceObjHashMap;
import com.questdb.std.Misc;
import com.questdb.std.ObjHashSet;
import com.questdb.std.ObjList;
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
public class HttpServer implements Closeable {
private final static Log LOG = LogFactory.getLog(HttpServer.class);
private final HttpServerConfiguration configuration;
private final HttpContextFactory httpContextFactory;
private final ObjList<HttpRequestProcessorSelectorImpl> selectors;
private final CountDownLatch workerHaltLatch;
private final CountDownLatch started = new CountDownLatch(1);
private final int workerCount;
private final AtomicBoolean running = new AtomicBoolean();
private final ObjList<Worker> workers;
private IODispatcher<HttpConnectionContext> dispatcher;
public HttpServer(HttpServerConfiguration configuration) {
this.configuration = configuration;
this.httpContextFactory = new HttpContextFactory();
// We want processor instances to be local to each worker
// this will allow processors to use their member variables
// for state. This of course it not request state, but
// still it is better than having multiple threads hit
// same class instance.
this.workerCount = configuration.getWorkerCount();
this.workers = new ObjList<>(workerCount);
this.selectors = new ObjList<>(workerCount);
for (int i = 0; i < workerCount; i++) {
selectors.add(new HttpRequestProcessorSelectorImpl());
}
// halt latch that each worker will count down to let main
// thread know that server is done and allow server shutdown
// gracefully
this.workerHaltLatch = new CountDownLatch(workerCount);
}
public void bind(HttpRequestProcessorFactory factory) {
final String url = factory.getUrl();
assert url != null;
for (int i = 0; i < workerCount; i++) {
HttpRequestProcessorSelectorImpl selector = selectors.getQuick(i);
if (HttpServerConfiguration.DEFAULT_PROCESSOR_URL.equals(url)) {
selector.defaultRequestProcessor = factory.newInstance();
} else {
selector.processorMap.put(url, factory.newInstance());
}
}
}
@Override
public void close() {
Misc.free(dispatcher);
}
public CountDownLatch getStartedLatch() {
return started;
}
public void halt() throws InterruptedException {
if (running.compareAndSet(true, false)) {
started.await();
for (int i = 0; i < workerCount; i++) {
workers.getQuick(i).halt();
}
workerHaltLatch.await();
}
}
public void start() {
if (running.compareAndSet(false, true)) {
dispatcher = IODispatchers.create(
configuration.getDispatcherConfiguration(),
httpContextFactory
);
final NetworkFacade nf = configuration.getDispatcherConfiguration().getNetworkFacade();
for (int i = 0; i < workerCount; i++) {
ObjHashSet<Job> jobs = new ObjHashSet<>();
final int index = i;
jobs.add(dispatcher);
jobs.add(new Job() {
private final HttpRequestProcessorSelector selector = selectors.getQuick(index);
@Override
public boolean run() {
return dispatcher.processIOQueue(
(operation, context, dispatcher1)
-> context.handleClientOperation(operation, nf, dispatcher1, selector)
);
}
});
Worker worker = new Worker(jobs, workerHaltLatch, -1, LOG);
worker.setName("questdb-http-" + i);
workers.add(worker);
worker.start();
}
LOG.info().$("started").$();
started.countDown();
}
}
private static class HttpRequestProcessorSelectorImpl implements HttpRequestProcessorSelector {
private final CharSequenceObjHashMap<HttpRequestProcessor> processorMap = new CharSequenceObjHashMap<>();
private HttpRequestProcessor defaultRequestProcessor = null;
@Override
public HttpRequestProcessor select(CharSequence url) {
return processorMap.get(url);
}
@Override
public HttpRequestProcessor getDefaultProcessor() {
return defaultRequestProcessor;
}
}
private class HttpContextFactory implements IOContextFactory<HttpConnectionContext> {
@Override
public HttpConnectionContext newInstance(long fd) {
return new HttpConnectionContext(configuration, fd);
}
}
}
...@@ -23,7 +23,13 @@ ...@@ -23,7 +23,13 @@
package com.questdb.cutlass.http; package com.questdb.cutlass.http;
import com.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import com.questdb.network.IODispatcherConfiguration;
import com.questdb.std.time.MillisecondClock;
public interface HttpServerConfiguration { public interface HttpServerConfiguration {
String DEFAULT_PROCESSOR_URL = "*";
int getConnectionHeaderBufferSize(); int getConnectionHeaderBufferSize();
int getConnectionMultipartHeaderBufferSize(); int getConnectionMultipartHeaderBufferSize();
...@@ -33,4 +39,12 @@ public interface HttpServerConfiguration { ...@@ -33,4 +39,12 @@ public interface HttpServerConfiguration {
int getConnectionSendBufferSize(); int getConnectionSendBufferSize();
int getConnectionWrapperObjPoolSize(); int getConnectionWrapperObjPoolSize();
MillisecondClock getClock();
IODispatcherConfiguration getDispatcherConfiguration();
StaticContentProcessorConfiguration getStaticContentProcessorConfiguration();
int getWorkerCount();
} }
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
import com.questdb.std.CharSequenceObjHashMap;
import com.questdb.std.FilesFacade;
import com.questdb.std.Transient;
import com.questdb.std.Unsafe;
import com.questdb.std.str.DirectByteCharSequence;
import com.questdb.std.str.Path;
public final class MimeTypesCache extends CharSequenceObjHashMap<CharSequence> {
public MimeTypesCache(@Transient FilesFacade ff, @Transient Path path) {
long fd = ff.openRO(path);
if (fd < 0) {
throw HttpException.instance("could not open [file=").put(path).put(']');
}
final long fileSize = ff.length(fd);
if (fileSize < 1 || fileSize > 1024 * 1024L) {
ff.close(fd);
throw HttpException.instance("wrong file size [file=").put(path).put(", size=").put(fileSize).put(']');
}
long buffer = Unsafe.malloc(fileSize);
long read = ff.read(fd, buffer, fileSize, 0);
try {
if (read != fileSize) {
Unsafe.free(buffer, fileSize);
throw HttpException.instance("could not read [file=").put(path).put(", size=").put(fileSize).put(", read=").put(read).put(", errno=").put(ff.errno()).put(']');
}
} finally {
ff.close(fd);
}
final DirectByteCharSequence dbcs = new DirectByteCharSequence();
try {
long p = buffer;
long hi = p + fileSize;
long _lo = p;
boolean newline = true;
boolean comment = false;
CharSequence contentType = null;
while (p < hi) {
char b = (char) Unsafe.getUnsafe().getByte(p++);
switch (b) {
case '#':
comment = newline;
break;
case ' ':
case '\t':
if (!comment) {
if (newline || _lo == p - 1) {
_lo = p;
} else {
String s = dbcs.of(_lo, p - 1).toString();
_lo = p;
if (contentType == null) {
contentType = s;
} else {
this.put(s, contentType);
}
newline = false;
}
}
break;
case '\n':
case '\r':
newline = true;
comment = false;
if (_lo < p - 1 && contentType != null) {
String s = dbcs.of(_lo, p - 1).toString();
this.put(s, contentType);
}
contentType = null;
_lo = p;
break;
default:
if (newline) {
newline = false;
}
break;
}
}
} finally {
Unsafe.free(buffer, fileSize);
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
public class NoSpaceLeftInResponseBufferException extends HttpException {
public final static NoSpaceLeftInResponseBufferException INSTANCE = new NoSpaceLeftInResponseBufferException();
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
public class PeerDisconnectedException extends HttpException {
public static final PeerDisconnectedException INSTANCE = new PeerDisconnectedException();
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
public class PeerIsSlowException extends HttpException {
public static final PeerIsSlowException INSTANCE = new PeerIsSlowException();
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http.processors;
import com.questdb.cutlass.http.*;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.IODispatcher;
import com.questdb.std.*;
import com.questdb.std.str.FileNameExtractorCharSequence;
import com.questdb.std.str.LPSZ;
import com.questdb.std.str.PrefixedPath;
import java.io.Closeable;
public class StaticContentProcessor implements HttpRequestProcessor {
private static final Log LOG = LogFactory.getLog(StaticContentProcessor.class);
private final MimeTypesCache mimeTypes;
private final LocalValue<StaticContentProcessorState> stateAccessor = new LocalValue<>();
private final HttpRangeParser rangeParser = new HttpRangeParser();
private final PrefixedPath prefixedPath;
private final CharSequence indexFileName;
private final FilesFacade ff;
public StaticContentProcessor(StaticContentProcessorConfiguration configuration) {
this.mimeTypes = configuration.getMimeTypesCache();
this.prefixedPath = new PrefixedPath(configuration.getPublicDirectory());
this.indexFileName = configuration.getIndexFileName();
this.ff = configuration.getFilesFacade();
}
@Override
public void onHeadersReady(HttpConnectionContext context) {
}
@Override
public void onRequestComplete(
HttpConnectionContext context,
IODispatcher<HttpConnectionContext> dispatcher
) {
HttpHeaders headers = context.getHeaders();
CharSequence url = headers.getUrl();
LOG.info().$("handling static: ").$(url).$();
if (Chars.contains(url, "..")) {
LOG.info().$("URL abuse: ").$(url).$();
context.simpleResponse().send(404);
} else {
PrefixedPath path = prefixedPath.rewind();
if (Chars.equals(url, '/')) {
path.concat(indexFileName);
} else {
path.concat(url);
}
path.$();
if (ff.exists(path)) {
send(context, path, headers.getUrlParam("attachment") != null);
} else {
LOG.info().$("not found [path=").$(path).$(']').$();
context.simpleResponse().send(404);
}
}
}
@Override
public void resume(HttpConnectionContext context) {
StaticContentProcessorState state = stateAccessor.get(context);
if (state == null || state.fd == -1) {
return;
}
HttpResponseSink.DirectBufferResponse r = context.getDirectBufferResponse();
long wptr = r.getBuffer();
int sz = r.getBufferSize();
long l;
// todo: check what happens when this code cannot read file
while (state.bytesSent < state.sendMax && (l = ff.read(state.fd, wptr, sz, state.bytesSent)) > 0) {
if (l + state.bytesSent > state.sendMax) {
l = state.sendMax - state.bytesSent;
}
state.bytesSent += l;
r.send((int) l);
}
// reached the end naturally?
state.clear();
}
private void send(HttpConnectionContext context, LPSZ path, boolean asAttachment) {
int n = Chars.lastIndexOf(path, '.');
if (n == -1) {
LOG.info().$("Missing extension: ").$(path).$();
context.simpleResponse().send(404);
return;
}
HttpHeaders headers = context.getHeaders();
CharSequence contentType = mimeTypes.valueAt(mimeTypes.keyIndex(path, n + 1, path.length()));
CharSequence val;
if ((val = headers.getHeader("Range")) != null) {
sendRange(context, val, path, contentType, asAttachment);
return;
}
int l;
// attempt not to send file when remote side already has
// up-to-date version of the same
if ((val = headers.getHeader("If-None-Match")) != null
&& (l = val.length()) > 2
&& val.charAt(0) == '"'
&& val.charAt(l - 1) == '"') {
try {
long that = Numbers.parseLong(val, 1, l - 1);
if (that == ff.getLastModified(path)) {
context.simpleResponse().sendEmptyBody(304);
return;
}
} catch (NumericException e) {
LOG.info().$("Received wrong tag [").$(val).$("] for ").$(path).$();
context.simpleResponse().send(400);
return;
}
}
sendVanilla(context, path, contentType, asAttachment);
}
private void sendRange(
HttpConnectionContext context,
CharSequence range,
LPSZ path,
CharSequence contentType,
boolean asAttachment) {
if (rangeParser.of(range)) {
StaticContentProcessorState state = stateAccessor.get(context);
if (state == null) {
stateAccessor.set(context, state = new StaticContentProcessorState());
}
state.fd = ff.openRO(path);
if (state.fd == -1) {
LOG.info().$("Cannot open file: ").$(path).$();
context.simpleResponse().send(404);
return;
}
state.bytesSent = 0;
final long length = ff.length(path);
final long lo = rangeParser.getLo();
final long hi = rangeParser.getHi();
if (lo > length || (hi != Long.MAX_VALUE && hi > length) || lo > hi) {
context.simpleResponse().send(416);
} else {
state.bytesSent = lo;
state.sendMax = hi == Long.MAX_VALUE ? length : hi;
final HttpResponseSink.HeaderOnlyResponse r = context.getHeaderOnlyResponse();
final HttpResponseHeaderSink headers = r.getHeaderSink();
headers.status(206, contentType, state.sendMax - lo);
if (asAttachment) {
headers.put("Content-Disposition: attachment; filename=\"").put(FileNameExtractorCharSequence.get(path)).put('\"').put(Misc.EOL);
}
headers.put("Accept-Ranges: bytes").put(Misc.EOL);
headers.put("Content-Range: bytes ").put(lo).put('-').put(state.sendMax).put('/').put(length).put(Misc.EOL);
headers.put("ETag: ").put(ff.getLastModified(path)).put(Misc.EOL);
r.send();
resume(context);
}
} else {
context.simpleResponse().send(416);
}
}
private void sendVanilla(HttpConnectionContext context, LPSZ path, CharSequence contentType, boolean asAttachment) {
long fd = ff.openRO(path);
if (fd == -1) {
LOG.info().$("Cannot open file: ").$(path).$('(').$(Os.errno()).$(')').$();
context.simpleResponse().send(404);
} else {
StaticContentProcessorState h = stateAccessor.get(context);
if (h == null) {
stateAccessor.set(context, h = new StaticContentProcessorState());
}
h.fd = fd;
h.bytesSent = 0;
final long length = ff.length(path);
h.sendMax = length;
final HttpResponseSink.HeaderOnlyResponse r = context.getHeaderOnlyResponse();
final HttpResponseHeaderSink headers = r.getHeaderSink();
headers.status(200, contentType, length);
if (asAttachment) {
headers.put("Content-Disposition: attachment; filename=\"").put(FileNameExtractorCharSequence.get(path)).put("\"").put(Misc.EOL);
}
headers.put("ETag: ").put('"').put(ff.getLastModified(path)).put('"').put(Misc.EOL);
r.send();
resume(context);
}
}
private static class StaticContentProcessorState implements Mutable, Closeable {
long fd = -1;
long bytesSent;
long sendMax;
@Override
public void clear() {
if (fd > -1) {
Files.close(fd);
fd = -1;
}
bytesSent = 0;
sendMax = Long.MAX_VALUE;
}
@Override
public void close() {
clear();
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http.processors;
import com.questdb.cutlass.http.MimeTypesCache;
import com.questdb.std.FilesFacade;
public interface StaticContentProcessorConfiguration {
FilesFacade getFilesFacade();
CharSequence getIndexFileName();
MimeTypesCache getMimeTypesCache();
CharSequence getPublicDirectory();
}
...@@ -25,7 +25,6 @@ package com.questdb.net.http; ...@@ -25,7 +25,6 @@ package com.questdb.net.http;
import com.questdb.ServerConfiguration; import com.questdb.ServerConfiguration;
import com.questdb.ex.ResponseContentBufferTooSmallException; import com.questdb.ex.ResponseContentBufferTooSmallException;
import com.questdb.ex.ZLibException;
import com.questdb.log.Log; import com.questdb.log.Log;
import com.questdb.log.LogFactory; import com.questdb.log.LogFactory;
import com.questdb.log.LogRecord; import com.questdb.log.LogRecord;
...@@ -33,6 +32,7 @@ import com.questdb.net.NonBlockingSecureSocketChannel; ...@@ -33,6 +32,7 @@ import com.questdb.net.NonBlockingSecureSocketChannel;
import com.questdb.std.*; import com.questdb.std.*;
import com.questdb.std.ex.DisconnectedChannelException; import com.questdb.std.ex.DisconnectedChannelException;
import com.questdb.std.ex.SlowWritableChannelException; import com.questdb.std.ex.SlowWritableChannelException;
import com.questdb.std.ex.ZLibException;
import com.questdb.std.str.AbstractCharSink; import com.questdb.std.str.AbstractCharSink;
import com.questdb.std.str.CharSink; import com.questdb.std.str.CharSink;
import com.questdb.std.str.DirectUnboundedByteSink; import com.questdb.std.str.DirectUnboundedByteSink;
......
...@@ -32,7 +32,7 @@ public interface IODispatcher<C extends IOContext> extends Closeable, Job { ...@@ -32,7 +32,7 @@ public interface IODispatcher<C extends IOContext> extends Closeable, Job {
void registerChannel(C context, int operation); void registerChannel(C context, int operation);
void processIOQueue(IORequestProcessor<C> processor); boolean processIOQueue(IORequestProcessor<C> processor);
void disconnect(C context, int disconnectReason); void disconnect(C context, int disconnectReason);
} }
...@@ -125,15 +125,22 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl ...@@ -125,15 +125,22 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
} }
@Override @Override
public void processIOQueue(IORequestProcessor<C> processor) { public boolean processIOQueue(IORequestProcessor<C> processor) {
long cursor = ioEventSubSeq.next(); long cursor = ioEventSubSeq.next();
while (cursor == -2) {
cursor = ioEventSubSeq.next();
}
if (cursor > -1) { if (cursor > -1) {
IOEvent<C> event = ioEventQueue.get(cursor); IOEvent<C> event = ioEventQueue.get(cursor);
C connectionContext = event.context; C connectionContext = event.context;
final int operation = event.operation; final int operation = event.operation;
ioEventSubSeq.done(cursor); ioEventSubSeq.done(cursor);
processor.onRequest(operation, connectionContext, this); processor.onRequest(operation, connectionContext, this);
return true;
} }
return false;
} }
@Override @Override
......
...@@ -137,15 +137,22 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem ...@@ -137,15 +137,22 @@ public class IODispatcherOsx<C extends IOContext> extends SynchronizedJob implem
} }
@Override @Override
public void processIOQueue(IORequestProcessor<C> processor) { public boolean processIOQueue(IORequestProcessor<C> processor) {
long cursor = ioEventSubSeq.next(); long cursor = ioEventSubSeq.next();
while (cursor == -2) {
cursor = ioEventSubSeq.next();
}
if (cursor > -1) { if (cursor > -1) {
IOEvent<C> event = ioEventQueue.get(cursor); IOEvent<C> event = ioEventQueue.get(cursor);
C connectionContext = event.context; C connectionContext = event.context;
final int operation = event.operation; final int operation = event.operation;
ioEventSubSeq.done(cursor); ioEventSubSeq.done(cursor);
processor.onRequest(operation, connectionContext, this); processor.onRequest(operation, connectionContext, this);
return true;
} }
return false;
} }
@Override @Override
......
...@@ -133,15 +133,22 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im ...@@ -133,15 +133,22 @@ public class IODispatcherWindows<C extends IOContext> extends SynchronizedJob im
} }
@Override @Override
public void processIOQueue(IORequestProcessor<C> processor) { public boolean processIOQueue(IORequestProcessor<C> processor) {
long cursor = ioEventSubSeq.next(); long cursor = ioEventSubSeq.next();
while (cursor == -2) {
cursor = ioEventSubSeq.next();
}
if (cursor > -1) { if (cursor > -1) {
IOEvent<C> event = ioEventQueue.get(cursor); IOEvent<C> event = ioEventQueue.get(cursor);
C connectionContext = event.context; C connectionContext = event.context;
final int operation = event.operation; final int operation = event.operation;
ioEventSubSeq.done(cursor); ioEventSubSeq.done(cursor);
processor.onRequest(operation, connectionContext, this); processor.onRequest(operation, connectionContext, this);
return true;
} }
return false;
} }
@Override @Override
......
...@@ -47,6 +47,8 @@ public interface FilesFacade { ...@@ -47,6 +47,8 @@ public interface FilesFacade {
int findType(long findPtr); int findType(long findPtr);
long getLastModified(LPSZ path);
long getMapPageSize(); long getMapPageSize();
long getOpenFileCount(); long getOpenFileCount();
......
...@@ -91,6 +91,11 @@ public class FilesFacadeImpl implements FilesFacade { ...@@ -91,6 +91,11 @@ public class FilesFacadeImpl implements FilesFacade {
return Files.findType(findPtr); return Files.findType(findPtr);
} }
@Override
public long getLastModified(LPSZ path) {
return Files.getLastModified(path);
}
@Override @Override
public long getMapPageSize() { public long getMapPageSize() {
if (mapPageSize == 0) { if (mapPageSize == 0) {
......
...@@ -21,9 +21,8 @@ ...@@ -21,9 +21,8 @@
* *
******************************************************************************/ ******************************************************************************/
package com.questdb.ex; package com.questdb.std.ex;
@SuppressWarnings("ThrowableInstanceNeverThrown")
public final class ZLibException extends RuntimeException { public final class ZLibException extends RuntimeException {
public final static ZLibException INSTANCE = new ZLibException(); public final static ZLibException INSTANCE = new ZLibException();
......
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
package com.questdb.std.str; package com.questdb.std.str;
import com.questdb.std.Numbers;
public final class PrefixedPath extends Path { public final class PrefixedPath extends Path {
private final int prefixLen; private final int prefixLen;
...@@ -35,7 +37,7 @@ public final class PrefixedPath extends Path { ...@@ -35,7 +37,7 @@ public final class PrefixedPath extends Path {
} }
public PrefixedPath(CharSequence prefix) { public PrefixedPath(CharSequence prefix) {
this(prefix, 128); this(prefix, Numbers.ceilPow2(prefix.length() + 32));
} }
public PrefixedPath rewind() { public PrefixedPath rewind() {
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.cutlass.http;
import com.questdb.std.Chars;
import com.questdb.std.FilesFacade;
import com.questdb.std.FilesFacadeImpl;
import com.questdb.std.str.LPSZ;
import com.questdb.std.str.Path;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Test;
public class MimeTypesCacheTest {
@Test()
public void testCannotOpen() throws Exception {
TestUtils.assertMemoryLeak(() -> {
try (Path path = new Path()) {
path.of("/tmp/sdrqwhlkkhlkhasdlkahdoiquweoiuweoiqwe.ok").$();
try {
new MimeTypesCache(FilesFacadeImpl.INSTANCE, path);
Assert.fail();
} catch (HttpException e) {
Assert.assertTrue(Chars.startsWith(e.getMessage(), "could not open"));
}
}
});
}
@Test()
public void testCannotRead1() throws Exception {
testFailure(new FilesFacadeImpl() {
@Override
public long length(long fd) {
return 1024;
}
@Override
public long openRO(LPSZ name) {
return 123L;
}
@Override
public long read(long fd, long buf, long len, long offset) {
return -1;
}
}, "could not read");
}
@Test()
public void testCannotRead2() throws Exception {
testFailure(new FilesFacadeImpl() {
@Override
public long length(long fd) {
return 1024;
}
@Override
public long openRO(LPSZ name) {
return 123L;
}
@Override
public long read(long fd, long buf, long len, long offset) {
return 128;
}
}, "could not read");
}
@Test
public void testSimple() throws Exception {
TestUtils.assertMemoryLeak(new TestUtils.LeakProneCode() {
@Override
public void run() {
try (Path path = new Path()) {
path.of(this.getClass().getResource("/mime_test.types").getPath()).$();
MimeTypesCache mimeTypes = new MimeTypesCache(FilesFacadeImpl.INSTANCE, path);
Assert.assertEquals(6, mimeTypes.size());
TestUtils.assertEquals("application/andrew-inset", mimeTypes.get("ez"));
TestUtils.assertEquals("application/inkml+xml", mimeTypes.get("ink"));
TestUtils.assertEquals("application/inkml+xml", mimeTypes.get("inkml"));
TestUtils.assertEquals("application/mp21", mimeTypes.get("m21"));
TestUtils.assertEquals("application/mp21", mimeTypes.get("mp21"));
TestUtils.assertEquals("application/mp4", mimeTypes.get("mp4s"));
}
}
});
}
@Test()
public void testWrongFileSize() throws Exception {
testFailure(new FilesFacadeImpl() {
@Override
public long length(long fd) {
return 0;
}
@Override
public long openRO(LPSZ name) {
return 123L;
}
}, "wrong file size");
}
@Test()
public void testWrongFileSize2() throws Exception {
testFailure(new FilesFacadeImpl() {
@Override
public long length(long fd) {
return -1;
}
@Override
public long openRO(LPSZ name) {
return 123L;
}
}, "wrong file size");
}
@Test()
public void testWrongFileSize4() throws Exception {
testFailure(new FilesFacadeImpl() {
@Override
public long length(long fd) {
return 1024 * 1024 * 2;
}
@Override
public long openRO(LPSZ name) {
return 123L;
}
}, "wrong file size");
}
private void testFailure(FilesFacade ff, CharSequence startsWith) throws Exception {
TestUtils.assertMemoryLeak(() -> {
try (Path path = new Path()) {
try {
new MimeTypesCache(ff, path);
Assert.fail();
} catch (HttpException e) {
Assert.assertTrue(Chars.startsWith(e.getMessage(), startsWith));
}
}
});
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册