提交 bac02ddf 编写于 作者: V Vlad Ilyushchenko

GRIFFIN: "COPY" command will be implemented via a model

上级 7dd14b7c
......@@ -76,6 +76,7 @@ public class PropServerConfiguration implements ServerConfiguration {
private final int sqlCharacterStoreCapacity;
private final int sqlCharacterStoreSequencePoolCapacity;
private final int sqlColumnPoolCapacity;
private final int sqlCopyModelPoolCapacity;
private final double sqlCompactMapLoadFactor;
private final int sqlExpressionPoolCapacity;
private final double sqlFastMapLoadFactor;
......@@ -159,7 +160,7 @@ public class PropServerConfiguration implements ServerConfiguration {
private int sqlColumnCastModelPoolCapacity;
private int sqlRenameTableModelPoolCapacity;
private int sqlWithClauseModelPoolCapacity;
private int sqlInsertAsSelectModelPoolCapacity;
private int sqlInsertModelPoolCapacity;
public PropServerConfiguration(String root, Properties properties) throws ServerConfigurationException {
this.sharedWorkerCount = getInt(properties, "shared.worker.count", 2);
......@@ -286,7 +287,9 @@ public class PropServerConfiguration implements ServerConfiguration {
this.sqlColumnCastModelPoolCapacity = getInt(properties, "cairo.sql.column.cast.model.pool.capacity", 16);
this.sqlRenameTableModelPoolCapacity = getInt(properties, "cairo.sql.rename.table.model.pool.capacity", 16);
this.sqlWithClauseModelPoolCapacity = getInt(properties, "cairo.sql.with.clause.model.pool.capacity", 128);
this.sqlInsertAsSelectModelPoolCapacity = getInt(properties, "cairo.sql.insert.as.select.model.pool.capacity", 64);
this.sqlInsertModelPoolCapacity = getInt(properties, "cairo.sql.insert.model.pool.capacity", 64);
this.sqlCopyModelPoolCapacity = getInt(properties, "cairo.copy.model.pool.capacity", 32);
parseBindTo(properties, "line.udp.bind.to", "0.0.0.0:9009", (a, p) -> {
this.lineUdpBindIPV4Address = a;
......@@ -310,11 +313,6 @@ public class PropServerConfiguration implements ServerConfiguration {
return httpServerConfiguration;
}
@Override
public PGWireConfiguration getPGWireConfiguration() {
return pgWireConfiguration;
}
@Override
public LineUdpReceiverConfiguration getLineUdpReceiverConfiguration() {
return lineUdpReceiverConfiguration;
......@@ -325,6 +323,11 @@ public class PropServerConfiguration implements ServerConfiguration {
return workerPoolConfiguration;
}
@Override
public PGWireConfiguration getPGWireConfiguration() {
return pgWireConfiguration;
}
private int[] getAffinity(Properties properties, String key, int httpWorkerCount) throws ServerConfigurationException {
final int[] result = new int[httpWorkerCount];
String value = properties.getProperty(key);
......@@ -508,6 +511,16 @@ public class PropServerConfiguration implements ServerConfiguration {
return MillisecondClockImpl.INSTANCE;
}
@Override
public String getDispatcherLogName() {
return "http-server";
}
@Override
public EpollFacade getEpollFacade() {
return EpollFacadeImpl.INSTANCE;
}
@Override
public int getEventCapacity() {
return eventCapacity;
......@@ -523,6 +536,11 @@ public class PropServerConfiguration implements ServerConfiguration {
return idleConnectionTimeout;
}
@Override
public int getInitialBias() {
return IOOperation.READ;
}
@Override
public int getInterestQueueCapacity() {
return interestQueueCapacity;
......@@ -539,8 +557,8 @@ public class PropServerConfiguration implements ServerConfiguration {
}
@Override
public EpollFacade getEpollFacade() {
return EpollFacadeImpl.INSTANCE;
public int getRcvBufSize() {
return rcvBufSize;
}
@Override
......@@ -548,25 +566,10 @@ public class PropServerConfiguration implements ServerConfiguration {
return SelectFacadeImpl.INSTANCE;
}
@Override
public int getInitialBias() {
return IOOperation.READ;
}
@Override
public int getSndBufSize() {
return sndBufSize;
}
@Override
public int getRcvBufSize() {
return rcvBufSize;
}
@Override
public String getDispatcherLogName() {
return "http-server";
}
}
private class PropTextConfiguration implements TextConfiguration {
......@@ -633,11 +636,6 @@ public class PropServerConfiguration implements ServerConfiguration {
private class PropHttpServerConfiguration implements HttpServerConfiguration {
@Override
public boolean workerHaltOnError() {
return httpWorkerHaltOnError;
}
@Override
public int getConnectionPoolInitialCapacity() {
return connectionPoolInitialCapacity;
......@@ -648,11 +646,6 @@ public class PropServerConfiguration implements ServerConfiguration {
return connectionStringPoolCapacity;
}
@Override
public boolean allowDeflateBeforeSend() {
return httpAllowDeflateBeforeSend;
}
@Override
public int getMultipartHeaderBufferSize() {
return multipartHeaderBufferSize;
......@@ -708,6 +701,11 @@ public class PropServerConfiguration implements ServerConfiguration {
return httpWorkerCount;
}
@Override
public boolean workerHaltOnError() {
return httpWorkerHaltOnError;
}
@Override
public int[] getWorkerAffinity() {
return httpWorkerAffinity;
......@@ -727,9 +725,19 @@ public class PropServerConfiguration implements ServerConfiguration {
public boolean getDumpNetworkTraffic() {
return false;
}
@Override
public boolean allowDeflateBeforeSend() {
return httpAllowDeflateBeforeSend;
}
}
private class PropCairoConfiguration implements CairoConfiguration {
@Override
public int getCopyPoolCapacity() {
return sqlCopyModelPoolCapacity;
}
@Override
public int getCreateAsSelectRetryCount() {
return createAsSelectRetryCount;
......@@ -957,7 +965,7 @@ public class PropServerConfiguration implements ServerConfiguration {
@Override
public int getInsertPoolCapacity() {
return sqlInsertAsSelectModelPoolCapacity;
return sqlInsertModelPoolCapacity;
}
}
......@@ -1024,6 +1032,11 @@ public class PropServerConfiguration implements ServerConfiguration {
return jsonQueryConnectionCheckFrequency;
}
@Override
public TextConfiguration getTextConfiguration() {
return textConfiguration;
}
@Override
public MillisecondClock getClock() {
return httpFrozenClock ? StationaryMillisClock.INSTANCE : MillisecondClockImpl.INSTANCE;
......
......@@ -29,6 +29,8 @@ import io.questdb.std.time.MillisecondClock;
public interface CairoConfiguration {
int getCopyPoolCapacity();
int getCreateAsSelectRetryCount();
CharSequence getDefaultMapType();
......
......@@ -271,4 +271,9 @@ public class DefaultCairoConfiguration implements CairoConfiguration {
public int getInsertPoolCapacity() {
return 8;
}
@Override
public int getCopyPoolCapacity() {
return 16;
}
}
......@@ -27,6 +27,7 @@ import io.questdb.cutlass.http.processors.DefaultTextImportProcessorConfiguratio
import io.questdb.cutlass.http.processors.JsonQueryProcessorConfiguration;
import io.questdb.cutlass.http.processors.StaticContentProcessorConfiguration;
import io.questdb.cutlass.http.processors.TextImportProcessorConfiguration;
import io.questdb.cutlass.text.TextConfiguration;
import io.questdb.network.DefaultIODispatcherConfiguration;
import io.questdb.network.IODispatcherConfiguration;
import io.questdb.std.FilesFacade;
......@@ -90,6 +91,11 @@ class DefaultHttpServerConfiguration implements HttpServerConfiguration {
public MillisecondClock getClock() {
return DefaultHttpServerConfiguration.this.getClock();
}
@Override
public TextConfiguration getTextConfiguration() {
return textImportProcessorConfiguration.getTextConfiguration();
}
};
private final TextImportProcessorConfiguration textImportProcessorConfiguration = new DefaultTextImportProcessorConfiguration();
......
......@@ -210,14 +210,6 @@ public class HttpServer implements Closeable {
}
Misc.free(httpContextFactory);
Misc.free(dispatcher);
for (int i = 0; i < workerCount; i++) {
HttpRequestProcessorSelectorImpl selector = selectors.getQuick(i);
Misc.free(selector.defaultRequestProcessor);
final ObjList<CharSequence> urls = selector.processorMap.keys();
for (int j = 0, m = urls.size(); j < m; j++) {
Misc.free(selector.processorMap.get(urls.getQuick(j)));
}
}
}
private static class HttpRequestProcessorSelectorImpl implements HttpRequestProcessorSelector {
......
......@@ -23,17 +23,20 @@
package io.questdb.cutlass.http.processors;
import io.questdb.cutlass.text.TextConfiguration;
import io.questdb.std.time.MillisecondClock;
public interface JsonQueryProcessorConfiguration {
CharSequence getKeepAliveHeader();
MillisecondClock getClock();
int getFloatScale();
int getConnectionCheckFrequency();
int getDoubleScale();
int getConnectionCheckFrequency();
int getFloatScale();
MillisecondClock getClock();
CharSequence getKeepAliveHeader();
TextConfiguration getTextConfiguration();
}
......@@ -60,18 +60,11 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
// processor. For different threads to lookup the same value from local value map the key,
// which is LV, has to be the same between processor instances
private static final LocalValue<TextImportProcessorState> LV = new LocalValue<>();
static {
atomicityParamMap.put("relaxed", Atomicity.SKIP_ROW);
atomicityParamMap.put("strict", Atomicity.SKIP_ALL);
}
private final TextImportProcessorConfiguration configuration;
private final CairoEngine engine;
private HttpConnectionContext transientContext;
private IODispatcher<HttpConnectionContext> transientDispatcher;
private TextImportProcessorState transientState;
public TextImportProcessor(
TextImportProcessorConfiguration configuration,
CairoEngine cairoEngine
......@@ -80,6 +73,121 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
this.engine = cairoEngine;
}
@Override
public void close() {
}
@Override
public void onChunk(HttpRequestHeader partHeader, long lo, long hi) {
if (hi > lo) {
try {
transientState.textLoader.parse(lo, hi, transientContext.getCairoSecurityContext());
if (transientState.messagePart == MESSAGE_DATA && !transientState.analysed) {
transientState.analysed = true;
transientState.textLoader.setState(TextLoader.LOAD_DATA);
}
} catch (JsonException e) {
// todo: reply something sensible
e.printStackTrace();
}
}
}
@Override
public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
LOG.debug().$("part begin [name=").$(partHeader.getContentDispositionName()).$(']').$();
if (Chars.equals("data", partHeader.getContentDispositionName())) {
final HttpRequestHeader rh = transientContext.getRequestHeader();
CharSequence name = rh.getUrlParam("name");
if (name == null) {
name = partHeader.getContentDispositionFilename();
}
if (name == null) {
transientContext.simpleResponse().sendStatus(400, "no name given");
// we have to disconnect to interrupt potentially large upload
transientDispatcher.disconnect(transientContext);
return;
}
transientState.analysed = false;
transientState.textLoader.configureDestination(
name,
Chars.equalsNc("true", rh.getUrlParam("overwrite")),
Chars.equalsNc("true", rh.getUrlParam("durable")),
// todo: these values are incorrect, but ok for now
getAtomicity(rh.getUrlParam("atomicity"))
);
transientState.textLoader.setForceHeaders(Chars.equalsNc("true", rh.getUrlParam("forceHeader")));
transientState.textLoader.setState(TextLoader.ANALYZE_STRUCTURE);
transientState.forceHeader = Chars.equalsNc("true", rh.getUrlParam("forceHeader"));
transientState.messagePart = MESSAGE_DATA;
} else if (Chars.equals("schema", partHeader.getContentDispositionName())) {
transientState.textLoader.setState(TextLoader.LOAD_JSON_METADATA);
transientState.messagePart = MESSAGE_SCHEMA;
} else {
// todo: disconnect
transientState.messagePart = MESSAGE_UNKNOWN;
}
}
// This processor implements HttpMultipartContentListener, methods of which
// have neither context nor dispatcher. During "chunk" processing we may need
// to send something back to client, or disconnect them. To do that we need
// these transient references. resumeRecv() will set them and they will remain
// valid during multipart events.
@Override
public void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
try {
LOG.debug().$("part end").$();
transientState.textLoader.wrapUp();
if (transientState.messagePart == MESSAGE_DATA) {
sendResponse(transientContext);
}
} catch (JsonException e) {
handleJsonException(e);
}
}
@Override
public void onHeadersReady(HttpConnectionContext context) {
}
@Override
public void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
transientState.clear();
context.clear();
dispatcher.registerChannel(context, IOOperation.READ);
}
@Override
public void resumeRecv(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
this.transientContext = context;
this.transientDispatcher = dispatcher;
this.transientState = LV.get(context);
if (this.transientState == null) {
try {
LOG.debug().$("new text state").$();
LV.set(context, this.transientState = new TextImportProcessorState(configuration.getTextConfiguration(), engine));
} catch (JsonException e) {
// todo: handle gracefully
e.printStackTrace();
}
}
}
@Override
public void resumeSend(
HttpConnectionContext context,
IODispatcher<HttpConnectionContext> dispatcher
) throws PeerDisconnectedException, PeerIsSlowToReadException {
doResumeSend(LV.get(context), context.getChunkedResponseSocket());
}
private static void resumeJson(TextImportProcessorState state, HttpChunkedResponseSocket socket) throws PeerDisconnectedException, PeerIsSlowToReadException {
final TextLoader textLoader = state.textLoader;
final RecordMetadata m = textLoader.getMetadata();
......@@ -111,16 +219,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
putQuoted("type").put(':').putQuoted(ColumnType.nameOf(m.getColumnType(state.columnIndex))).put(',').
putQuoted("size").put(':').put(ColumnType.sizeOf(m.getColumnType(state.columnIndex))).put(',').
putQuoted("errors").put(':').put(errors.getQuick(state.columnIndex));
// todo: resolve these attributes
// if (im.pattern != null) {
// r.put(',').putQuoted("pattern").put(':').putQuoted(im.pattern);
// }
//
// if (im.dateLocale != null) {
// r.put(',').putQuoted("locale").put(':').putQuoted(im.dateLocale.getId());
// }
socket.put('}');
}
state.responseState = RESPONSE_SUFFIX;
......@@ -153,12 +251,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
return b;
}
// This processor implements HttpMultipartContentListener, methods of which
// have neither context nor dispatcher. During "chunk" processing we may need
// to send something back to client, or disconnect them. To do that we need
// these transient references. resumeRecv() will set them and they will remain
// valid during multipart events.
private static void pad(CharSink b, int w, long value) {
int len = (int) Math.log10(value);
if (len < 0) {
......@@ -258,115 +350,6 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
return atomicity == -1 ? Atomicity.SKIP_COL : atomicity;
}
@Override
public void close() {
}
@Override
public void onChunk(HttpRequestHeader partHeader, long lo, long hi) {
if (hi > lo) {
try {
transientState.textLoader.parse(lo, hi, transientContext.getCairoSecurityContext());
if (transientState.messagePart == MESSAGE_DATA && !transientState.analysed) {
transientState.analysed = true;
transientState.textLoader.setState(TextLoader.LOAD_DATA);
}
} catch (JsonException e) {
// todo: reply something sensible
e.printStackTrace();
}
}
}
@Override
public void onPartBegin(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
LOG.debug().$("part begin [name=").$(partHeader.getContentDispositionName()).$(']').$();
if (Chars.equals("data", partHeader.getContentDispositionName())) {
final HttpRequestHeader rh = transientContext.getRequestHeader();
CharSequence name = rh.getUrlParam("name");
if (name == null) {
name = partHeader.getContentDispositionFilename();
}
if (name == null) {
transientContext.simpleResponse().sendStatus(400, "no name given");
// we have to disconnect to interrupt potentially large upload
transientDispatcher.disconnect(transientContext);
return;
}
transientState.analysed = false;
transientState.textLoader.configureDestination(
name,
Chars.equalsNc("true", rh.getUrlParam("overwrite")),
Chars.equalsNc("true", rh.getUrlParam("durable")),
// todo: these values are incorrect, but ok for now
getAtomicity(rh.getUrlParam("atomicity"))
);
transientState.textLoader.setForceHeaders(Chars.equalsNc("true", rh.getUrlParam("forceHeader")));
transientState.textLoader.setState(TextLoader.ANALYZE_STRUCTURE);
transientState.forceHeader = Chars.equalsNc("true", rh.getUrlParam("forceHeader"));
transientState.messagePart = MESSAGE_DATA;
} else if (Chars.equals("schema", partHeader.getContentDispositionName())) {
transientState.textLoader.setState(TextLoader.LOAD_JSON_METADATA);
transientState.messagePart = MESSAGE_SCHEMA;
} else {
// todo: disconnect
transientState.messagePart = MESSAGE_UNKNOWN;
}
}
@Override
public void onPartEnd(HttpRequestHeader partHeader) throws PeerDisconnectedException, PeerIsSlowToReadException {
try {
LOG.debug().$("part end").$();
transientState.textLoader.wrapUp();
if (transientState.messagePart == MESSAGE_DATA) {
sendResponse(transientContext);
}
} catch (JsonException e) {
handleJsonException(e);
}
}
@Override
public void onHeadersReady(HttpConnectionContext context) {
}
@Override
public void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
transientState.clear();
context.clear();
dispatcher.registerChannel(context, IOOperation.READ);
}
@Override
public void resumeRecv(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher) {
this.transientContext = context;
this.transientDispatcher = dispatcher;
this.transientState = LV.get(context);
if (this.transientState == null) {
try {
LOG.debug().$("new text state").$();
LV.set(context, this.transientState = new TextImportProcessorState(configuration.getTextConfiguration(), engine));
} catch (JsonException e) {
// todo: handle gracefully
e.printStackTrace();
}
}
}
@Override
public void resumeSend(
HttpConnectionContext context,
IODispatcher<HttpConnectionContext> dispatcher
) throws PeerDisconnectedException, PeerIsSlowToReadException {
doResumeSend(LV.get(context), context.getChunkedResponseSocket());
}
private void doResumeSend(
TextImportProcessorState state,
HttpChunkedResponseSocket socket
......@@ -439,4 +422,9 @@ public class TextImportProcessor implements HttpRequestProcessor, HttpMultipartC
sendError(context, state.stateMessage, state.json);
}
}
static {
atomicityParamMap.put("relaxed", Atomicity.SKIP_ROW);
atomicityParamMap.put("strict", Atomicity.SKIP_ALL);
}
}
......@@ -61,6 +61,9 @@ public class TextLoader implements Closeable, Mutable {
private boolean forceHeaders = false;
private byte columnDelimiter = -1;
/**
* @throws JsonException when default configuration cannot be loaded from classpath
*/
public TextLoader(
TextConfiguration textConfiguration,
CairoEngine engine,
......
......@@ -25,6 +25,7 @@ package io.questdb.griffin;
import io.questdb.cairo.sql.InsertStatement;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.griffin.model.CopyModel;
public interface CompiledQuery {
int SELECT = 0;
......@@ -42,5 +43,7 @@ public interface CompiledQuery {
InsertStatement getInsertStatement();
CopyModel getCopyModel();
int getType();
}
......@@ -25,44 +25,47 @@ package io.questdb.griffin;
import io.questdb.cairo.sql.InsertStatement;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.griffin.model.CopyModel;
public class CompiledQueryImpl implements CompiledQuery {
private RecordCursorFactory recordCursorFactory;
private InsertStatement insertStatement;
private CopyModel copyModel;
private int type;
CompiledQuery of(RecordCursorFactory recordCursorFactory) {
this.type = SELECT;
this.recordCursorFactory = recordCursorFactory;
return this;
@Override
public RecordCursorFactory getRecordCursorFactory() {
return recordCursorFactory;
}
CompiledQuery ofTruncate() {
this.type = TRUNCATE;
return this;
@Override
public InsertStatement getInsertStatement() {
return insertStatement;
}
CompiledQuery ofAlter() {
this.type = ALTER;
return this;
@Override
public CopyModel getCopyModel() {
return copyModel;
}
CompiledQuery ofRepair() {
this.type = REPAIR;
return this;
@Override
public int getType() {
return type;
}
CompiledQuery ofSet() {
this.type = SET;
CompiledQuery of(RecordCursorFactory recordCursorFactory) {
this.type = SELECT;
this.recordCursorFactory = recordCursorFactory;
return this;
}
CompiledQuery ofDrop() {
this.type = DROP;
CompiledQuery ofAlter() {
this.type = ALTER;
return this;
}
CompiledQuery ofCopy() {
CompiledQuery ofCopy(CopyModel copyModel) {
this.copyModel = copyModel;
this.type = COPY;
return this;
}
......@@ -72,8 +75,8 @@ public class CompiledQueryImpl implements CompiledQuery {
return this;
}
CompiledQuery ofInsertAsSelect() {
this.type = INSERT_AS_SELECT;
CompiledQuery ofDrop() {
this.type = DROP;
return this;
}
......@@ -83,18 +86,23 @@ public class CompiledQueryImpl implements CompiledQuery {
return this;
}
@Override
public RecordCursorFactory getRecordCursorFactory() {
return recordCursorFactory;
CompiledQuery ofInsertAsSelect() {
this.type = INSERT_AS_SELECT;
return this;
}
@Override
public InsertStatement getInsertStatement() {
return insertStatement;
CompiledQuery ofRepair() {
this.type = REPAIR;
return this;
}
@Override
public int getType() {
return type;
CompiledQuery ofSet() {
this.type = SET;
return this;
}
CompiledQuery ofTruncate() {
this.type = TRUNCATE;
return this;
}
}
......@@ -24,20 +24,12 @@
package io.questdb.griffin;
import io.questdb.cairo.*;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.sql.*;
import io.questdb.cutlass.json.JsonException;
import io.questdb.cutlass.text.Atomicity;
import io.questdb.cutlass.text.DefaultTextConfiguration;
import io.questdb.cutlass.text.TextConfiguration;
import io.questdb.cutlass.text.TextLoader;
import io.questdb.griffin.model.*;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
import io.questdb.std.str.Path;
import io.questdb.std.time.DateFormatFactory;
import io.questdb.std.time.DateLocaleFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
......@@ -49,31 +41,6 @@ public class SqlCompiler implements Closeable {
public static final ObjList<String> sqlControlSymbols = new ObjList<>(8);
private final static Log LOG = LogFactory.getLog(SqlCompiler.class);
private static final IntList castGroups = new IntList();
static {
castGroups.extendAndSet(ColumnType.BOOLEAN, 2);
castGroups.extendAndSet(ColumnType.BYTE, 1);
castGroups.extendAndSet(ColumnType.SHORT, 1);
castGroups.extendAndSet(ColumnType.CHAR, 1);
castGroups.extendAndSet(ColumnType.INT, 1);
castGroups.extendAndSet(ColumnType.LONG, 1);
castGroups.extendAndSet(ColumnType.FLOAT, 1);
castGroups.extendAndSet(ColumnType.DOUBLE, 1);
castGroups.extendAndSet(ColumnType.DATE, 1);
castGroups.extendAndSet(ColumnType.TIMESTAMP, 1);
castGroups.extendAndSet(ColumnType.STRING, 3);
castGroups.extendAndSet(ColumnType.SYMBOL, 3);
castGroups.extendAndSet(ColumnType.BINARY, 4);
sqlControlSymbols.add("(");
sqlControlSymbols.add(";");
sqlControlSymbols.add(")");
sqlControlSymbols.add(",");
sqlControlSymbols.add("/*");
sqlControlSymbols.add("*/");
sqlControlSymbols.add("--");
}
private final SqlOptimiser optimiser;
private final SqlParser parser;
private final ObjectPool<ExpressionNode> sqlNodePool;
......@@ -131,8 +98,6 @@ public class SqlCompiler implements Closeable {
keywordBasedExecutors.put("SET", this::compileSet);
keywordBasedExecutors.put("drop", this::dropTable);
keywordBasedExecutors.put("DROP", this::dropTable);
keywordBasedExecutors.put("copy", this::copyTable);
keywordBasedExecutors.put("COPY", this::copyTable);
configureLexer(lexer);
......@@ -172,6 +137,30 @@ public class SqlCompiler implements Closeable {
}
}
@Override
public void close() {
Misc.free(path);
}
public CompiledQuery compile(CharSequence query) throws SqlException {
return compile(query, DefaultSqlExecutionContext.INSTANCE);
}
public CompiledQuery compile(@NotNull CharSequence query, @NotNull SqlExecutionContext executionContext) throws SqlException {
clear();
//
// these are quick executions that do not require building of a model
//
lexer.of(query);
final CharSequence tok = SqlUtil.fetchNext(lexer);
final KeywordBasedExecutor executor = keywordBasedExecutors.get(tok);
if (executor == null) {
return compileUsingModel(executionContext);
}
return executor.execute(executionContext);
}
// Creates data type converter.
// INT and LONG NaN values are cast to their representation rather than Double or Float NaN.
private static RecordToRowCopier assembleRecordToRowCopier(BytecodeAssembler asm, ColumnTypes from, RecordMetadata to, ColumnFilter toColumnFilter) {
......@@ -633,30 +622,6 @@ public class SqlCompiler implements Closeable {
return tok;
}
@Override
public void close() {
Misc.free(path);
}
public CompiledQuery compile(CharSequence query) throws SqlException {
return compile(query, DefaultSqlExecutionContext.INSTANCE);
}
public CompiledQuery compile(@NotNull CharSequence query, @NotNull SqlExecutionContext executionContext) throws SqlException {
clear();
//
// these are quick executions that do not require building of a model
//
lexer.of(query);
final CharSequence tok = SqlUtil.fetchNext(lexer);
final KeywordBasedExecutor executor = keywordBasedExecutors.get(tok);
if (executor == null) {
return compileUsingModel(executionContext);
}
return executor.execute(executionContext);
}
private CompiledQuery alterTable(SqlExecutionContext executionContext) throws SqlException {
CharSequence tok;
expectKeyword(lexer, "table");
......@@ -851,7 +816,7 @@ public class SqlCompiler implements Closeable {
switch (model.getModelType()) {
case ExecutionModel.QUERY:
return optimiser.optimise((QueryModel) model, executionContext);
case ExecutionModel.INSERT_AS_SELECT:
case ExecutionModel.INSERT:
InsertModel insertModel = (InsertModel) model;
if (insertModel.getQueryModel() != null) {
return validateAndOptimiseInsertAsSelect(insertModel, executionContext);
......@@ -863,19 +828,6 @@ public class SqlCompiler implements Closeable {
}
}
private ExecutionModel lightlyValidateInsertModel(InsertModel model) throws SqlException {
ExpressionNode tableName = model.getTableName();
if (tableName.type != ExpressionNode.LITERAL) {
throw SqlException.$(tableName.position, "literal expected");
}
if (model.getColumnSet().size() > 0 && model.getColumnSet().size() != model.getColumnValues().size()) {
throw SqlException.$(model.getColumnPosition(0), "value count does not match column count");
}
return model;
}
private CompiledQuery compileSet(SqlExecutionContext executionContext) {
return compiledQuery.ofSet();
}
......@@ -898,6 +850,8 @@ public class SqlCompiler implements Closeable {
return compiledQuery.of(generate((QueryModel) executionModel, executionContext));
case ExecutionModel.CREATE_TABLE:
return createTableWithRetries(executionModel, executionContext);
case ExecutionModel.COPY:
return compiledQuery.ofCopy((CopyModel) executionModel);
default:
InsertModel insertModel = (InsertModel) executionModel;
if (insertModel.getQueryModel() != null) {
......@@ -1057,85 +1011,6 @@ public class SqlCompiler implements Closeable {
return compiledQuery.ofDrop();
}
private CompiledQuery copyTable(SqlExecutionContext executionContext) throws SqlException {
CharSequence tok;
final int tableNamePosition = lexer.getPosition();
tok = expectToken(lexer, "table name");
LOG.info().$("copying").$();
final TextConfiguration textConfiguration = new DefaultTextConfiguration() {
@Override
public int getRollBufferSize() {
return 4 * 1024 * 1024;
}
@Override
public int getRollBufferLimit() {
return 8 * 1024 * 1024;
}
};
try {
try (TextLoader textLoader = new TextLoader(
textConfiguration,
engine,
DateLocaleFactory.INSTANCE,
new DateFormatFactory(),
io.questdb.std.microtime.DateLocaleFactory.INSTANCE,
new io.questdb.std.microtime.DateFormatFactory()
)) {
textLoader.setState(TextLoader.ANALYZE_STRUCTURE);
textLoader.configureDestination("test", true, false, Atomicity.SKIP_ROW);
// if (columnSeparator > 0) {
// textLoader.configureColumnDelimiter(columnSeparator);
// }
int len = 4 * 1024 * 1024;
long buf = Unsafe.malloc(len);
try {
path.of("C:\\Users\\blues\\shared\\transactions.csv").$();
long fd = Files.openRO(path);
if (fd == -1) {
return null;
}
long fileLen = Files.length(fd);
long n = (int) Files.read(fd, buf, len, 0);
if (n > 0) {
textLoader.parse(buf, buf + n, AllowAllCairoSecurityContext.INSTANCE);
textLoader.setState(TextLoader.LOAD_DATA);
int read;
while (n < fileLen) {
read = (int) Files.read(fd, buf, len, n);
if (read < 1) {
// shit
break;
}
textLoader.parse(buf, buf + read, AllowAllCairoSecurityContext.INSTANCE);
n += read;
}
textLoader.wrapUp();
}
} finally {
Unsafe.free(buf, len);
}
}
} catch (JsonException e) {
e.printStackTrace();
} finally {
LOG.info().$("copied").$();
}
// CharSequence tableName = GenericLexer.immutableOf(tok);
return compiledQuery.ofCopy();
}
private CompiledQuery executeWithRetries(
ExecutableMethod method,
ExecutionModel executionModel,
......@@ -1161,6 +1036,71 @@ public class SqlCompiler implements Closeable {
return codeGenerator.generate(queryModel, executionContext);
}
private CompiledQuery insert(ExecutionModel executionModel, SqlExecutionContext executionContext) throws SqlException {
final InsertModel model = (InsertModel) executionModel;
final ExpressionNode name = model.getTableName();
tableExistsOrFail(name.position, name.token, executionContext);
try (TableReader reader = engine.getReader(executionContext.getCairoSecurityContext(), name.token, TableUtils.ANY_TABLE_VERSION)) {
final long structureVersion = reader.getVersion();
final RecordMetadata metadata = reader.getMetadata();
final int writerTimestampIndex = metadata.getTimestampIndex();
final CharSequenceHashSet columnSet = model.getColumnSet();
final int columnSetSize = columnSet.size();
final ColumnFilter columnFilter;
final ObjList<Function> valueFunctions;
Function timestampFunction = null;
if (columnSetSize > 0) {
listColumnFilter.clear();
columnFilter = listColumnFilter;
valueFunctions = new ObjList<>(columnSetSize);
for (int i = 0; i < columnSetSize; i++) {
int index = metadata.getColumnIndex(columnSet.get(i));
if (index < 0) {
throw SqlException.invalidColumn(model.getColumnPosition(i), columnSet.get(i));
}
final Function function = functionParser.parseFunction(model.getColumnValues().getQuick(i), metadata, executionContext);
if (!isAssignableFrom(metadata.getColumnType(index), function.getType())) {
throw SqlException.$(model.getColumnValues().getQuick(i).position, "inconvertible types: ").put(ColumnType.nameOf(function.getType())).put(" -> ").put(ColumnType.nameOf(metadata.getColumnType(index)));
}
if (i == writerTimestampIndex) {
timestampFunction = function;
} else {
valueFunctions.add(function);
listColumnFilter.add(index);
}
}
} else {
final int columnCount = metadata.getColumnCount();
entityColumnFilter.of(columnCount);
columnFilter = entityColumnFilter;
valueFunctions = new ObjList<>(columnCount);
for (int i = 0; i < columnCount; i++) {
Function function = functionParser.parseFunction(model.getColumnValues().getQuick(i), metadata, executionContext);
if (!isAssignableFrom(metadata.getColumnType(i), function.getType())) {
throw SqlException.$(model.getColumnValues().getQuick(i).position, "inconvertible types: ").put(ColumnType.nameOf(function.getType())).put(" -> ").put(ColumnType.nameOf(metadata.getColumnType(i)));
}
if (i == writerTimestampIndex) {
timestampFunction = function;
} else {
valueFunctions.add(function);
}
}
}
// validate timestamp
if (writerTimestampIndex > -1 && timestampFunction == null) {
throw SqlException.$(0, "insert statement must populate timestamp");
}
VirtualRecord record = new VirtualRecord(valueFunctions);
RecordToRowCopier copier = assembleRecordToRowCopier(asm, record, metadata, columnFilter);
return compiledQuery.ofInsert(new InsertStatementImpl(record, copier, timestampFunction, structureVersion));
}
}
private CompiledQuery insertAsSelect(ExecutionModel executionModel, SqlExecutionContext executionContext) throws SqlException {
final InsertModel model = (InsertModel) executionModel;
final ExpressionNode name = model.getTableName();
......@@ -1261,69 +1201,17 @@ public class SqlCompiler implements Closeable {
return compiledQuery.ofInsertAsSelect();
}
private CompiledQuery insert(ExecutionModel executionModel, SqlExecutionContext executionContext) throws SqlException {
final InsertModel model = (InsertModel) executionModel;
final ExpressionNode name = model.getTableName();
tableExistsOrFail(name.position, name.token, executionContext);
try (TableReader reader = engine.getReader(executionContext.getCairoSecurityContext(), name.token, TableUtils.ANY_TABLE_VERSION)) {
final long structureVersion = reader.getVersion();
final RecordMetadata metadata = reader.getMetadata();
final int writerTimestampIndex = metadata.getTimestampIndex();
final CharSequenceHashSet columnSet = model.getColumnSet();
final int columnSetSize = columnSet.size();
final ColumnFilter columnFilter;
final ObjList<Function> valueFunctions;
Function timestampFunction = null;
if (columnSetSize > 0) {
listColumnFilter.clear();
columnFilter = listColumnFilter;
valueFunctions = new ObjList<>(columnSetSize);
for (int i = 0; i < columnSetSize; i++) {
int index = metadata.getColumnIndex(columnSet.get(i));
if (index < 0) {
throw SqlException.invalidColumn(model.getColumnPosition(i), columnSet.get(i));
}
final Function function = functionParser.parseFunction(model.getColumnValues().getQuick(i), metadata, executionContext);
if (!isAssignableFrom(metadata.getColumnType(index), function.getType())) {
throw SqlException.$(model.getColumnValues().getQuick(i).position, "inconvertible types: ").put(ColumnType.nameOf(function.getType())).put(" -> ").put(ColumnType.nameOf(metadata.getColumnType(index)));
}
if (i == writerTimestampIndex) {
timestampFunction = function;
} else {
valueFunctions.add(function);
listColumnFilter.add(index);
}
}
} else {
final int columnCount = metadata.getColumnCount();
entityColumnFilter.of(columnCount);
columnFilter = entityColumnFilter;
valueFunctions = new ObjList<>(columnCount);
for (int i = 0; i < columnCount; i++) {
Function function = functionParser.parseFunction(model.getColumnValues().getQuick(i), metadata, executionContext);
if (!isAssignableFrom(metadata.getColumnType(i), function.getType())) {
throw SqlException.$(model.getColumnValues().getQuick(i).position, "inconvertible types: ").put(ColumnType.nameOf(function.getType())).put(" -> ").put(ColumnType.nameOf(metadata.getColumnType(i)));
}
if (i == writerTimestampIndex) {
timestampFunction = function;
} else {
valueFunctions.add(function);
}
}
private ExecutionModel lightlyValidateInsertModel(InsertModel model) throws SqlException {
ExpressionNode tableName = model.getTableName();
if (tableName.type != ExpressionNode.LITERAL) {
throw SqlException.$(tableName.position, "literal expected");
}
// validate timestamp
if (writerTimestampIndex > -1 && timestampFunction == null) {
throw SqlException.$(0, "insert statement must populate timestamp");
if (model.getColumnSet().size() > 0 && model.getColumnSet().size() != model.getColumnValues().size()) {
throw SqlException.$(model.getColumnPosition(0), "value count does not match column count");
}
VirtualRecord record = new VirtualRecord(valueFunctions);
RecordToRowCopier copier = assembleRecordToRowCopier(asm, record, metadata, columnFilter);
return compiledQuery.ofInsert(new InsertStatementImpl(record, copier, timestampFunction, structureVersion));
}
return model;
}
private boolean removeTableDirectory(CreateTableModel model) {
......@@ -1603,4 +1491,28 @@ public class SqlCompiler implements Closeable {
return this;
}
}
static {
castGroups.extendAndSet(ColumnType.BOOLEAN, 2);
castGroups.extendAndSet(ColumnType.BYTE, 1);
castGroups.extendAndSet(ColumnType.SHORT, 1);
castGroups.extendAndSet(ColumnType.CHAR, 1);
castGroups.extendAndSet(ColumnType.INT, 1);
castGroups.extendAndSet(ColumnType.LONG, 1);
castGroups.extendAndSet(ColumnType.FLOAT, 1);
castGroups.extendAndSet(ColumnType.DOUBLE, 1);
castGroups.extendAndSet(ColumnType.DATE, 1);
castGroups.extendAndSet(ColumnType.TIMESTAMP, 1);
castGroups.extendAndSet(ColumnType.STRING, 3);
castGroups.extendAndSet(ColumnType.SYMBOL, 3);
castGroups.extendAndSet(ColumnType.BINARY, 4);
sqlControlSymbols.add("(");
sqlControlSymbols.add(";");
sqlControlSymbols.add(")");
sqlControlSymbols.add(",");
sqlControlSymbols.add("/*");
sqlControlSymbols.add("*/");
sqlControlSymbols.add("--");
}
}
......@@ -85,6 +85,7 @@ public final class SqlParser {
private final ObjectPool<RenameTableModel> renameTableModelPool;
private final ObjectPool<WithClauseModel> withClauseModelPool;
private final ObjectPool<InsertModel> insertModelPool;
private final ObjectPool<CopyModel> copyModelPool;
private final ExpressionParser expressionParser;
private final CairoConfiguration configuration;
private final PostOrderTreeTraversalAlgo traversalAlgo;
......@@ -111,6 +112,7 @@ public final class SqlParser {
this.renameTableModelPool = new ObjectPool<>(RenameTableModel.FACTORY, configuration.getRenameTableModelPoolCapacity());
this.withClauseModelPool = new ObjectPool<>(WithClauseModel.FACTORY, configuration.getWithClauseModelPoolCapacity());
this.insertModelPool = new ObjectPool<>(InsertModel.FACTORY, configuration.getInsertPoolCapacity());
this.copyModelPool = new ObjectPool<>(CopyModel.FACTORY, configuration.getCopyPoolCapacity());
this.configuration = configuration;
this.traversalAlgo = traversalAlgo;
this.characterStore = characterStore;
......@@ -139,6 +141,7 @@ public final class SqlParser {
characterStore.clear();
insertModelPool.clear();
expressionTreeBuilder.reset();
copyModelPool.clear();
}
private CharSequence createColumnAlias(ExpressionNode node, QueryModel model) {
......@@ -292,9 +295,26 @@ public final class SqlParser {
return parseInsert(lexer);
}
if (Chars.equalsLowerCaseAscii(tok, "copy")) {
return parseCopy(lexer);
}
return parseSelect(lexer);
}
private ExecutionModel parseCopy(GenericLexer lexer) throws SqlException {
ExpressionNode tableName = expectExpr(lexer);
CharSequence tok = tok(lexer, "'from' or 'to'");
if (Chars.equalsLowerCaseAscii(tok, "from")) {
CopyModel model = copyModelPool.next();
model.setTableName(tableName);
model.setFileName(expectExpr(lexer));
return model;
}
return null;
}
private ExecutionModel parseCreateStatement(GenericLexer lexer, SqlExecutionContext executionContext) throws SqlException {
expectTok(lexer, "table");
return parseCreateTable(lexer, executionContext);
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package io.questdb.griffin.model;
import io.questdb.std.Mutable;
import io.questdb.std.ObjectFactory;
import io.questdb.std.Sinkable;
import io.questdb.std.str.CharSink;
public class CopyModel implements ExecutionModel, Mutable, Sinkable {
public static final ObjectFactory<CopyModel> FACTORY = CopyModel::new;
private ExpressionNode tableName;
private ExpressionNode fileName;
@Override
public void clear() {
}
public ExpressionNode getFileName() {
return fileName;
}
public void setFileName(ExpressionNode fileName) {
this.fileName = fileName;
}
@Override
public int getModelType() {
return ExecutionModel.COPY;
}
public ExpressionNode getTableName() {
return tableName;
}
public void setTableName(ExpressionNode tableName) {
this.tableName = tableName;
}
@Override
public void toSink(CharSink sink) {
}
}
......@@ -27,7 +27,8 @@ public interface ExecutionModel {
int QUERY = 1;
int CREATE_TABLE = 2;
int RENAME_TABLE = 3;
int INSERT_AS_SELECT = 4;
int INSERT = 4;
int COPY = 5;
int getModelType();
}
......@@ -82,7 +82,7 @@ public class InsertModel implements ExecutionModel, Mutable, Sinkable {
@Override
public int getModelType() {
return INSERT_AS_SELECT;
return INSERT;
}
public QueryModel getQueryModel() {
......
......@@ -728,25 +728,33 @@ public class SqlParserTest extends AbstractGriffinTest {
}
@Test
public void testInsertValues() throws SqlException {
assertModel("insert into x values (3, 'abc', ?)",
"insert into x values (3, 'abc', ?)",
ExecutionModel.INSERT_AS_SELECT,
public void testInsertAsSelect() throws SqlException {
assertModel(
"insert into x select-choose c, d from (y)",
"insert into x select * from y",
ExecutionModel.INSERT,
modelOf("x")
.col("a", ColumnType.INT)
.col("b", ColumnType.STRING)
.col("c", ColumnType.STRING));
.col("b", ColumnType.STRING),
modelOf("y")
.col("c", ColumnType.INT)
.col("d", ColumnType.STRING)
);
}
@Test
public void testInsertColumnsAndValues() throws SqlException {
assertModel("insert into x (a, b) values (3, ?)",
"insert into x (a,b) values (3, ?)",
ExecutionModel.INSERT_AS_SELECT,
public void testInsertAsSelectColumnList() throws SqlException {
assertModel(
"insert into x (a, b) select-choose c, d from (y)",
"insert into x (a,b) select * from y",
ExecutionModel.INSERT,
modelOf("x")
.col("a", ColumnType.INT)
.col("b", ColumnType.STRING)
.col("c", ColumnType.STRING));
.col("b", ColumnType.STRING),
modelOf("y")
.col("c", ColumnType.INT)
.col("d", ColumnType.STRING)
);
}
@Test
......@@ -2343,18 +2351,14 @@ public class SqlParserTest extends AbstractGriffinTest {
}
@Test
public void testInsertAsSelect() throws SqlException {
assertModel(
"insert into x select-choose c, d from (y)",
"insert into x select * from y",
ExecutionModel.INSERT_AS_SELECT,
public void testInsertColumnsAndValues() throws SqlException {
assertModel("insert into x (a, b) values (3, ?)",
"insert into x (a,b) values (3, ?)",
ExecutionModel.INSERT,
modelOf("x")
.col("a", ColumnType.INT)
.col("b", ColumnType.STRING),
modelOf("y")
.col("c", ColumnType.INT)
.col("d", ColumnType.STRING)
);
.col("b", ColumnType.STRING)
.col("c", ColumnType.STRING));
}
@Test
......@@ -2370,18 +2374,14 @@ public class SqlParserTest extends AbstractGriffinTest {
}
@Test
public void testInsertAsSelectColumnList() throws SqlException {
assertModel(
"insert into x (a, b) select-choose c, d from (y)",
"insert into x (a,b) select * from y",
ExecutionModel.INSERT_AS_SELECT,
public void testInsertValues() throws SqlException {
assertModel("insert into x values (3, 'abc', ?)",
"insert into x values (3, 'abc', ?)",
ExecutionModel.INSERT,
modelOf("x")
.col("a", ColumnType.INT)
.col("b", ColumnType.STRING),
modelOf("y")
.col("c", ColumnType.INT)
.col("d", ColumnType.STRING)
);
.col("b", ColumnType.STRING)
.col("c", ColumnType.STRING));
}
@Test
......
......@@ -78,7 +78,7 @@ cairo.sql.create.table.model.pool.capacity=64
cairo.sql.column.cast.model.pool.capacity=256
cairo.sql.rename.table.model.pool.capacity=512
cairo.sql.with.clause.model.pool.capacity=1024
cairo.sql.insert.as.select.model.pool.capacity=128
cairo.sql.insert.model.pool.capacity=128
line.udp.bind.to=10.2.1.33:9915
line.udp.commit.rate=100000
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册