From bac02ddf7eda56436c727227616e748431cf421c Mon Sep 17 00:00:00 2001 From: Vlad Ilyushchenko Date: Fri, 4 Oct 2019 12:50:57 +0100 Subject: [PATCH] GRIFFIN: "COPY" command will be implemented via a model --- .../io/questdb/PropServerConfiguration.java | 83 ++-- .../io/questdb/cairo/CairoConfiguration.java | 2 + .../cairo/DefaultCairoConfiguration.java | 5 + .../http/DefaultHttpServerConfiguration.java | 6 + .../io/questdb/cutlass/http/HttpServer.java | 8 - .../http/processors/JsonQueryProcessor.java | 423 ++++++++++-------- .../JsonQueryProcessorConfiguration.java | 11 +- .../http/processors/TextImportProcessor.java | 252 +++++------ .../io/questdb/cutlass/text/TextLoader.java | 3 + .../io/questdb/griffin/CompiledQuery.java | 3 + .../io/questdb/griffin/CompiledQueryImpl.java | 66 +-- .../java/io/questdb/griffin/SqlCompiler.java | 338 ++++++-------- .../java/io/questdb/griffin/SqlParser.java | 20 + .../io/questdb/griffin/model/CopyModel.java | 65 +++ .../questdb/griffin/model/ExecutionModel.java | 3 +- .../io/questdb/griffin/model/InsertModel.java | 2 +- .../io/questdb/griffin/SqlParserTest.java | 64 +-- core/src/test/resources/server.conf | 2 +- 18 files changed, 720 insertions(+), 636 deletions(-) create mode 100644 core/src/main/java/io/questdb/griffin/model/CopyModel.java diff --git a/core/src/main/java/io/questdb/PropServerConfiguration.java b/core/src/main/java/io/questdb/PropServerConfiguration.java index ab3079d92..46e1100d2 100644 --- a/core/src/main/java/io/questdb/PropServerConfiguration.java +++ b/core/src/main/java/io/questdb/PropServerConfiguration.java @@ -76,6 +76,7 @@ public class PropServerConfiguration implements ServerConfiguration { private final int sqlCharacterStoreCapacity; private final int sqlCharacterStoreSequencePoolCapacity; private final int sqlColumnPoolCapacity; + private final int sqlCopyModelPoolCapacity; private final double sqlCompactMapLoadFactor; private final int sqlExpressionPoolCapacity; private final double sqlFastMapLoadFactor; @@ -159,7 +160,7 @@ public class PropServerConfiguration implements ServerConfiguration { private int sqlColumnCastModelPoolCapacity; private int sqlRenameTableModelPoolCapacity; private int sqlWithClauseModelPoolCapacity; - private int sqlInsertAsSelectModelPoolCapacity; + private int sqlInsertModelPoolCapacity; public PropServerConfiguration(String root, Properties properties) throws ServerConfigurationException { this.sharedWorkerCount = getInt(properties, "shared.worker.count", 2); @@ -286,7 +287,9 @@ public class PropServerConfiguration implements ServerConfiguration { this.sqlColumnCastModelPoolCapacity = getInt(properties, "cairo.sql.column.cast.model.pool.capacity", 16); this.sqlRenameTableModelPoolCapacity = getInt(properties, "cairo.sql.rename.table.model.pool.capacity", 16); this.sqlWithClauseModelPoolCapacity = getInt(properties, "cairo.sql.with.clause.model.pool.capacity", 128); - this.sqlInsertAsSelectModelPoolCapacity = getInt(properties, "cairo.sql.insert.as.select.model.pool.capacity", 64); + this.sqlInsertModelPoolCapacity = getInt(properties, "cairo.sql.insert.model.pool.capacity", 64); + this.sqlCopyModelPoolCapacity = getInt(properties, "cairo.copy.model.pool.capacity", 32); + parseBindTo(properties, "line.udp.bind.to", "0.0.0.0:9009", (a, p) -> { this.lineUdpBindIPV4Address = a; @@ -310,11 +313,6 @@ public class PropServerConfiguration implements ServerConfiguration { return httpServerConfiguration; } - @Override - public PGWireConfiguration getPGWireConfiguration() { - return pgWireConfiguration; - } 
- @Override public LineUdpReceiverConfiguration getLineUdpReceiverConfiguration() { return lineUdpReceiverConfiguration; @@ -325,6 +323,11 @@ public class PropServerConfiguration implements ServerConfiguration { return workerPoolConfiguration; } + @Override + public PGWireConfiguration getPGWireConfiguration() { + return pgWireConfiguration; + } + private int[] getAffinity(Properties properties, String key, int httpWorkerCount) throws ServerConfigurationException { final int[] result = new int[httpWorkerCount]; String value = properties.getProperty(key); @@ -508,6 +511,16 @@ public class PropServerConfiguration implements ServerConfiguration { return MillisecondClockImpl.INSTANCE; } + @Override + public String getDispatcherLogName() { + return "http-server"; + } + + @Override + public EpollFacade getEpollFacade() { + return EpollFacadeImpl.INSTANCE; + } + @Override public int getEventCapacity() { return eventCapacity; @@ -523,6 +536,11 @@ public class PropServerConfiguration implements ServerConfiguration { return idleConnectionTimeout; } + @Override + public int getInitialBias() { + return IOOperation.READ; + } + @Override public int getInterestQueueCapacity() { return interestQueueCapacity; @@ -539,8 +557,8 @@ public class PropServerConfiguration implements ServerConfiguration { } @Override - public EpollFacade getEpollFacade() { - return EpollFacadeImpl.INSTANCE; + public int getRcvBufSize() { + return rcvBufSize; } @Override @@ -548,25 +566,10 @@ public class PropServerConfiguration implements ServerConfiguration { return SelectFacadeImpl.INSTANCE; } - @Override - public int getInitialBias() { - return IOOperation.READ; - } - @Override public int getSndBufSize() { return sndBufSize; } - - @Override - public int getRcvBufSize() { - return rcvBufSize; - } - - @Override - public String getDispatcherLogName() { - return "http-server"; - } } private class PropTextConfiguration implements TextConfiguration { @@ -633,11 +636,6 @@ public class PropServerConfiguration implements ServerConfiguration { private class PropHttpServerConfiguration implements HttpServerConfiguration { - @Override - public boolean workerHaltOnError() { - return httpWorkerHaltOnError; - } - @Override public int getConnectionPoolInitialCapacity() { return connectionPoolInitialCapacity; @@ -648,11 +646,6 @@ public class PropServerConfiguration implements ServerConfiguration { return connectionStringPoolCapacity; } - @Override - public boolean allowDeflateBeforeSend() { - return httpAllowDeflateBeforeSend; - } - @Override public int getMultipartHeaderBufferSize() { return multipartHeaderBufferSize; @@ -708,6 +701,11 @@ public class PropServerConfiguration implements ServerConfiguration { return httpWorkerCount; } + @Override + public boolean workerHaltOnError() { + return httpWorkerHaltOnError; + } + @Override public int[] getWorkerAffinity() { return httpWorkerAffinity; @@ -727,9 +725,19 @@ public class PropServerConfiguration implements ServerConfiguration { public boolean getDumpNetworkTraffic() { return false; } + + @Override + public boolean allowDeflateBeforeSend() { + return httpAllowDeflateBeforeSend; + } } private class PropCairoConfiguration implements CairoConfiguration { + @Override + public int getCopyPoolCapacity() { + return sqlCopyModelPoolCapacity; + } + @Override public int getCreateAsSelectRetryCount() { return createAsSelectRetryCount; @@ -957,7 +965,7 @@ public class PropServerConfiguration implements ServerConfiguration { @Override public int getInsertPoolCapacity() { - return 
sqlInsertAsSelectModelPoolCapacity; + return sqlInsertModelPoolCapacity; } } @@ -1024,6 +1032,11 @@ public class PropServerConfiguration implements ServerConfiguration { return jsonQueryConnectionCheckFrequency; } + @Override + public TextConfiguration getTextConfiguration() { + return textConfiguration; + } + @Override public MillisecondClock getClock() { return httpFrozenClock ? StationaryMillisClock.INSTANCE : MillisecondClockImpl.INSTANCE; diff --git a/core/src/main/java/io/questdb/cairo/CairoConfiguration.java b/core/src/main/java/io/questdb/cairo/CairoConfiguration.java index affce3497..a322afddf 100644 --- a/core/src/main/java/io/questdb/cairo/CairoConfiguration.java +++ b/core/src/main/java/io/questdb/cairo/CairoConfiguration.java @@ -29,6 +29,8 @@ import io.questdb.std.time.MillisecondClock; public interface CairoConfiguration { + int getCopyPoolCapacity(); + int getCreateAsSelectRetryCount(); CharSequence getDefaultMapType(); diff --git a/core/src/main/java/io/questdb/cairo/DefaultCairoConfiguration.java b/core/src/main/java/io/questdb/cairo/DefaultCairoConfiguration.java index b8bd968e0..1fd623461 100644 --- a/core/src/main/java/io/questdb/cairo/DefaultCairoConfiguration.java +++ b/core/src/main/java/io/questdb/cairo/DefaultCairoConfiguration.java @@ -271,4 +271,9 @@ public class DefaultCairoConfiguration implements CairoConfiguration { public int getInsertPoolCapacity() { return 8; } + + @Override + public int getCopyPoolCapacity() { + return 16; + } } diff --git a/core/src/main/java/io/questdb/cutlass/http/DefaultHttpServerConfiguration.java b/core/src/main/java/io/questdb/cutlass/http/DefaultHttpServerConfiguration.java index 82527991c..5a515ed2f 100644 --- a/core/src/main/java/io/questdb/cutlass/http/DefaultHttpServerConfiguration.java +++ b/core/src/main/java/io/questdb/cutlass/http/DefaultHttpServerConfiguration.java @@ -27,6 +27,7 @@ import io.questdb.cutlass.http.processors.DefaultTextImportProcessorConfiguratio import io.questdb.cutlass.http.processors.JsonQueryProcessorConfiguration; import io.questdb.cutlass.http.processors.StaticContentProcessorConfiguration; import io.questdb.cutlass.http.processors.TextImportProcessorConfiguration; +import io.questdb.cutlass.text.TextConfiguration; import io.questdb.network.DefaultIODispatcherConfiguration; import io.questdb.network.IODispatcherConfiguration; import io.questdb.std.FilesFacade; @@ -90,6 +91,11 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration { public MillisecondClock getClock() { return DefaultHttpServerConfiguration.this.getClock(); } + + @Override + public TextConfiguration getTextConfiguration() { + return textImportProcessorConfiguration.getTextConfiguration(); + } }; private final TextImportProcessorConfiguration textImportProcessorConfiguration = new DefaultTextImportProcessorConfiguration(); diff --git a/core/src/main/java/io/questdb/cutlass/http/HttpServer.java b/core/src/main/java/io/questdb/cutlass/http/HttpServer.java index 48fa7f691..aaa537a3d 100644 --- a/core/src/main/java/io/questdb/cutlass/http/HttpServer.java +++ b/core/src/main/java/io/questdb/cutlass/http/HttpServer.java @@ -210,14 +210,6 @@ public class HttpServer implements Closeable { } Misc.free(httpContextFactory); Misc.free(dispatcher); - for (int i = 0; i < workerCount; i++) { - HttpRequestProcessorSelectorImpl selector = selectors.getQuick(i); - Misc.free(selector.defaultRequestProcessor); - final ObjList urls = selector.processorMap.keys(); - for (int j = 0, m = urls.size(); j < m; j++) { - 
Misc.free(selector.processorMap.get(urls.getQuick(j))); - } - } } private static class HttpRequestProcessorSelectorImpl implements HttpRequestProcessorSelector { diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessor.java b/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessor.java index 840d0455b..f34432674 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessor.java @@ -33,16 +33,20 @@ import io.questdb.cutlass.http.HttpChunkedResponseSocket; import io.questdb.cutlass.http.HttpConnectionContext; import io.questdb.cutlass.http.HttpRequestHeader; import io.questdb.cutlass.http.HttpRequestProcessor; -import io.questdb.griffin.CompiledQuery; -import io.questdb.griffin.SqlCompiler; -import io.questdb.griffin.SqlException; -import io.questdb.griffin.SqlExecutionContextImpl; +import io.questdb.cutlass.json.JsonException; +import io.questdb.cutlass.text.Atomicity; +import io.questdb.cutlass.text.TextLoader; +import io.questdb.griffin.*; +import io.questdb.griffin.model.CopyModel; import io.questdb.log.Log; import io.questdb.log.LogFactory; import io.questdb.log.LogRecord; import io.questdb.network.*; import io.questdb.std.*; import io.questdb.std.str.CharSink; +import io.questdb.std.str.Path; +import io.questdb.std.time.DateFormatFactory; +import io.questdb.std.time.DateLocaleFactory; import java.io.Closeable; import java.util.concurrent.atomic.AtomicLong; @@ -59,6 +63,8 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { private final SqlExecutionContextImpl sqlExecutionContext = new SqlExecutionContextImpl(); private final ObjList valueWriters = new ObjList<>(); private final ObjList resumeActions = new ObjList<>(); + private final TextLoader textLoader; + private final Path path = new Path(); public JsonQueryProcessor(JsonQueryProcessorConfiguration configuration, CairoEngine engine) { // todo: add scheduler @@ -67,6 +73,20 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { this.floatScale = configuration.getFloatScale(); this.doubleScale = configuration.getDoubleScale(); + try { + this.textLoader = new TextLoader( + configuration.getTextConfiguration(), + engine, + DateLocaleFactory.INSTANCE, + new DateFormatFactory(), + io.questdb.std.microtime.DateLocaleFactory.INSTANCE, + new io.questdb.std.microtime.DateFormatFactory() + ); + } catch (JsonException e) { + // todo: we must not do this + throw new RuntimeException("damn"); + } + this.valueWriters.extendAndSet(ColumnType.BOOLEAN, this::putBooleanValue); this.valueWriters.extendAndSet(ColumnType.BYTE, this::putByteValue); this.valueWriters.extendAndSet(ColumnType.DOUBLE, this::putDoubleValue); @@ -92,17 +112,11 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { resumeActions.extendAndSet(AbstractQueryContext.QUERY_SUFFIX, this::doQuerySuffix); } - private static void putStringOrNull(CharSink r, CharSequence str) { - if (str == null) { - r.put("null"); - } else { - r.encodeUtf8AndQuote(str); - } - } - @Override public void close() { Misc.free(compiler); + Misc.free(path); + Misc.free(textLoader); } public void execute( @@ -120,6 +134,8 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { final CompiledQuery cc = compiler.compile(state.query, sqlExecutionContext); if (cc.getType() == CompiledQuery.SELECT) { state.recordCursorFactory = cc.getRecordCursorFactory(); + } else 
if (cc.getType() == CompiledQuery.COPY) { + copyTable(sqlExecutionContext, cc.getCopyModel()); } cacheHits.incrementAndGet(); info(state).$("execute-new [q=`").$(state.query). @@ -229,84 +245,88 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { readyForNextRequest(context, dispatcher); } - private void onQueryRecordSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { - doQueryRecordSuffix(state, socket); - doNextRecordLoop(state, socket, columnCount); - } - - private void onQueryRecord(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { - doQueryRecord(state, socket, columnCount); - onQueryRecordSuffix(state, socket, columnCount); - } - - private void onQueryRecordPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { - doQueryRecordPrefix(state, socket); - onQueryRecord(state, socket, columnCount); - } - - private void onQueryMetadataSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { - doQueryMetadataSuffix(state, socket); - doFirstRecordLoop(state, socket, columnCount); - } - - private void onQueryMetadata(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { - doQueryMetadata(state, socket, columnCount); - onQueryMetadataSuffix(state, socket, columnCount); + private static void putStringOrNull(CharSink r, CharSequence str) { + if (str == null) { + r.put("null"); + } else { + r.encodeUtf8AndQuote(str); + } } - private void onQueryPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { - if (doQueryPrefix(state, socket)) { - doQueryMetadata(state, socket, columnCount); - doQueryMetadataSuffix(state, socket); + private void copyTable(SqlExecutionContext executionContext, CopyModel model) throws SqlException { + try { + textLoader.clear(); + textLoader.setState(TextLoader.ANALYZE_STRUCTURE); + textLoader.configureDestination(model.getTableName().token, true, false, Atomicity.SKIP_ROW); + int len = 4 * 1024 * 1024; + long buf = Unsafe.malloc(len); + try { + path.of(GenericLexer.unquote(model.getFileName().token)).$(); + long fd = Files.openRO(path); + if (fd == -1) { + throw SqlException.$(model.getFileName().position, "could not open file [errno=").put(Os.errno()).put(']'); + } + long fileLen = Files.length(fd); + long n = (int) Files.read(fd, buf, len, 0); + if (n > 0) { + textLoader.parse(buf, buf + n, executionContext.getCairoSecurityContext()); + textLoader.setState(TextLoader.LOAD_DATA); + int read; + while (n < fileLen) { + read = (int) Files.read(fd, buf, len, n); + if (read < 1) { + throw SqlException.$(model.getFileName().position, "could not read file [errno=").put(Os.errno()).put(']'); + } + textLoader.parse(buf, buf + read, executionContext.getCairoSecurityContext()); + n += read; + } + textLoader.wrapUp(); + } + } finally { + Unsafe.free(buf, len); + } + } catch (JsonException e) { + e.printStackTrace(); + } finally { + LOG.info().$("copied").$(); } - doFirstRecordLoop(state, socket, columnCount); } - private void doNextRecordLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket 
socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { - if (doQueryNextRecord(state)) { + private void doFirstRecordLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + if (onQuerySetupFirstRecord(state)) { doRecordFetchLoop(state, socket, columnCount); } else { doQuerySuffix(state, socket, columnCount); } } - private void doRecordFetchLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { - do { - doQueryRecordPrefix(state, socket); - doQueryRecord(state, socket, columnCount); - doQueryRecordSuffix(state, socket); - } while (doQueryNextRecord(state)); - doQuerySuffix(state, socket, columnCount); - } - - private void doFirstRecordLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { - if (onQuerySetupFirstRecord(state)) { + private void doNextRecordLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + if (doQueryNextRecord(state)) { doRecordFetchLoop(state, socket, columnCount); } else { doQuerySuffix(state, socket, columnCount); } } - private boolean onQuerySetupFirstRecord(JsonQueryProcessorState state) { - if (state.skip > 0) { - final RecordCursor cursor = state.cursor; - long target = state.skip + 1; - while (target > 0 && cursor.hasNext()) { - target--; - } - if (target > 0) { - return false; - } - state.count = state.skip; - } else { - if (!state.cursor.hasNext()) { - return false; + private void doQueryMetadata(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) { + state.queryState = AbstractQueryContext.QUERY_METADATA; + for (; state.columnIndex < columnCount; state.columnIndex++) { + socket.bookmark(); + if (state.columnIndex > 0) { + socket.put(','); } + socket.put('{'). + putQuoted("name").put(':').putQuoted(state.metadata.getColumnName(state.columnIndex)). + put(','). 
+ putQuoted("type").put(':').putQuoted(ColumnType.nameOf(state.metadata.getColumnType(state.columnIndex))); + socket.put('}'); } + } - state.columnIndex = 0; - state.record = state.cursor.getRecord(); - return true; + private void doQueryMetadataSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + state.queryState = AbstractQueryContext.QUERY_METADATA_SUFFIX; + socket.bookmark(); + socket.put("],\"dataset\":["); } private boolean doQueryNextRecord(JsonQueryProcessorState state) { @@ -320,39 +340,17 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { return false; } - private void doQueryRecordPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { - state.queryState = AbstractQueryContext.QUERY_RECORD_PREFIX; - socket.bookmark(); - if (state.count > state.skip) { - socket.put(','); - } - socket.put('['); - state.columnIndex = 0; - } - - private void onNoMoreData(JsonQueryProcessorState state) { - if (state.countRows) { - // this is the tail end of the cursor - // we don't need to read records, just round up record count - final RecordCursor cursor = state.cursor; - final long size = cursor.size(); - if (size < 0) { - long count = 1; - while (cursor.hasNext()) { - count++; - } - state.count += count; - } else { - state.count = size; - } + private boolean doQueryPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + if (state.noMeta) { + socket.bookmark(); + socket.put('{').putQuoted("dataset").put(":["); + return false; } - } - - private void doQueryRecordSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { - state.queryState = AbstractQueryContext.QUERY_RECORD_SUFFIX; - state.count++; socket.bookmark(); - socket.put(']'); + socket.put('{').putQuoted("query").put(':').encodeUtf8AndQuote(state.query); + socket.put(',').putQuoted("columns").put(':').put('['); + state.columnIndex = 0; + return true; } private void doQueryRecord(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) { @@ -369,38 +367,47 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { } } - private void doQueryMetadataSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { - state.queryState = AbstractQueryContext.QUERY_METADATA_SUFFIX; + private void doQueryRecordPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + state.queryState = AbstractQueryContext.QUERY_RECORD_PREFIX; socket.bookmark(); - socket.put("],\"dataset\":["); + if (state.count > state.skip) { + socket.put(','); + } + socket.put('['); + state.columnIndex = 0; } - private void doQueryMetadata(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) { - state.queryState = AbstractQueryContext.QUERY_METADATA; - for (; state.columnIndex < columnCount; state.columnIndex++) { + private void doQueryRecordSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { + state.queryState = AbstractQueryContext.QUERY_RECORD_SUFFIX; + state.count++; + socket.bookmark(); + socket.put(']'); + } + + private void doQuerySuffix( + JsonQueryProcessorState state, + HttpChunkedResponseSocket socket, + int columnCount + ) throws PeerDisconnectedException, PeerIsSlowToReadException { + state.queryState = AbstractQueryContext.QUERY_SUFFIX; + if (state.count > -1) { socket.bookmark(); - if (state.columnIndex > 0) { - socket.put(','); - } - socket.put('{'). - putQuoted("name").put(':').putQuoted(state.metadata.getColumnName(state.columnIndex)). 
- put(','). - putQuoted("type").put(':').putQuoted(ColumnType.nameOf(state.metadata.getColumnType(state.columnIndex))); + socket.put(']'); + socket.put(',').putQuoted("count").put(':').put(state.count); socket.put('}'); + state.count = -1; + socket.sendChunk(); } + socket.done(); } - private boolean doQueryPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket) { - if (state.noMeta) { - socket.bookmark(); - socket.put('{').putQuoted("dataset").put(":["); - return false; - } - socket.bookmark(); - socket.put('{').putQuoted("query").put(':').encodeUtf8AndQuote(state.query); - socket.put(',').putQuoted("columns").put(':').put('['); - state.columnIndex = 0; - return true; + private void doRecordFetchLoop(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + do { + doQueryRecordPrefix(state, socket); + doQueryRecord(state, socket, columnCount); + doQueryRecordSuffix(state, socket); + } while (doQueryNextRecord(state)); + doQuerySuffix(state, socket, columnCount); } private LogRecord error(JsonQueryProcessorState state) { @@ -437,6 +444,79 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { sendException(socket, 0, e.getMessage(), 500, state.query); } + private void onNoMoreData(JsonQueryProcessorState state) { + if (state.countRows) { + // this is the tail end of the cursor + // we don't need to read records, just round up record count + final RecordCursor cursor = state.cursor; + final long size = cursor.size(); + if (size < 0) { + long count = 1; + while (cursor.hasNext()) { + count++; + } + state.count += count; + } else { + state.count = size; + } + } + } + + private void onQueryMetadata(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + doQueryMetadata(state, socket, columnCount); + onQueryMetadataSuffix(state, socket, columnCount); + } + + private void onQueryMetadataSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + doQueryMetadataSuffix(state, socket); + doFirstRecordLoop(state, socket, columnCount); + } + + private void onQueryPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + if (doQueryPrefix(state, socket)) { + doQueryMetadata(state, socket, columnCount); + doQueryMetadataSuffix(state, socket); + } + doFirstRecordLoop(state, socket, columnCount); + } + + private void onQueryRecord(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + doQueryRecord(state, socket, columnCount); + onQueryRecordSuffix(state, socket, columnCount); + } + + private void onQueryRecordPrefix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + doQueryRecordPrefix(state, socket); + onQueryRecord(state, socket, columnCount); + } + + private void onQueryRecordSuffix(JsonQueryProcessorState state, HttpChunkedResponseSocket socket, int columnCount) throws PeerDisconnectedException, PeerIsSlowToReadException { + doQueryRecordSuffix(state, socket); + doNextRecordLoop(state, socket, columnCount); + } + + private boolean onQuerySetupFirstRecord(JsonQueryProcessorState state) { + if (state.skip > 0) { + final 
RecordCursor cursor = state.cursor; + long target = state.skip + 1; + while (target > 0 && cursor.hasNext()) { + target--; + } + if (target > 0) { + return false; + } + state.count = state.skip; + } else { + if (!state.cursor.hasNext()) { + return false; + } + } + + state.columnIndex = 0; + state.record = state.cursor.getRecord(); + return true; + } + private boolean parseUrl( HttpChunkedResponseSocket socket, HttpRequestHeader request, @@ -487,23 +567,17 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { return true; } - private void putLong256Value(HttpChunkedResponseSocket socket, Record rec, int col) { - socket.put('"'); - rec.getLong256(col, socket); - socket.put('"'); - } - private void putBinValue(HttpChunkedResponseSocket socket, Record record, int col) { socket.put('['); socket.put(']'); } - private void putSymValue(HttpChunkedResponseSocket socket, Record rec, int col) { - putStringOrNull(socket, rec.getSym(col)); + private void putBooleanValue(HttpChunkedResponseSocket socket, Record rec, int col) { + socket.put(rec.getBool(col)); } - private void putStrValue(HttpChunkedResponseSocket socket, Record rec, int col) { - putStringOrNull(socket, rec.getStr(col)); + private void putByteValue(HttpChunkedResponseSocket socket, Record rec, int col) { + socket.put(rec.getByte(col)); } private void putCharValue(HttpChunkedResponseSocket socket, Record rec, int col) { @@ -515,19 +589,6 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { } } - private void putShortValue(HttpChunkedResponseSocket socket, Record rec, int col) { - socket.put(rec.getShort(col)); - } - - private void putTimestampValue(HttpChunkedResponseSocket socket, Record rec, int col) { - final long t = rec.getTimestamp(col); - if (t == Long.MIN_VALUE) { - socket.put("null"); - return; - } - socket.put('"').putISODate(t).put('"'); - } - private void putDateValue(HttpChunkedResponseSocket socket, Record rec, int col) { final long d = rec.getDate(col); if (d == Long.MIN_VALUE) { @@ -537,13 +598,12 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { socket.put('"').putISODateMillis(d).put('"'); } - private void putLongValue(HttpChunkedResponseSocket socket, Record rec, int col) { - final long l = rec.getLong(col); - if (l == Long.MIN_VALUE) { - socket.put("null"); - } else { - socket.put(l); - } + private void putDoubleValue(HttpChunkedResponseSocket socket, Record rec, int col) { + socket.put(rec.getDouble(col), doubleScale); + } + + private void putFloatValue(HttpChunkedResponseSocket socket, Record rec, int col) { + socket.put(rec.getFloat(col), floatScale); } private void putIntValue(HttpChunkedResponseSocket socket, Record rec, int col) { @@ -555,20 +615,40 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { } } - private void putFloatValue(HttpChunkedResponseSocket socket, Record rec, int col) { - socket.put(rec.getFloat(col), floatScale); + private void putLong256Value(HttpChunkedResponseSocket socket, Record rec, int col) { + socket.put('"'); + rec.getLong256(col, socket); + socket.put('"'); } - private void putDoubleValue(HttpChunkedResponseSocket socket, Record rec, int col) { - socket.put(rec.getDouble(col), doubleScale); + private void putLongValue(HttpChunkedResponseSocket socket, Record rec, int col) { + final long l = rec.getLong(col); + if (l == Long.MIN_VALUE) { + socket.put("null"); + } else { + socket.put(l); + } } - private void putByteValue(HttpChunkedResponseSocket socket, Record rec, int col) { 
- socket.put(rec.getByte(col)); + private void putShortValue(HttpChunkedResponseSocket socket, Record rec, int col) { + socket.put(rec.getShort(col)); } - private void putBooleanValue(HttpChunkedResponseSocket socket, Record rec, int col) { - socket.put(rec.getBool(col)); + private void putStrValue(HttpChunkedResponseSocket socket, Record rec, int col) { + putStringOrNull(socket, rec.getStr(col)); + } + + private void putSymValue(HttpChunkedResponseSocket socket, Record rec, int col) { + putStringOrNull(socket, rec.getSym(col)); + } + + private void putTimestampValue(HttpChunkedResponseSocket socket, Record rec, int col) { + final long t = rec.getTimestamp(col); + if (t == Long.MIN_VALUE) { + socket.put("null"); + return; + } + socket.put('"').putISODate(t).put('"'); } private void readyForNextRequest(HttpConnectionContext context, IODispatcher dispatcher) { @@ -583,23 +663,6 @@ public class JsonQueryProcessor implements HttpRequestProcessor, Closeable { socket.done(); } - private void doQuerySuffix( - JsonQueryProcessorState state, - HttpChunkedResponseSocket socket, - int columnCount - ) throws PeerDisconnectedException, PeerIsSlowToReadException { - state.queryState = AbstractQueryContext.QUERY_SUFFIX; - if (state.count > -1) { - socket.bookmark(); - socket.put(']'); - socket.put(',').putQuoted("count").put(':').put(state.count); - socket.put('}'); - state.count = -1; - socket.sendChunk(); - } - socket.done(); - } - private void sendException( HttpChunkedResponseSocket socket, int position, diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessorConfiguration.java b/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessorConfiguration.java index 81f5aaa0f..18422e2e0 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessorConfiguration.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/JsonQueryProcessorConfiguration.java @@ -23,17 +23,20 @@ package io.questdb.cutlass.http.processors; +import io.questdb.cutlass.text.TextConfiguration; import io.questdb.std.time.MillisecondClock; public interface JsonQueryProcessorConfiguration { - CharSequence getKeepAliveHeader(); + MillisecondClock getClock(); - int getFloatScale(); + int getConnectionCheckFrequency(); int getDoubleScale(); - int getConnectionCheckFrequency(); + int getFloatScale(); - MillisecondClock getClock(); + CharSequence getKeepAliveHeader(); + + TextConfiguration getTextConfiguration(); } diff --git a/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java b/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java index d870b1271..e1270a224 100644 --- a/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java +++ b/core/src/main/java/io/questdb/cutlass/http/processors/TextImportProcessor.java @@ -60,18 +60,11 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC // processor. 
For different threads to lookup the same value from local value map the key, // which is LV, has to be the same between processor instances private static final LocalValue LV = new LocalValue<>(); - - static { - atomicityParamMap.put("relaxed", Atomicity.SKIP_ROW); - atomicityParamMap.put("strict", Atomicity.SKIP_ALL); - } - private final TextImportProcessorConfiguration configuration; private final CairoEngine engine; private HttpConnectionContext transientContext; private IODispatcher transientDispatcher; private TextImportProcessorState transientState; - public TextImportProcessor( TextImportProcessorConfiguration configuration, CairoEngine cairoEngine @@ -80,6 +73,121 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC this.engine = cairoEngine; } + @Override + public void close() { + + } + + @Override + public void onChunk(HttpRequestHeader partHeader, long lo, long hi) { + if (hi > lo) { + try { + transientState.textLoader.parse(lo, hi, transientContext.getCairoSecurityContext()); + if (transientState.messagePart == MESSAGE_DATA && !transientState.analysed) { + transientState.analysed = true; + transientState.textLoader.setState(TextLoader.LOAD_DATA); + } + } catch (JsonException e) { + // todo: reply something sensible + e.printStackTrace(); + } + } + } + + @Override + public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException { + LOG.debug().$("part begin [name=").$(partHeader.getContentDispositionName()).$(']').$(); + if (Chars.equals("data", partHeader.getContentDispositionName())) { + + final HttpRequestHeader rh = transientContext.getRequestHeader(); + CharSequence name = rh.getUrlParam("name"); + if (name == null) { + name = partHeader.getContentDispositionFilename(); + } + if (name == null) { + transientContext.simpleResponse().sendStatus(400, "no name given"); + // we have to disconnect to interrupt potentially large upload + transientDispatcher.disconnect(transientContext); + return; + } + + transientState.analysed = false; + transientState.textLoader.configureDestination( + name, + Chars.equalsNc("true", rh.getUrlParam("overwrite")), + Chars.equalsNc("true", rh.getUrlParam("durable")), + // todo: these values are incorrect, but ok for now + getAtomicity(rh.getUrlParam("atomicity")) + ); + transientState.textLoader.setForceHeaders(Chars.equalsNc("true", rh.getUrlParam("forceHeader"))); + transientState.textLoader.setState(TextLoader.ANALYZE_STRUCTURE); + + transientState.forceHeader = Chars.equalsNc("true", rh.getUrlParam("forceHeader")); + transientState.messagePart = MESSAGE_DATA; + } else if (Chars.equals("schema", partHeader.getContentDispositionName())) { + transientState.textLoader.setState(TextLoader.LOAD_JSON_METADATA); + transientState.messagePart = MESSAGE_SCHEMA; + } else { + // todo: disconnect + transientState.messagePart = MESSAGE_UNKNOWN; + } + } + + // This processor implements HttpMultipartContentListener, methods of which + // have neither context nor dispatcher. During "chunk" processing we may need + // to send something back to client, or disconnect them. To do that we need + // these transient references. resumeRecv() will set them and they will remain + // valid during multipart events. 
+ + @Override + public void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException { + try { + LOG.debug().$("part end").$(); + transientState.textLoader.wrapUp(); + if (transientState.messagePart == MESSAGE_DATA) { + sendResponse(transientContext); + } + } catch (JsonException e) { + handleJsonException(e); + } + } + + @Override + public void onHeadersReady(HttpConnectionContext context) { + + } + + @Override + public void onRequestComplete(HttpConnectionContext context, IODispatcher dispatcher) { + transientState.clear(); + context.clear(); + dispatcher.registerChannel(context, IOOperation.READ); + } + + @Override + public void resumeRecv(HttpConnectionContext context, IODispatcher dispatcher) { + this.transientContext = context; + this.transientDispatcher = dispatcher; + this.transientState = LV.get(context); + if (this.transientState == null) { + try { + LOG.debug().$("new text state").$(); + LV.set(context, this.transientState = new TextImportProcessorState(configuration.getTextConfiguration(), engine)); + } catch (JsonException e) { + // todo: handle gracefully + e.printStackTrace(); + } + } + } + + @Override + public void resumeSend( + HttpConnectionContext context, + IODispatcher dispatcher + ) throws PeerDisconnectedException, PeerIsSlowToReadException { + doResumeSend(LV.get(context), context.getChunkedResponseSocket()); + } + private static void resumeJson(TextImportProcessorState state, HttpChunkedResponseSocket socket) throws PeerDisconnectedException, PeerIsSlowToReadException { final TextLoader textLoader = state.textLoader; final RecordMetadata m = textLoader.getMetadata(); @@ -111,16 +219,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC putQuoted("type").put(':').putQuoted(ColumnType.nameOf(m.getColumnType(state.columnIndex))).put(','). putQuoted("size").put(':').put(ColumnType.sizeOf(m.getColumnType(state.columnIndex))).put(','). putQuoted("errors").put(':').put(errors.getQuick(state.columnIndex)); - - // todo: resolve these attributes -// if (im.pattern != null) { -// r.put(',').putQuoted("pattern").put(':').putQuoted(im.pattern); -// } -// -// if (im.dateLocale != null) { -// r.put(',').putQuoted("locale").put(':').putQuoted(im.dateLocale.getId()); -// } - socket.put('}'); } state.responseState = RESPONSE_SUFFIX; @@ -153,12 +251,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC return b; } - // This processor implements HttpMultipartContentListener, methods of which - // have neither context nor dispatcher. During "chunk" processing we may need - // to send something back to client, or disconnect them. To do that we need - // these transient references. resumeRecv() will set them and they will remain - // valid during multipart events. - private static void pad(CharSink b, int w, long value) { int len = (int) Math.log10(value); if (len < 0) { @@ -258,115 +350,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC return atomicity == -1 ? 
Atomicity.SKIP_COL : atomicity; } - @Override - public void close() { - - } - - @Override - public void onChunk(HttpRequestHeader partHeader, long lo, long hi) { - if (hi > lo) { - try { - transientState.textLoader.parse(lo, hi, transientContext.getCairoSecurityContext()); - if (transientState.messagePart == MESSAGE_DATA && !transientState.analysed) { - transientState.analysed = true; - transientState.textLoader.setState(TextLoader.LOAD_DATA); - } - } catch (JsonException e) { - // todo: reply something sensible - e.printStackTrace(); - } - } - } - - @Override - public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException { - LOG.debug().$("part begin [name=").$(partHeader.getContentDispositionName()).$(']').$(); - if (Chars.equals("data", partHeader.getContentDispositionName())) { - - final HttpRequestHeader rh = transientContext.getRequestHeader(); - CharSequence name = rh.getUrlParam("name"); - if (name == null) { - name = partHeader.getContentDispositionFilename(); - } - if (name == null) { - transientContext.simpleResponse().sendStatus(400, "no name given"); - // we have to disconnect to interrupt potentially large upload - transientDispatcher.disconnect(transientContext); - return; - } - - transientState.analysed = false; - transientState.textLoader.configureDestination( - name, - Chars.equalsNc("true", rh.getUrlParam("overwrite")), - Chars.equalsNc("true", rh.getUrlParam("durable")), - // todo: these values are incorrect, but ok for now - getAtomicity(rh.getUrlParam("atomicity")) - ); - transientState.textLoader.setForceHeaders(Chars.equalsNc("true", rh.getUrlParam("forceHeader"))); - transientState.textLoader.setState(TextLoader.ANALYZE_STRUCTURE); - - transientState.forceHeader = Chars.equalsNc("true", rh.getUrlParam("forceHeader")); - transientState.messagePart = MESSAGE_DATA; - } else if (Chars.equals("schema", partHeader.getContentDispositionName())) { - transientState.textLoader.setState(TextLoader.LOAD_JSON_METADATA); - transientState.messagePart = MESSAGE_SCHEMA; - } else { - // todo: disconnect - transientState.messagePart = MESSAGE_UNKNOWN; - } - } - - @Override - public void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException { - try { - LOG.debug().$("part end").$(); - transientState.textLoader.wrapUp(); - if (transientState.messagePart == MESSAGE_DATA) { - sendResponse(transientContext); - } - } catch (JsonException e) { - handleJsonException(e); - } - } - - @Override - public void onHeadersReady(HttpConnectionContext context) { - - } - - @Override - public void onRequestComplete(HttpConnectionContext context, IODispatcher dispatcher) { - transientState.clear(); - context.clear(); - dispatcher.registerChannel(context, IOOperation.READ); - } - - @Override - public void resumeRecv(HttpConnectionContext context, IODispatcher dispatcher) { - this.transientContext = context; - this.transientDispatcher = dispatcher; - this.transientState = LV.get(context); - if (this.transientState == null) { - try { - LOG.debug().$("new text state").$(); - LV.set(context, this.transientState = new TextImportProcessorState(configuration.getTextConfiguration(), engine)); - } catch (JsonException e) { - // todo: handle gracefully - e.printStackTrace(); - } - } - } - - @Override - public void resumeSend( - HttpConnectionContext context, - IODispatcher dispatcher - ) throws PeerDisconnectedException, PeerIsSlowToReadException { - doResumeSend(LV.get(context), context.getChunkedResponseSocket()); - 
} - private void doResumeSend( TextImportProcessorState state, HttpChunkedResponseSocket socket @@ -439,4 +422,9 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC sendError(context, state.stateMessage, state.json); } } + + static { + atomicityParamMap.put("relaxed", Atomicity.SKIP_ROW); + atomicityParamMap.put("strict", Atomicity.SKIP_ALL); + } } diff --git a/core/src/main/java/io/questdb/cutlass/text/TextLoader.java b/core/src/main/java/io/questdb/cutlass/text/TextLoader.java index 94561fc1f..2ba05d106 100644 --- a/core/src/main/java/io/questdb/cutlass/text/TextLoader.java +++ b/core/src/main/java/io/questdb/cutlass/text/TextLoader.java @@ -61,6 +61,9 @@ public class TextLoader implements Closeable, Mutable { private boolean forceHeaders = false; private byte columnDelimiter = -1; + /** + * @throws JsonException when default configuration cannot be loaded from classpath + */ public TextLoader( TextConfiguration textConfiguration, CairoEngine engine, diff --git a/core/src/main/java/io/questdb/griffin/CompiledQuery.java b/core/src/main/java/io/questdb/griffin/CompiledQuery.java index 32de447b5..bad92a809 100644 --- a/core/src/main/java/io/questdb/griffin/CompiledQuery.java +++ b/core/src/main/java/io/questdb/griffin/CompiledQuery.java @@ -25,6 +25,7 @@ package io.questdb.griffin; import io.questdb.cairo.sql.InsertStatement; import io.questdb.cairo.sql.RecordCursorFactory; +import io.questdb.griffin.model.CopyModel; public interface CompiledQuery { int SELECT = 0; @@ -42,5 +43,7 @@ public interface CompiledQuery { InsertStatement getInsertStatement(); + CopyModel getCopyModel(); + int getType(); } diff --git a/core/src/main/java/io/questdb/griffin/CompiledQueryImpl.java b/core/src/main/java/io/questdb/griffin/CompiledQueryImpl.java index 7db291f59..e716c376b 100644 --- a/core/src/main/java/io/questdb/griffin/CompiledQueryImpl.java +++ b/core/src/main/java/io/questdb/griffin/CompiledQueryImpl.java @@ -25,44 +25,47 @@ package io.questdb.griffin; import io.questdb.cairo.sql.InsertStatement; import io.questdb.cairo.sql.RecordCursorFactory; +import io.questdb.griffin.model.CopyModel; public class CompiledQueryImpl implements CompiledQuery { private RecordCursorFactory recordCursorFactory; private InsertStatement insertStatement; + private CopyModel copyModel; private int type; - CompiledQuery of(RecordCursorFactory recordCursorFactory) { - this.type = SELECT; - this.recordCursorFactory = recordCursorFactory; - return this; + @Override + public RecordCursorFactory getRecordCursorFactory() { + return recordCursorFactory; } - CompiledQuery ofTruncate() { - this.type = TRUNCATE; - return this; + @Override + public InsertStatement getInsertStatement() { + return insertStatement; } - CompiledQuery ofAlter() { - this.type = ALTER; - return this; + @Override + public CopyModel getCopyModel() { + return copyModel; } - CompiledQuery ofRepair() { - this.type = REPAIR; - return this; + @Override + public int getType() { + return type; } - CompiledQuery ofSet() { - this.type = SET; + CompiledQuery of(RecordCursorFactory recordCursorFactory) { + this.type = SELECT; + this.recordCursorFactory = recordCursorFactory; return this; } - CompiledQuery ofDrop() { - this.type = DROP; + CompiledQuery ofAlter() { + this.type = ALTER; return this; } - CompiledQuery ofCopy() { + CompiledQuery ofCopy(CopyModel copyModel) { + this.copyModel = copyModel; this.type = COPY; return this; } @@ -72,8 +75,8 @@ public class CompiledQueryImpl implements CompiledQuery { return this; } - CompiledQuery 
ofInsertAsSelect() { - this.type = INSERT_AS_SELECT; + CompiledQuery ofDrop() { + this.type = DROP; return this; } @@ -83,18 +86,23 @@ public class CompiledQueryImpl implements CompiledQuery { return this; } - @Override - public RecordCursorFactory getRecordCursorFactory() { - return recordCursorFactory; + CompiledQuery ofInsertAsSelect() { + this.type = INSERT_AS_SELECT; + return this; } - @Override - public InsertStatement getInsertStatement() { - return insertStatement; + CompiledQuery ofRepair() { + this.type = REPAIR; + return this; } - @Override - public int getType() { - return type; + CompiledQuery ofSet() { + this.type = SET; + return this; + } + + CompiledQuery ofTruncate() { + this.type = TRUNCATE; + return this; } } diff --git a/core/src/main/java/io/questdb/griffin/SqlCompiler.java b/core/src/main/java/io/questdb/griffin/SqlCompiler.java index 69fe064d1..d0cbea642 100644 --- a/core/src/main/java/io/questdb/griffin/SqlCompiler.java +++ b/core/src/main/java/io/questdb/griffin/SqlCompiler.java @@ -24,20 +24,12 @@ package io.questdb.griffin; import io.questdb.cairo.*; -import io.questdb.cairo.security.AllowAllCairoSecurityContext; import io.questdb.cairo.sql.*; -import io.questdb.cutlass.json.JsonException; -import io.questdb.cutlass.text.Atomicity; -import io.questdb.cutlass.text.DefaultTextConfiguration; -import io.questdb.cutlass.text.TextConfiguration; -import io.questdb.cutlass.text.TextLoader; import io.questdb.griffin.model.*; import io.questdb.log.Log; import io.questdb.log.LogFactory; import io.questdb.std.*; import io.questdb.std.str.Path; -import io.questdb.std.time.DateFormatFactory; -import io.questdb.std.time.DateLocaleFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -49,31 +41,6 @@ public class SqlCompiler implements Closeable { public static final ObjList sqlControlSymbols = new ObjList<>(8); private final static Log LOG = LogFactory.getLog(SqlCompiler.class); private static final IntList castGroups = new IntList(); - - static { - castGroups.extendAndSet(ColumnType.BOOLEAN, 2); - castGroups.extendAndSet(ColumnType.BYTE, 1); - castGroups.extendAndSet(ColumnType.SHORT, 1); - castGroups.extendAndSet(ColumnType.CHAR, 1); - castGroups.extendAndSet(ColumnType.INT, 1); - castGroups.extendAndSet(ColumnType.LONG, 1); - castGroups.extendAndSet(ColumnType.FLOAT, 1); - castGroups.extendAndSet(ColumnType.DOUBLE, 1); - castGroups.extendAndSet(ColumnType.DATE, 1); - castGroups.extendAndSet(ColumnType.TIMESTAMP, 1); - castGroups.extendAndSet(ColumnType.STRING, 3); - castGroups.extendAndSet(ColumnType.SYMBOL, 3); - castGroups.extendAndSet(ColumnType.BINARY, 4); - - sqlControlSymbols.add("("); - sqlControlSymbols.add(";"); - sqlControlSymbols.add(")"); - sqlControlSymbols.add(","); - sqlControlSymbols.add("/*"); - sqlControlSymbols.add("*/"); - sqlControlSymbols.add("--"); - } - private final SqlOptimiser optimiser; private final SqlParser parser; private final ObjectPool sqlNodePool; @@ -131,8 +98,6 @@ public class SqlCompiler implements Closeable { keywordBasedExecutors.put("SET", this::compileSet); keywordBasedExecutors.put("drop", this::dropTable); keywordBasedExecutors.put("DROP", this::dropTable); - keywordBasedExecutors.put("copy", this::copyTable); - keywordBasedExecutors.put("COPY", this::copyTable); configureLexer(lexer); @@ -172,6 +137,30 @@ public class SqlCompiler implements Closeable { } } + @Override + public void close() { + Misc.free(path); + } + + public CompiledQuery compile(CharSequence query) throws SqlException { 
+ return compile(query, DefaultSqlExecutionContext.INSTANCE); + } + + public CompiledQuery compile(@NotNull CharSequence query, @NotNull SqlExecutionContext executionContext) throws SqlException { + clear(); + // + // these are quick executions that do not require building of a model + // + lexer.of(query); + + final CharSequence tok = SqlUtil.fetchNext(lexer); + final KeywordBasedExecutor executor = keywordBasedExecutors.get(tok); + if (executor == null) { + return compileUsingModel(executionContext); + } + return executor.execute(executionContext); + } + // Creates data type converter. // INT and LONG NaN values are cast to their representation rather than Double or Float NaN. private static RecordToRowCopier assembleRecordToRowCopier(BytecodeAssembler asm, ColumnTypes from, RecordMetadata to, ColumnFilter toColumnFilter) { @@ -633,30 +622,6 @@ public class SqlCompiler implements Closeable { return tok; } - @Override - public void close() { - Misc.free(path); - } - - public CompiledQuery compile(CharSequence query) throws SqlException { - return compile(query, DefaultSqlExecutionContext.INSTANCE); - } - - public CompiledQuery compile(@NotNull CharSequence query, @NotNull SqlExecutionContext executionContext) throws SqlException { - clear(); - // - // these are quick executions that do not require building of a model - // - lexer.of(query); - - final CharSequence tok = SqlUtil.fetchNext(lexer); - final KeywordBasedExecutor executor = keywordBasedExecutors.get(tok); - if (executor == null) { - return compileUsingModel(executionContext); - } - return executor.execute(executionContext); - } - private CompiledQuery alterTable(SqlExecutionContext executionContext) throws SqlException { CharSequence tok; expectKeyword(lexer, "table"); @@ -851,7 +816,7 @@ public class SqlCompiler implements Closeable { switch (model.getModelType()) { case ExecutionModel.QUERY: return optimiser.optimise((QueryModel) model, executionContext); - case ExecutionModel.INSERT_AS_SELECT: + case ExecutionModel.INSERT: InsertModel insertModel = (InsertModel) model; if (insertModel.getQueryModel() != null) { return validateAndOptimiseInsertAsSelect(insertModel, executionContext); @@ -863,19 +828,6 @@ public class SqlCompiler implements Closeable { } } - private ExecutionModel lightlyValidateInsertModel(InsertModel model) throws SqlException { - ExpressionNode tableName = model.getTableName(); - if (tableName.type != ExpressionNode.LITERAL) { - throw SqlException.$(tableName.position, "literal expected"); - } - - if (model.getColumnSet().size() > 0 && model.getColumnSet().size() != model.getColumnValues().size()) { - throw SqlException.$(model.getColumnPosition(0), "value count does not match column count"); - } - - return model; - } - private CompiledQuery compileSet(SqlExecutionContext executionContext) { return compiledQuery.ofSet(); } @@ -898,6 +850,8 @@ public class SqlCompiler implements Closeable { return compiledQuery.of(generate((QueryModel) executionModel, executionContext)); case ExecutionModel.CREATE_TABLE: return createTableWithRetries(executionModel, executionContext); + case ExecutionModel.COPY: + return compiledQuery.ofCopy((CopyModel) executionModel); default: InsertModel insertModel = (InsertModel) executionModel; if (insertModel.getQueryModel() != null) { @@ -1057,85 +1011,6 @@ public class SqlCompiler implements Closeable { return compiledQuery.ofDrop(); } - private CompiledQuery copyTable(SqlExecutionContext executionContext) throws SqlException { - - CharSequence tok; - - final int tableNamePosition = 
lexer.getPosition(); - - tok = expectToken(lexer, "table name"); - - LOG.info().$("copying").$(); - - final TextConfiguration textConfiguration = new DefaultTextConfiguration() { - @Override - public int getRollBufferSize() { - return 4 * 1024 * 1024; - } - - @Override - public int getRollBufferLimit() { - return 8 * 1024 * 1024; - } - }; - - try { - try (TextLoader textLoader = new TextLoader( - textConfiguration, - engine, - DateLocaleFactory.INSTANCE, - new DateFormatFactory(), - io.questdb.std.microtime.DateLocaleFactory.INSTANCE, - new io.questdb.std.microtime.DateFormatFactory() - )) { - textLoader.setState(TextLoader.ANALYZE_STRUCTURE); - textLoader.configureDestination("test", true, false, Atomicity.SKIP_ROW); - // if (columnSeparator > 0) { - // textLoader.configureColumnDelimiter(columnSeparator); - // } - int len = 4 * 1024 * 1024; - long buf = Unsafe.malloc(len); - try { - path.of("C:\\Users\\blues\\shared\\transactions.csv").$(); - long fd = Files.openRO(path); - if (fd == -1) { - return null; - } - long fileLen = Files.length(fd); - long n = (int) Files.read(fd, buf, len, 0); - if (n > 0) { - textLoader.parse(buf, buf + n, AllowAllCairoSecurityContext.INSTANCE); - - textLoader.setState(TextLoader.LOAD_DATA); - - int read; - while (n < fileLen) { - read = (int) Files.read(fd, buf, len, n); - if (read < 1) { - // shit - break; - } - textLoader.parse(buf, buf + read, AllowAllCairoSecurityContext.INSTANCE); - n += read; - } - textLoader.wrapUp(); - } - } finally { - Unsafe.free(buf, len); - } - } - } catch (JsonException e) { - e.printStackTrace(); - } finally { - LOG.info().$("copied").$(); - } - -// CharSequence tableName = GenericLexer.immutableOf(tok); - - - return compiledQuery.ofCopy(); - } - private CompiledQuery executeWithRetries( ExecutableMethod method, ExecutionModel executionModel, @@ -1161,6 +1036,71 @@ public class SqlCompiler implements Closeable { return codeGenerator.generate(queryModel, executionContext); } + private CompiledQuery insert(ExecutionModel executionModel, SqlExecutionContext executionContext) throws SqlException { + final InsertModel model = (InsertModel) executionModel; + final ExpressionNode name = model.getTableName(); + tableExistsOrFail(name.position, name.token, executionContext); + + try (TableReader reader = engine.getReader(executionContext.getCairoSecurityContext(), name.token, TableUtils.ANY_TABLE_VERSION)) { + final long structureVersion = reader.getVersion(); + final RecordMetadata metadata = reader.getMetadata(); + final int writerTimestampIndex = metadata.getTimestampIndex(); + final CharSequenceHashSet columnSet = model.getColumnSet(); + final int columnSetSize = columnSet.size(); + final ColumnFilter columnFilter; + final ObjList valueFunctions; + Function timestampFunction = null; + if (columnSetSize > 0) { + listColumnFilter.clear(); + columnFilter = listColumnFilter; + valueFunctions = new ObjList<>(columnSetSize); + for (int i = 0; i < columnSetSize; i++) { + int index = metadata.getColumnIndex(columnSet.get(i)); + if (index < 0) { + throw SqlException.invalidColumn(model.getColumnPosition(i), columnSet.get(i)); + } + + final Function function = functionParser.parseFunction(model.getColumnValues().getQuick(i), metadata, executionContext); + if (!isAssignableFrom(metadata.getColumnType(index), function.getType())) { + throw SqlException.$(model.getColumnValues().getQuick(i).position, "inconvertible types: ").put(ColumnType.nameOf(function.getType())).put(" -> ").put(ColumnType.nameOf(metadata.getColumnType(index))); + } + + if 
(i == writerTimestampIndex) { + timestampFunction = function; + } else { + valueFunctions.add(function); + listColumnFilter.add(index); + } + } + } else { + final int columnCount = metadata.getColumnCount(); + entityColumnFilter.of(columnCount); + columnFilter = entityColumnFilter; + valueFunctions = new ObjList<>(columnCount); + for (int i = 0; i < columnCount; i++) { + Function function = functionParser.parseFunction(model.getColumnValues().getQuick(i), metadata, executionContext); + if (!isAssignableFrom(metadata.getColumnType(i), function.getType())) { + throw SqlException.$(model.getColumnValues().getQuick(i).position, "inconvertible types: ").put(ColumnType.nameOf(function.getType())).put(" -> ").put(ColumnType.nameOf(metadata.getColumnType(i))); + } + if (i == writerTimestampIndex) { + timestampFunction = function; + } else { + valueFunctions.add(function); + } + } + } + + // validate timestamp + if (writerTimestampIndex > -1 && timestampFunction == null) { + throw SqlException.$(0, "insert statement must populate timestamp"); + } + + VirtualRecord record = new VirtualRecord(valueFunctions); + RecordToRowCopier copier = assembleRecordToRowCopier(asm, record, metadata, columnFilter); + return compiledQuery.ofInsert(new InsertStatementImpl(record, copier, timestampFunction, structureVersion)); + } + } + private CompiledQuery insertAsSelect(ExecutionModel executionModel, SqlExecutionContext executionContext) throws SqlException { final InsertModel model = (InsertModel) executionModel; final ExpressionNode name = model.getTableName(); @@ -1261,69 +1201,17 @@ public class SqlCompiler implements Closeable { return compiledQuery.ofInsertAsSelect(); } - private CompiledQuery insert(ExecutionModel executionModel, SqlExecutionContext executionContext) throws SqlException { - final InsertModel model = (InsertModel) executionModel; - final ExpressionNode name = model.getTableName(); - tableExistsOrFail(name.position, name.token, executionContext); - - try (TableReader reader = engine.getReader(executionContext.getCairoSecurityContext(), name.token, TableUtils.ANY_TABLE_VERSION)) { - final long structureVersion = reader.getVersion(); - final RecordMetadata metadata = reader.getMetadata(); - final int writerTimestampIndex = metadata.getTimestampIndex(); - final CharSequenceHashSet columnSet = model.getColumnSet(); - final int columnSetSize = columnSet.size(); - final ColumnFilter columnFilter; - final ObjList valueFunctions; - Function timestampFunction = null; - if (columnSetSize > 0) { - listColumnFilter.clear(); - columnFilter = listColumnFilter; - valueFunctions = new ObjList<>(columnSetSize); - for (int i = 0; i < columnSetSize; i++) { - int index = metadata.getColumnIndex(columnSet.get(i)); - if (index < 0) { - throw SqlException.invalidColumn(model.getColumnPosition(i), columnSet.get(i)); - } - - final Function function = functionParser.parseFunction(model.getColumnValues().getQuick(i), metadata, executionContext); - if (!isAssignableFrom(metadata.getColumnType(index), function.getType())) { - throw SqlException.$(model.getColumnValues().getQuick(i).position, "inconvertible types: ").put(ColumnType.nameOf(function.getType())).put(" -> ").put(ColumnType.nameOf(metadata.getColumnType(index))); - } - - if (i == writerTimestampIndex) { - timestampFunction = function; - } else { - valueFunctions.add(function); - listColumnFilter.add(index); - } - } - } else { - final int columnCount = metadata.getColumnCount(); - entityColumnFilter.of(columnCount); - columnFilter = entityColumnFilter; - 
valueFunctions = new ObjList<>(columnCount); - for (int i = 0; i < columnCount; i++) { - Function function = functionParser.parseFunction(model.getColumnValues().getQuick(i), metadata, executionContext); - if (!isAssignableFrom(metadata.getColumnType(i), function.getType())) { - throw SqlException.$(model.getColumnValues().getQuick(i).position, "inconvertible types: ").put(ColumnType.nameOf(function.getType())).put(" -> ").put(ColumnType.nameOf(metadata.getColumnType(i))); - } - if (i == writerTimestampIndex) { - timestampFunction = function; - } else { - valueFunctions.add(function); - } - } - } - - // validate timestamp - if (writerTimestampIndex > -1 && timestampFunction == null) { - throw SqlException.$(0, "insert statement must populate timestamp"); - } + private ExecutionModel lightlyValidateInsertModel(InsertModel model) throws SqlException { + ExpressionNode tableName = model.getTableName(); + if (tableName.type != ExpressionNode.LITERAL) { + throw SqlException.$(tableName.position, "literal expected"); + } - VirtualRecord record = new VirtualRecord(valueFunctions); - RecordToRowCopier copier = assembleRecordToRowCopier(asm, record, metadata, columnFilter); - return compiledQuery.ofInsert(new InsertStatementImpl(record, copier, timestampFunction, structureVersion)); + if (model.getColumnSet().size() > 0 && model.getColumnSet().size() != model.getColumnValues().size()) { + throw SqlException.$(model.getColumnPosition(0), "value count does not match column count"); } + + return model; } private boolean removeTableDirectory(CreateTableModel model) { @@ -1603,4 +1491,28 @@ public class SqlCompiler implements Closeable { return this; } } + + static { + castGroups.extendAndSet(ColumnType.BOOLEAN, 2); + castGroups.extendAndSet(ColumnType.BYTE, 1); + castGroups.extendAndSet(ColumnType.SHORT, 1); + castGroups.extendAndSet(ColumnType.CHAR, 1); + castGroups.extendAndSet(ColumnType.INT, 1); + castGroups.extendAndSet(ColumnType.LONG, 1); + castGroups.extendAndSet(ColumnType.FLOAT, 1); + castGroups.extendAndSet(ColumnType.DOUBLE, 1); + castGroups.extendAndSet(ColumnType.DATE, 1); + castGroups.extendAndSet(ColumnType.TIMESTAMP, 1); + castGroups.extendAndSet(ColumnType.STRING, 3); + castGroups.extendAndSet(ColumnType.SYMBOL, 3); + castGroups.extendAndSet(ColumnType.BINARY, 4); + + sqlControlSymbols.add("("); + sqlControlSymbols.add(";"); + sqlControlSymbols.add(")"); + sqlControlSymbols.add(","); + sqlControlSymbols.add("/*"); + sqlControlSymbols.add("*/"); + sqlControlSymbols.add("--"); + } } diff --git a/core/src/main/java/io/questdb/griffin/SqlParser.java b/core/src/main/java/io/questdb/griffin/SqlParser.java index 5c829a7b4..c768d75bf 100644 --- a/core/src/main/java/io/questdb/griffin/SqlParser.java +++ b/core/src/main/java/io/questdb/griffin/SqlParser.java @@ -85,6 +85,7 @@ public final class SqlParser { private final ObjectPool renameTableModelPool; private final ObjectPool withClauseModelPool; private final ObjectPool insertModelPool; + private final ObjectPool copyModelPool; private final ExpressionParser expressionParser; private final CairoConfiguration configuration; private final PostOrderTreeTraversalAlgo traversalAlgo; @@ -111,6 +112,7 @@ public final class SqlParser { this.renameTableModelPool = new ObjectPool<>(RenameTableModel.FACTORY, configuration.getRenameTableModelPoolCapacity()); this.withClauseModelPool = new ObjectPool<>(WithClauseModel.FACTORY, configuration.getWithClauseModelPoolCapacity()); this.insertModelPool = new ObjectPool<>(InsertModel.FACTORY, 
configuration.getInsertPoolCapacity()); + this.copyModelPool = new ObjectPool<>(CopyModel.FACTORY, configuration.getCopyPoolCapacity()); this.configuration = configuration; this.traversalAlgo = traversalAlgo; this.characterStore = characterStore; @@ -139,6 +141,7 @@ public final class SqlParser { characterStore.clear(); insertModelPool.clear(); expressionTreeBuilder.reset(); + copyModelPool.clear(); } private CharSequence createColumnAlias(ExpressionNode node, QueryModel model) { @@ -292,9 +295,26 @@ public final class SqlParser { return parseInsert(lexer); } + if (Chars.equalsLowerCaseAscii(tok, "copy")) { + return parseCopy(lexer); + } + return parseSelect(lexer); } + private ExecutionModel parseCopy(GenericLexer lexer) throws SqlException { + ExpressionNode tableName = expectExpr(lexer); + CharSequence tok = tok(lexer, "'from' or 'to'"); + + if (Chars.equalsLowerCaseAscii(tok, "from")) { + CopyModel model = copyModelPool.next(); + model.setTableName(tableName); + model.setFileName(expectExpr(lexer)); + return model; + } + return null; + } + private ExecutionModel parseCreateStatement(GenericLexer lexer, SqlExecutionContext executionContext) throws SqlException { expectTok(lexer, "table"); return parseCreateTable(lexer, executionContext); diff --git a/core/src/main/java/io/questdb/griffin/model/CopyModel.java b/core/src/main/java/io/questdb/griffin/model/CopyModel.java new file mode 100644 index 000000000..f31a03c1a --- /dev/null +++ b/core/src/main/java/io/questdb/griffin/model/CopyModel.java @@ -0,0 +1,65 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (C) 2014-2019 Appsicle + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + ******************************************************************************/ + +package io.questdb.griffin.model; + +import io.questdb.std.Mutable; +import io.questdb.std.ObjectFactory; +import io.questdb.std.Sinkable; +import io.questdb.std.str.CharSink; + +public class CopyModel implements ExecutionModel, Mutable, Sinkable { + public static final ObjectFactory FACTORY = CopyModel::new; + private ExpressionNode tableName; + private ExpressionNode fileName; + + @Override + public void clear() { + } + + public ExpressionNode getFileName() { + return fileName; + } + + public void setFileName(ExpressionNode fileName) { + this.fileName = fileName; + } + + @Override + public int getModelType() { + return ExecutionModel.COPY; + } + + public ExpressionNode getTableName() { + return tableName; + } + + public void setTableName(ExpressionNode tableName) { + this.tableName = tableName; + } + + @Override + public void toSink(CharSink sink) { + + } +} diff --git a/core/src/main/java/io/questdb/griffin/model/ExecutionModel.java b/core/src/main/java/io/questdb/griffin/model/ExecutionModel.java index 4a483c127..9b7f629e7 100644 --- a/core/src/main/java/io/questdb/griffin/model/ExecutionModel.java +++ b/core/src/main/java/io/questdb/griffin/model/ExecutionModel.java @@ -27,7 +27,8 @@ public interface ExecutionModel { int QUERY = 1; int CREATE_TABLE = 2; int RENAME_TABLE = 3; - int INSERT_AS_SELECT = 4; + int INSERT = 4; + int COPY = 5; int getModelType(); } diff --git a/core/src/main/java/io/questdb/griffin/model/InsertModel.java b/core/src/main/java/io/questdb/griffin/model/InsertModel.java index ea8c04584..c020a906f 100644 --- a/core/src/main/java/io/questdb/griffin/model/InsertModel.java +++ b/core/src/main/java/io/questdb/griffin/model/InsertModel.java @@ -82,7 +82,7 @@ public class InsertModel implements ExecutionModel, Mutable, Sinkable { @Override public int getModelType() { - return INSERT_AS_SELECT; + return INSERT; } public QueryModel getQueryModel() { diff --git a/core/src/test/java/io/questdb/griffin/SqlParserTest.java b/core/src/test/java/io/questdb/griffin/SqlParserTest.java index 3fd3c915b..7bde5ca9c 100644 --- a/core/src/test/java/io/questdb/griffin/SqlParserTest.java +++ b/core/src/test/java/io/questdb/griffin/SqlParserTest.java @@ -728,25 +728,33 @@ public class SqlParserTest extends AbstractGriffinTest { } @Test - public void testInsertValues() throws SqlException { - assertModel("insert into x values (3, 'abc', ?)", - "insert into x values (3, 'abc', ?)", - ExecutionModel.INSERT_AS_SELECT, + public void testInsertAsSelect() throws SqlException { + assertModel( + "insert into x select-choose c, d from (y)", + "insert into x select * from y", + ExecutionModel.INSERT, modelOf("x") .col("a", ColumnType.INT) - .col("b", ColumnType.STRING) - .col("c", ColumnType.STRING)); + .col("b", ColumnType.STRING), + modelOf("y") + .col("c", ColumnType.INT) + .col("d", ColumnType.STRING) + ); } @Test - public void testInsertColumnsAndValues() throws SqlException { - assertModel("insert into x (a, b) values (3, ?)", - "insert into x (a,b) values (3, ?)", - ExecutionModel.INSERT_AS_SELECT, + public void testInsertAsSelectColumnList() throws SqlException { + assertModel( + "insert into x (a, b) select-choose c, d from (y)", + "insert into x (a,b) select * from y", + ExecutionModel.INSERT, modelOf("x") .col("a", ColumnType.INT) - .col("b", ColumnType.STRING) - .col("c", ColumnType.STRING)); + .col("b", ColumnType.STRING), + modelOf("y") + .col("c", ColumnType.INT) + .col("d", ColumnType.STRING) + ); 
} @Test @@ -2343,18 +2351,14 @@ public class SqlParserTest extends AbstractGriffinTest { } @Test - public void testInsertAsSelect() throws SqlException { - assertModel( - "insert into x select-choose c, d from (y)", - "insert into x select * from y", - ExecutionModel.INSERT_AS_SELECT, + public void testInsertColumnsAndValues() throws SqlException { + assertModel("insert into x (a, b) values (3, ?)", + "insert into x (a,b) values (3, ?)", + ExecutionModel.INSERT, modelOf("x") .col("a", ColumnType.INT) - .col("b", ColumnType.STRING), - modelOf("y") - .col("c", ColumnType.INT) - .col("d", ColumnType.STRING) - ); + .col("b", ColumnType.STRING) + .col("c", ColumnType.STRING)); } @Test @@ -2370,18 +2374,14 @@ public class SqlParserTest extends AbstractGriffinTest { } @Test - public void testInsertAsSelectColumnList() throws SqlException { - assertModel( - "insert into x (a, b) select-choose c, d from (y)", - "insert into x (a,b) select * from y", - ExecutionModel.INSERT_AS_SELECT, + public void testInsertValues() throws SqlException { + assertModel("insert into x values (3, 'abc', ?)", + "insert into x values (3, 'abc', ?)", + ExecutionModel.INSERT, modelOf("x") .col("a", ColumnType.INT) - .col("b", ColumnType.STRING), - modelOf("y") - .col("c", ColumnType.INT) - .col("d", ColumnType.STRING) - ); + .col("b", ColumnType.STRING) + .col("c", ColumnType.STRING)); } @Test diff --git a/core/src/test/resources/server.conf b/core/src/test/resources/server.conf index af340d043..35d11cd95 100644 --- a/core/src/test/resources/server.conf +++ b/core/src/test/resources/server.conf @@ -78,7 +78,7 @@ cairo.sql.create.table.model.pool.capacity=64 cairo.sql.column.cast.model.pool.capacity=256 cairo.sql.rename.table.model.pool.capacity=512 cairo.sql.with.clause.model.pool.capacity=1024 -cairo.sql.insert.as.select.model.pool.capacity=128 +cairo.sql.insert.model.pool.capacity=128 line.udp.bind.to=10.2.1.33:9915 line.udp.commit.rate=100000 -- GitLab
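
Note on the new parse path (a sketch, not part of the patch itself): SqlParser now recognises the "copy" keyword and, for the form "copy <table> from <file>", builds a CopyModel that carries the two expressions instead of running the import inside the compiler. The fragment below shows roughly what parseCopy() produces; tableNameNode and fileNameNode are placeholders for the ExpressionNode values returned by expectExpr(), and the parser actually draws the model from copyModelPool rather than from the FACTORY.

    // roughly what parseCopy() builds for: copy weather from 'weather.csv'
    CopyModel model = CopyModel.FACTORY.newInstance();
    model.setTableName(tableNameNode);   // ExpressionNode for "weather"
    model.setFileName(fileNameNode);     // ExpressionNode for "'weather.csv'"
    // compileUsingModel() dispatches on this constant and returns compiledQuery.ofCopy(model)
    assert model.getModelType() == ExecutionModel.COPY;

As written, parseCopy() only handles the "from" branch and returns null for any other token after the table name; "copy <table> to <file>" is presumably reserved for a later change.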
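
On the compiler side, a minimal sketch of how a caller would reach the new model-based COPY path, assuming SqlCompiler still exposes the SqlCompiler(CairoEngine) constructor and that no keyword executor claims "copy" now that copyTable() is gone; the table name and CSV path are illustrative placeholders:

    import io.questdb.cairo.CairoEngine;
    import io.questdb.griffin.CompiledQuery;
    import io.questdb.griffin.SqlCompiler;
    import io.questdb.griffin.SqlException;
    import io.questdb.griffin.SqlExecutionContext;

    class CopyCompileSketch {
        static CompiledQuery compileCopy(CairoEngine engine, SqlExecutionContext ctx) throws SqlException {
            try (SqlCompiler compiler = new SqlCompiler(engine)) {
                // "copy" no longer short-circuits in compile(); it falls through to
                // compileUsingModel(), the parser builds a CopyModel, and the result
                // is compiledQuery.ofCopy(model) rather than an eager file import
                return compiler.compile("copy weather from 'weather.csv'", ctx);
            }
        }
    }

The returned CompiledQuery only carries the model; the actual file reading that the removed copyTable() hack did with TextLoader is left to whichever component consumes the COPY result.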
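
The insert() method is only moved in this patch, but its designated-timestamp check is worth illustrating: when the target table has a designated timestamp and the column list does not populate it, compilation fails. A small fragment reusing engine and ctx from the sketch above; table "x" with column a (INT) and designated timestamp ts is an assumed example schema, not part of the patch:

    try (SqlCompiler compiler = new SqlCompiler(engine)) {
        // throws SqlException "insert statement must populate timestamp"
        // because ts is the designated timestamp and is absent from the column list
        compiler.compile("insert into x (a) values (1)", ctx);
    }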