diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java b/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java index 15c6a7d740a8d40a40c3ec80221972fdc97b0129..11a3b52ba614e33b4429b56f4ddd7379fddab76e 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpConnectionContext.java @@ -146,6 +146,9 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { } catch (PeerIsSlowToReadException ignore) { LOG.debug().$("peer is slow writer").$(); dispatcher.registerChannel(this, IOOperation.READ); + } catch (ServerDisconnectException ignore) { + LOG.info().$("kicked out [fd=").$(fd).$(']'); + dispatcher.disconnect(this); } break; case IOOperation.WRITE: @@ -159,6 +162,9 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { dispatcher.registerChannel(this, IOOperation.WRITE); } catch (PeerDisconnectedException ignore) { dispatcher.disconnect(this); + } catch (ServerDisconnectException ignore) { + LOG.info().$("kicked out [fd=").$(fd).$(']'); + dispatcher.disconnect(this); } } else { assert false; @@ -192,7 +198,8 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable { } } - private void handleClientRecv(HttpRequestProcessorSelector selector) throws PeerDisconnectedException, PeerIsSlowToReadException { + private void handleClientRecv(HttpRequestProcessorSelector selector) + throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException { try { final long fd = this.fd; // this is address of where header ended in our receive buffer diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpMultipartContentListener.java b/core/src/main/java/io/questdb/cutlass/http/HttpMultipartContentListener.java index ed775fa72f7bc1ee9842ed3d696da0273d1d0aa7..83cdb415089d5cd4ceccbbe64710144ae26657a8 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpMultipartContentListener.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpMultipartContentListener.java @@ -26,11 +26,12 @@ package io.questdb.cutlass.http; import io.questdb.network.PeerDisconnectedException; import io.questdb.network.PeerIsSlowToReadException; +import io.questdb.network.ServerDisconnectException; public interface HttpMultipartContentListener { void onChunk(long lo, long hi) throws PeerDisconnectedException, PeerIsSlowToReadException; - void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException; + void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException; - void onPartEnd() throws PeerDisconnectedException, PeerIsSlowToReadException; + void onPartEnd() throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException; } diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpMultipartContentParser.java b/core/src/main/java/io/questdb/cutlass/http/HttpMultipartContentParser.java index 1ea606964696ee61511b88d982ddc980c2dcf4e0..18ad225f511629f4ec30e49589c5917e1e1a0ce0 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpMultipartContentParser.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpMultipartContentParser.java @@ -26,6 +26,7 @@ package io.questdb.cutlass.http; import io.questdb.network.PeerDisconnectedException; import io.questdb.network.PeerIsSlowToReadException; +import io.questdb.network.ServerDisconnectException; import io.questdb.std.Mutable; import io.questdb.std.Unsafe; import io.questdb.std.str.DirectByteCharSequence; @@ -89,7 +90,8 @@ public class HttpMultipartContentParser implements Closeable, Mutable { return this; } - public boolean parse(long lo, long hi, HttpMultipartContentListener listener) throws PeerDisconnectedException, PeerIsSlowToReadException { + public boolean parse(long lo, long hi, HttpMultipartContentListener listener) + throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException { long _lo = Long.MAX_VALUE; long ptr = lo; while (ptr < hi) { diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpRequestProcessor.java b/core/src/main/java/io/questdb/cutlass/http/HttpRequestProcessor.java index 2c2b97132e0c50514e1e3a6be67a430a971d8613..7b089a560727f88d60fc31cabc3353dd7c35a7f1 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpRequestProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpRequestProcessor.java @@ -26,6 +26,7 @@ package io.questdb.cutlass.http; import io.questdb.network.PeerDisconnectedException; import io.questdb.network.PeerIsSlowToReadException; +import io.questdb.network.ServerDisconnectException; public interface HttpRequestProcessor { void onHeadersReady(HttpConnectionContext context); @@ -35,6 +36,6 @@ public interface HttpRequestProcessor { default void resumeRecv(HttpConnectionContext context) { } - default void resumeSend(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException { + default void resumeSend(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException { } } diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java b/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java index a0d958d0d3aed746e504ffde8a70ded85a23722b..0ef5a19a5816c18babc18ebed78b1f96970169ce 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java @@ -34,10 +34,7 @@ import io.questdb.cutlass.text.TextException; import io.questdb.cutlass.text.TextLoader; import io.questdb.log.Log; import io.questdb.log.LogFactory; -import io.questdb.network.IOOperation; -import io.questdb.network.NoSpaceLeftInResponseBufferException; -import io.questdb.network.PeerDisconnectedException; -import io.questdb.network.PeerIsSlowToReadException; +import io.questdb.network.*; import io.questdb.std.*; import io.questdb.std.str.CharSink; @@ -275,9 +272,10 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC } @Override - public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException { - LOG.debug().$("part begin [name=").$(partHeader.getContentDispositionName()).$(']').$(); - if (Chars.equals("data", partHeader.getContentDispositionName())) { + public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException { + final CharSequence contentDisposition = partHeader.getContentDispositionName(); + LOG.debug().$("part begin [name=").$(contentDisposition).$(']').$(); + if (Chars.equalsNc("data", contentDisposition)) { final HttpRequestHeader rh = transientContext.getRequestHeader(); CharSequence name = rh.getUrlParam("name"); @@ -285,10 +283,8 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC name = partHeader.getContentDispositionFilename(); } if (name == null) { - transientContext.simpleResponse().sendStatus(400, "no name given"); - // we have to disconnect to interrupt potentially large upload - transientContext.getDispatcher().disconnect(transientContext); - return; + transientContext.simpleResponse().sendStatus(400, "no file name given"); + throw ServerDisconnectException.INSTANCE; } transientState.analysed = false; @@ -304,17 +300,21 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC transientState.forceHeader = Chars.equalsNc("true", rh.getUrlParam("forceHeader")); transientState.messagePart = MESSAGE_DATA; - } else if (Chars.equals("schema", partHeader.getContentDispositionName())) { + } else if (Chars.equalsNc("schema", contentDisposition)) { transientState.textLoader.setState(TextLoader.LOAD_JSON_METADATA); transientState.messagePart = MESSAGE_SCHEMA; } else { - // todo: disconnect - transientState.messagePart = MESSAGE_UNKNOWN; + if (partHeader.getContentDisposition() == null) { + transientContext.simpleResponse().sendStatus(400, "'Content-Disposition' multipart header missing'"); + } else { + transientContext.simpleResponse().sendStatus(400, "invalid value in 'Content-Disposition' multipart header"); + } + throw ServerDisconnectException.INSTANCE; } } @Override - public void onPartEnd() throws PeerDisconnectedException, PeerIsSlowToReadException { + public void onPartEnd() throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException { try { LOG.debug().$("part end").$(); transientState.textLoader.wrapUp(); @@ -351,14 +351,14 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC @Override public void resumeSend( HttpConnectionContext context - ) throws PeerDisconnectedException, PeerIsSlowToReadException { + ) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException { doResumeSend(LV.get(context), context.getChunkedResponseSocket()); } private void doResumeSend( TextImportProcessorState state, HttpChunkedResponseSocket socket - ) throws PeerDisconnectedException, PeerIsSlowToReadException { + ) throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException { try { if (state.json) { @@ -374,8 +374,7 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC // is larger that response content buffer // all we can do in this scenario is to log appropriately // and disconnect socket - // todo: this is a force disconnect - throw PeerDisconnectedException.INSTANCE; + throw ServerDisconnectException.INSTANCE; } } @@ -410,7 +409,8 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC socket.done(); } - private void sendResponse(HttpConnectionContext context) throws PeerDisconnectedException, PeerIsSlowToReadException { + private void sendResponse(HttpConnectionContext context) + throws PeerDisconnectedException, PeerIsSlowToReadException, ServerDisconnectException { TextImportProcessorState state = LV.get(context); // todo: may be set this up when headers are ready? state.json = Chars.equalsNc("json", context.getRequestHeader().getUrlParam("fmt")); diff --git a/core/src/main/java/io/questdb/network/ServerDisconnectException.java b/core/src/main/java/io/questdb/network/ServerDisconnectException.java new file mode 100644 index 0000000000000000000000000000000000000000..488511b263e0113517e452514794849829cf8019 --- /dev/null +++ b/core/src/main/java/io/questdb/network/ServerDisconnectException.java @@ -0,0 +1,29 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2020 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.network; + +public class ServerDisconnectException extends Exception { + public static final ServerDisconnectException INSTANCE = new ServerDisconnectException(); +} diff --git a/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java b/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java index c819074b6159bf56a9c215c84e7dce83d5eee52e..3cb483962e8e88bcea3b6668ddec9fd50584323f 100644 --- a/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java +++ b/core/src/test/java/io/questdb/cutlass/http/IODispatcherTest.java @@ -409,6 +409,110 @@ public class IODispatcherTest { "\r\n" + "--------------------------27d997ca93d2689d\r\n" + "content-disposition: form-data; name=\"data\"; filename=\"fhv_tripdata_2017-02.csv\"\r\n" + + "content-type: application/octet-stream\r\n" + + "\r\n" + + "9988" + + "\r\n" + + "--------------------------27d997ca93d2689d--", + NetworkFacadeImpl.INSTANCE, + true, + 1 + ); + } + + @Test + public void testMissingContentDisposition() throws Exception { + testImport( + "HTTP/1.1 400 Bad request\r\n" + + "Server: questDB/1.0\r\n" + + "Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" + + "Transfer-Encoding: chunked\r\n" + + "Content-Type: text/html; charset=utf-8\r\n" + + "\r\n" + + "31\r\n" + + "'Content-Disposition' multipart header missing'\r\n" + + "\r\n" + + "00\r\n" + + "\r\n", + "POST /upload HTTP/1.1\r\n" + + "host: localhost:9001\r\n" + + "User-Agent: curl/7.64.0\r\n" + + "Accept: */*\r\n" + + "Content-Length: 437760673\r\n" + + "Content-Type: multipart/form-data; boundary=------------------------27d997ca93d2689d\r\n" + + "Expect: 100-continue\r\n" + + "\r\n" + + "--------------------------27d997ca93d2689d\r\n" + + "Content-Type: application/octet-stream\r\n" + + "\r\n" + + "9988" + + "\r\n" + + "--------------------------27d997ca93d2689d--", + NetworkFacadeImpl.INSTANCE, + true, + 1 + ); + } + + @Test + public void testMissingContentDispositionName() throws Exception { + testImport( + "HTTP/1.1 400 Bad request\r\n" + + "Server: questDB/1.0\r\n" + + "Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" + + "Transfer-Encoding: chunked\r\n" + + "Content-Type: text/html; charset=utf-8\r\n" + + "\r\n" + + "39\r\n" + + "invalid value in 'Content-Disposition' multipart header\r\n" + + "\r\n" + + "00\r\n" + + "\r\n", + "POST /upload HTTP/1.1\r\n" + + "host: localhost:9001\r\n" + + "User-Agent: curl/7.64.0\r\n" + + "Accept: */*\r\n" + + "Content-Length: 437760673\r\n" + + "Content-Type: multipart/form-data; boundary=------------------------27d997ca93d2689d\r\n" + + "Expect: 100-continue\r\n" + + "\r\n" + + "--------------------------27d997ca93d2689d\r\n" + + "content-disposition: ; filename=\"fhv_tripdata_2017-02.csv\"\r\n" + + "Content-Type: application/octet-stream\r\n" + + "\r\n" + + "9988" + + "\r\n" + + "--------------------------27d997ca93d2689d--", + NetworkFacadeImpl.INSTANCE, + true, + 1 + ); + } + + @Test + public void testMissingContentDispositionFileName() throws Exception { + testImport( + "HTTP/1.1 400 Bad request\r\n" + + "Server: questDB/1.0\r\n" + + "Date: Thu, 1 Jan 1970 00:00:00 GMT\r\n" + + "Transfer-Encoding: chunked\r\n" + + "Content-Type: text/html; charset=utf-8\r\n" + + "\r\n" + + "14\r\n" + + "no file name given\r\n" + + "\r\n" + + "00\r\n" + + "\r\n", + "POST /upload HTTP/1.1\r\n" + + "host: localhost:9001\r\n" + + "User-Agent: curl/7.64.0\r\n" + + "Accept: */*\r\n" + + "Content-Length: 437760673\r\n" + + "Content-Type: multipart/form-data; boundary=------------------------27d997ca93d2689d\r\n" + + "Expect: 100-continue\r\n" + + "\r\n" + + "--------------------------27d997ca93d2689d\r\n" + + "content-disposition: form-data; name=\"data\"\r\n" + "Content-Type: application/octet-stream\r\n" + "\r\n" + "9988" +