Unverified commit b3efe149, authored by Alex Pelagenko, committed by GitHub

feat(sql): update statement query model (#1607)

Parent 4c3a3af6
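For orientation, a minimal sketch of how the UPDATE support introduced by this commit surfaces through the compiler API. This is not code from the commit: the engine/context wiring and the table are invented, and the feature is still gated behind CairoConfiguration.enableDevelopmentUpdates(), so compile() throws "UPDATE statement is not supported yet" unless that flag returns true.

    // Hypothetical usage; "engine" is an initialised CairoEngine, "ctx" a SqlExecutionContext,
    // and table "trades" with columns "price" and "sym" is invented for the example.
    try (SqlCompiler compiler = new SqlCompiler(engine)) {
        CompiledQuery cq = compiler.compile("UPDATE trades SET price = price * 2 WHERE sym = 'A'", ctx);
        if (cq.getType() == CompiledQuery.UPDATE) {
            UpdateStatement update = cq.getUpdateStatement(); // new accessor added in this commit
        }
    }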
......@@ -2144,6 +2144,11 @@ public class PropServerConfiguration implements ServerConfiguration {
public boolean isParallelIndexingEnabled() {
return parallelIndexingEnabled;
}
@Override
public boolean enableDevelopmentUpdates() {
return false;
}
}
private class PropLineUdpReceiverConfiguration implements LineUdpReceiverConfiguration {
......
......@@ -25,6 +25,7 @@
package io.questdb.cairo;
import io.questdb.cairo.sql.DataFrameCursorFactory;
import io.questdb.std.Chars;
import io.questdb.std.str.CharSink;
public abstract class AbstractDataFrameCursorFactory implements DataFrameCursorFactory {
......@@ -57,4 +58,9 @@ public abstract class AbstractDataFrameCursorFactory implements DataFrameCursorF
@Override
public void close() {
}
@Override
public boolean supportTableRowId(CharSequence tableName) {
return Chars.equalsIgnoreCaseNc(tableName, this.tableName);
}
}
......@@ -40,6 +40,8 @@ public interface CairoConfiguration {
ThreadLocal<Rnd> RANDOM = new ThreadLocal<>();
boolean enableDevelopmentUpdates();
boolean enableTestFactories();
int getAnalyticColumnPoolCapacity();
......
......@@ -495,6 +495,11 @@ public class DefaultCairoConfiguration implements CairoConfiguration {
return 1024;
}
@Override
public boolean enableDevelopmentUpdates() {
return false;
}
@Override
public int getSqlMapMaxResizes() {
return 64;
......
......@@ -126,6 +126,11 @@ public class TableReaderRecord implements Record, Sinkable {
return Rows.toRowID(reader.getPartitionIndex(columnBase), recordIndex);
}
@Override
public long getUpdateRowId() {
return getRowId();
}
@Override
public short getShort(int col) {
final long offset = getAdjustedRecordIndex(col) * Short.BYTES;
......
......@@ -122,6 +122,11 @@ public class TableReaderSelectedColumnRecord implements Record {
return Rows.toRowID(reader.getPartitionIndex(columnBase), recordIndex);
}
@Override
public long getUpdateRowId() {
return getRowId();
}
@Override
public long getLong(int columnIndex) {
final int col = deferenceColumn(columnIndex);
......
......@@ -26,8 +26,7 @@ package io.questdb.cairo;
import io.questdb.MessageBus;
import io.questdb.MessageBusImpl;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.cairo.sql.SymbolTable;
import io.questdb.cairo.sql.*;
import io.questdb.cairo.vm.MemoryFCRImpl;
import io.questdb.cairo.vm.Vm;
import io.questdb.cairo.vm.api.*;
......@@ -729,6 +728,10 @@ public class TableWriter implements Closeable {
return txWriter.getPartitionCount();
}
public long getPartitionTimestamp(int partitionIndex) {
return txWriter.getPartitionTimestamp(partitionIndex);
}
public long getRawMetaMemory() {
return metaMem.getPageAddress(0);
}
......@@ -3563,17 +3566,17 @@ public class TableWriter implements Closeable {
o3TimestampMem.putLong128(timestamp, getO3RowCount0());
}
private void openColumnFiles(CharSequence name, int i, int plen) {
MemoryMAR mem1 = getPrimaryColumn(i);
MemoryMAR mem2 = getSecondaryColumn(i);
private void openColumnFiles(CharSequence name, int columnIndex, int pathTrimToLen) {
MemoryMAR mem1 = getPrimaryColumn(columnIndex);
MemoryMAR mem2 = getSecondaryColumn(columnIndex);
try {
mem1.of(ff, dFile(path.trimTo(plen), name), configuration.getDataAppendPageSize(), -1, MemoryTag.MMAP_TABLE_WRITER);
mem1.of(ff, dFile(path.trimTo(pathTrimToLen), name), configuration.getDataAppendPageSize(), -1, MemoryTag.MMAP_TABLE_WRITER);
if (mem2 != null) {
mem2.of(ff, iFile(path.trimTo(plen), name), configuration.getDataAppendPageSize(), -1, MemoryTag.MMAP_TABLE_WRITER);
mem2.of(ff, iFile(path.trimTo(pathTrimToLen), name), configuration.getDataAppendPageSize(), -1, MemoryTag.MMAP_TABLE_WRITER);
}
} finally {
path.trimTo(plen);
path.trimTo(pathTrimToLen);
}
}
......
......@@ -45,6 +45,8 @@ public interface DataFrameCursorFactory extends Sinkable, Closeable {
throw new UnsupportedOperationException();
}
boolean supportTableRowId(CharSequence tableName);
/**
* Returns 0 for ASC, 1 for DESC
*/
......
......@@ -24,7 +24,6 @@
package io.questdb.cairo.sql;
import io.questdb.cairo.ColumnType;
import io.questdb.std.BinarySequence;
import io.questdb.std.Long256;
import io.questdb.std.str.CharSink;
......@@ -185,7 +184,7 @@ public interface Record {
}
/**
* Gets the numeric ID of this row
* Gets the numeric ID of this row. This may not be the real table row id
*
* @return numeric ID of the current row
*/
......@@ -193,6 +192,16 @@ public interface Record {
throw new UnsupportedOperationException();
}
/**
* Gets the numeric ID of this row. This must be the real table row id
*
* @return numeric ID of the current row
*/
default long getUpdateRowId() {
throw new UnsupportedOperationException();
}
/**
* Gets the value of a short column by index
*
......
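To make the contract of the two row ids concrete, a small sketch (not from the commit) of a decorating Record, in the spirit of the VirtualRecord, JoinRecord and SelectedRecord changes further down in this diff: getRowId() may be cursor-local or unsupported, while getUpdateRowId() must always resolve to the physical table row id that UPDATE execution can hand to the writer.

    // Sketch only; the wrapped "base" record is assumed to be positioned on a real table row.
    // All other Record getters keep their default implementations.
    class ForwardingRecord implements Record {
        private final Record base;

        ForwardingRecord(Record base) {
            this.base = base;
        }

        @Override
        public long getRowId() {
            return base.getRowId(); // may be synthetic, or throw by default
        }

        @Override
        public long getUpdateRowId() {
            return base.getUpdateRowId(); // always the real table row id
        }
    }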
......@@ -91,6 +91,10 @@ public interface RecordCursorFactory extends Closeable, Sinkable {
return false;
}
default boolean supportsUpdateRowId(CharSequence tableName) {
return false;
}
default boolean usesCompiledFilter() {
return false;
}
......
......@@ -119,6 +119,11 @@ public class VirtualRecord implements ColumnTypes, Record {
return base.getRowId();
}
@Override
public long getUpdateRowId() {
return base.getUpdateRowId();
}
@Override
public short getShort(int col) {
return getFunction(col).getShort(base);
......
......@@ -27,6 +27,7 @@ package io.questdb.griffin;
import io.questdb.cairo.sql.InsertStatement;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cutlass.text.TextLoader;
import io.questdb.griffin.update.UpdateStatement;
import io.questdb.mp.SCSequence;
public interface CompiledQuery {
......@@ -43,6 +44,7 @@ public interface CompiledQuery {
short COPY_REMOTE = 11;
short RENAME_TABLE = 12;
short BACKUP_TABLE = 13;
short UPDATE = 14;
short LOCK = 14;
short UNLOCK = 14;
short VACUUM = 15;
......@@ -57,6 +59,8 @@ public interface CompiledQuery {
short getType();
UpdateStatement getUpdateStatement();
/***
* Executes the query.
* If execution is done in sync returns an instance of QueryFuture where isDone() is true.
......@@ -67,7 +71,3 @@ public interface CompiledQuery {
*/
QueryFuture execute(SCSequence eventSubSeq) throws SqlException;
}
......@@ -43,6 +43,7 @@ import io.questdb.std.datetime.microtime.MicrosecondClock;
import io.questdb.tasks.TableWriterTask;
import java.util.concurrent.locks.LockSupport;
import io.questdb.griffin.update.UpdateStatement;
public class CompiledQueryImpl implements CompiledQuery {
private static final Log LOG = LogFactory.getLog(CompiledQueryImpl.class);
......@@ -50,6 +51,7 @@ public class CompiledQueryImpl implements CompiledQuery {
private final AlterTableQueryFuture alterFuture = new AlterTableQueryFuture();
private RecordCursorFactory recordCursorFactory;
private InsertStatement insertStatement;
private UpdateStatement updateStatement;
private TextLoader textLoader;
private AlterStatement alterStatement;
private short type;
......@@ -84,6 +86,17 @@ public class CompiledQueryImpl implements CompiledQuery {
return type;
}
@Override
public UpdateStatement getUpdateStatement() {
return updateStatement;
}
public CompiledQuery ofUpdate(UpdateStatement updateStatement) {
this.updateStatement = updateStatement;
this.type = UPDATE;
return this;
}
@Override
public QueryFuture execute(SCSequence eventSubSeq) throws SqlException {
if (type == INSERT) {
......
......@@ -32,6 +32,8 @@ import io.questdb.griffin.engine.functions.CursorFunction;
import io.questdb.griffin.engine.functions.GroupByFunction;
import io.questdb.griffin.engine.functions.bind.IndexedParameterLinkFunction;
import io.questdb.griffin.engine.functions.bind.NamedParameterLinkFunction;
import io.questdb.griffin.engine.functions.cast.CastGeoHashToGeoHashFunctionFactory;
import io.questdb.griffin.engine.functions.cast.CastStrToGeoHashFunctionFactory;
import io.questdb.griffin.engine.functions.cast.CastStrToTimestampFunctionFactory;
import io.questdb.griffin.engine.functions.cast.CastSymbolToTimestampFunctionFactory;
import io.questdb.griffin.engine.functions.columns.*;
......@@ -42,6 +44,7 @@ import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayDeque;
......@@ -153,6 +156,46 @@ public class FunctionParser implements PostOrderTreeTraversalAlgo.Visitor, Mutab
return NullConstant.NULL;
}
public Function createImplicitCast(int position, Function function, int toType) throws SqlException {
Function cast = createImplicitCastOrNull(position, function, toType);
if (cast != null && cast.isConstant()) {
Function constant = functionToConstant(cast);
// incoming function is now converted to a constant and can be closed here
// since the returning constant will not use the function as underlying arg
function.close();
return constant;
}
// Do not close incoming function if cast is not a constant
// it will be used inside the cast as an argument
return cast;
}
@Nullable
private Function createImplicitCastOrNull(int position, Function function, int toType) throws SqlException {
int fromType = function.getType();
switch (fromType) {
case ColumnType.STRING:
case ColumnType.SYMBOL:
if (toType == ColumnType.TIMESTAMP) {
return new CastStrToTimestampFunctionFactory.Func(position, function);
}
if (ColumnType.isGeoHash(toType)) {
return CastStrToGeoHashFunctionFactory.newInstance(position, toType, function);
}
break;
default:
if (ColumnType.isGeoHash(fromType)) {
int fromGeoBits = ColumnType.getGeoHashBits(fromType);
int toGeoBits = ColumnType.getGeoHashBits(toType);
if (ColumnType.isGeoHash(toType) && toGeoBits < fromGeoBits) {
return CastGeoHashToGeoHashFunctionFactory.newInstance(position, function, fromType, toType);
}
}
break;
}
return null;
}
public FunctionFactoryCache getFunctionFactoryCache() {
return functionFactoryCache;
}
......@@ -210,11 +253,7 @@ public class FunctionParser implements PostOrderTreeTraversalAlgo.Visitor, Mutab
positionStack.pop();
assert positionStack.size() == functionStack.size();
if (function != null && function.isConstant() && (function instanceof ScalarFunction)) {
try {
return functionToConstant(function);
} finally {
function.close();
}
return functionToConstant(function);
}
return function;
} finally {
......@@ -735,6 +774,16 @@ public class FunctionParser implements PostOrderTreeTraversalAlgo.Visitor, Mutab
}
private Function functionToConstant(Function function) {
Function newFunction = functionToConstant0(function);
// Sometimes functionToConstant0 returns the same instance that was passed in as the parameter
if (newFunction != function) {
// close the underlying function only when it is different from the returned newFunction
function.close();
}
return newFunction;
}
private Function functionToConstant0(Function function) {
int type = function.getType();
switch (ColumnType.tagOf(type)) {
case ColumnType.INT:
......
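The close-ownership rule encoded above is easy to get wrong, so a hedged usage sketch (the caller code is invented, not part of the commit): once createImplicitCast returns a non-null cast, the caller must not close the original function itself.

    // Assumes "functionParser", "node", "metadata" and "executionContext" are already in scope.
    Function function = functionParser.parseFunction(node, metadata, executionContext);
    Function cast = functionParser.createImplicitCast(node.position, function, ColumnType.TIMESTAMP);
    if (cast != null) {
        // the original was either constant-folded (and already closed) or is now
        // owned by the cast wrapper as its argument - do not close it separately
        function = cast;
    }
    // if cast is null the caller keeps ownership of "function" and reports the type mismatch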
......@@ -27,8 +27,8 @@ package io.questdb.griffin;
import io.questdb.cairo.*;
import io.questdb.cairo.map.RecordValueSink;
import io.questdb.cairo.map.RecordValueSinkFactory;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.*;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.vm.Vm;
import io.questdb.cairo.vm.api.MemoryCARW;
import io.questdb.griffin.engine.EmptyTableRecordCursorFactory;
......@@ -2028,6 +2028,34 @@ public class SqlCodeGenerator implements Mutable, Closeable {
final int selectColumnCount = columns.size();
final ExpressionNode timestamp = model.getTimestamp();
// If this is an update query and the column types don't match exactly
// the column types of the table to be updated, we have to fall back to
// select-virtual
if (model.isUpdate()) {
boolean columnTypeMismatch = false;
ObjList<CharSequence> updateColumnNames = model.getUpdateTableColumnNames();
IntList updateColumnTypes = model.getUpdateTableColumnTypes();
for(int i = 0, n = columns.size(); i < n; i++) {
QueryColumn queryColumn = columns.getQuick(i);
CharSequence columnName = queryColumn.getAlias();
int index = metadata.getColumnIndexQuiet(queryColumn.getAst().token);
assert index > -1 : "wtf? " + queryColumn.getAst().token;
int updateColumnIndex = updateColumnNames.indexOf(columnName);
int updateColumnType = updateColumnTypes.get(updateColumnIndex);
if (updateColumnType != metadata.getColumnType(index)) {
columnTypeMismatch = true;
break;
}
}
if (columnTypeMismatch) {
return generateSelectVirtualWithSubquery(model, executionContext, factory);
}
}
boolean entity;
// the model is considered entity when it doesn't add any value to its nested model
//
......@@ -2414,7 +2442,11 @@ public class SqlCodeGenerator implements Mutable, Closeable {
private RecordCursorFactory generateSelectVirtual(QueryModel model, SqlExecutionContext executionContext) throws SqlException {
final RecordCursorFactory factory = generateSubQuery(model, executionContext);
return generateSelectVirtualWithSubquery(model, executionContext, factory);
}
@NotNull
private VirtualRecordCursorFactory generateSelectVirtualWithSubquery(QueryModel model, SqlExecutionContext executionContext, RecordCursorFactory factory) throws SqlException {
try {
final ObjList<QueryColumn> columns = model.getColumns();
final int columnCount = columns.size();
......@@ -2438,18 +2470,47 @@ public class SqlCodeGenerator implements Mutable, Closeable {
virtualMetadata.setTimestampIndex(i);
}
final Function function = functionParser.parseFunction(
Function function = functionParser.parseFunction(
column.getAst(),
metadata,
executionContext
);
// define "undefined" functions as string
int targetColumnType = -1;
if (model.isUpdate()) {
// Check the type of the column to be updated
int columnIndex = model.getUpdateTableColumnNames().indexOf(column.getAlias());
targetColumnType = model.getUpdateTableColumnTypes().get(columnIndex);
}
// define "undefined" functions as string unless it's update. Leave Undefined if update
if (function.isUndefined()) {
function.assignType(ColumnType.STRING, executionContext.getBindVariableService());
if (!model.isUpdate()) {
function.assignType(ColumnType.STRING, executionContext.getBindVariableService());
} else {
// Set the bind variable to the type of the target column
function.assignType(targetColumnType, executionContext.getBindVariableService());
}
}
int columnType = function.getType();
if (targetColumnType != -1 && targetColumnType != columnType) {
// This is an update and the type of the target column does not match the type of the expression being assigned
if (SqlCompiler.builtInFunctionCast(targetColumnType, function.getType())) {
// All functions will be able to getLong() if they support getInt(), no need to generate cast here
columnType = targetColumnType;
} else {
Function castFunction = functionParser.createImplicitCast(column.getAst().position, function, targetColumnType);
if (castFunction != null) {
function = castFunction;
columnType = targetColumnType;
}
// else - the update code will throw an incompatibility exception; it has a better chance to close resources at that point
}
}
functions.add(function);
if (function instanceof SymbolFunction) {
if (function instanceof SymbolFunction && columnType == ColumnType.SYMBOL) {
virtualMetadata.add(
new TableColumnMetadata(
Chars.toString(column.getAlias()),
......@@ -2466,7 +2527,7 @@ public class SqlCodeGenerator implements Mutable, Closeable {
new TableColumnMetadata(
Chars.toString(column.getAlias()),
configuration.getRandom().nextLong(),
function.getType(),
columnType,
function.getMetadata()
)
);
......@@ -2591,7 +2652,7 @@ public class SqlCodeGenerator implements Mutable, Closeable {
boolean contextTimestampRequired = executionContext.isTimestampRequired();
// some "sample by" queries don't select any cols but needs timestamp col selected
// for example "select count() from x sample by 1h" implicitly needs timestamp column selected
if (topDownColumnCount > 0 || contextTimestampRequired) {
if (topDownColumnCount > 0 || contextTimestampRequired || model.isUpdate()) {
framingSupported = true;
for (int i = 0; i < topDownColumnCount; i++) {
int columnIndex = readerMeta.getColumnIndexQuiet(topDownColumns.getQuick(i).getName());
......
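A hedged end-to-end illustration of the fallback and cast logic above (schema and statement invented): a SET expression whose type differs from the target column forces the select-choose plan to be regenerated as select-virtual, where builtInFunctionCast and createImplicitCast reconcile the types.

    // Invented schema and statement, for illustration only.
    String ddl = "CREATE TABLE orders (due TIMESTAMP, qty LONG, ts TIMESTAMP) timestamp(ts) PARTITION BY DAY";
    // 'due' receives a STRING constant -> column type mismatch -> fall back to select-virtual,
    //   where createImplicitCast wraps it in CastStrToTimestampFunctionFactory.Func and folds it;
    // 'qty' receives an INT constant -> builtInFunctionCast(LONG, INT) is true, so no cast wrapper
    //   is generated and the virtual column simply reports type LONG.
    String update = "UPDATE orders SET due = '2022-01-01T00:00:00.000000Z', qty = 42";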
......@@ -28,8 +28,8 @@ import io.questdb.MessageBus;
import io.questdb.PropServerConfiguration;
import io.questdb.cairo.*;
import io.questdb.cairo.pool.WriterPool;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.*;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.vm.Vm;
import io.questdb.cairo.vm.api.MemoryMARW;
import io.questdb.cutlass.text.Atomicity;
......@@ -44,6 +44,7 @@ import io.questdb.griffin.engine.functions.catalogue.ShowTransactionIsolationLev
import io.questdb.griffin.engine.table.ShowColumnsRecordCursorFactory;
import io.questdb.griffin.engine.table.TableListRecordCursorFactory;
import io.questdb.griffin.model.*;
import io.questdb.griffin.update.UpdateStatement;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
......@@ -63,7 +64,6 @@ public class SqlCompiler implements Closeable {
public static final ObjList<String> sqlControlSymbols = new ObjList<>(8);
private final static Log LOG = LogFactory.getLog(SqlCompiler.class);
private static final IntList castGroups = new IntList();
private static final CastStrToGeoHashFunctionFactory GEO_HASH_FUNCTION_FACTORY = new CastStrToGeoHashFunctionFactory();
private static final CastCharToStrFunctionFactory CHAR_TO_STR_FUNCTION_FACTORY = new CastCharToStrFunctionFactory();
protected final GenericLexer lexer;
protected final Path path = new Path();
......@@ -820,12 +820,8 @@ public class SqlCompiler implements Closeable {
final int fromTag = ColumnType.tagOf(from);
return (toTag == fromTag && (ColumnType.getGeoHashBits(to) <= ColumnType.getGeoHashBits(from)
|| ColumnType.getGeoHashBits(from) == 0) /* to account for typed NULL assignment */)
|| fromTag == ColumnType.NULL
//widening conversions
|| (fromTag >= ColumnType.BYTE
&& toTag >= ColumnType.BYTE
&& toTag <= ColumnType.DOUBLE
&& fromTag < toTag)
// widening conversions,
|| builtInFunctionCast(to, from)
//narrowing conversions
|| (fromTag == ColumnType.DOUBLE && (toTag == ColumnType.FLOAT || (toTag >= ColumnType.BYTE && toTag <= ColumnType.LONG)))
|| (fromTag == ColumnType.FLOAT && toTag >= ColumnType.BYTE && toTag <= ColumnType.LONG)
......@@ -852,6 +848,22 @@ public class SqlCompiler implements Closeable {
|| (fromTag == ColumnType.SYMBOL && toTag == ColumnType.TIMESTAMP);
}
public static boolean builtInFunctionCast(int toType, int fromType) {
// This method returns true when a cast is not needed from one type to another
// because of the way typed functions are implemented.
// For example IntFunction has a getDouble() method implemented and does not need
// an additional wrapper function to CAST to double.
// This is usually the case for widening conversions.
return (fromType >= ColumnType.BYTE
&& toType >= ColumnType.BYTE
&& toType <= ColumnType.DOUBLE
&& fromType < toType)
|| fromType == ColumnType.NULL
// char can be short and short can be char for symmetry
|| (fromType == ColumnType.CHAR && toType == ColumnType.SHORT)
|| (fromType == ColumnType.TIMESTAMP && toType == ColumnType.LONG);
}
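A few hedged examples of what the predicate above admits, read directly off its body (these are not tests from the commit):

    boolean a = SqlCompiler.builtInFunctionCast(ColumnType.DOUBLE, ColumnType.INT);     // true: IntFunction already implements getDouble()
    boolean b = SqlCompiler.builtInFunctionCast(ColumnType.LONG, ColumnType.NULL);      // true: typed NULL fits any target
    boolean c = SqlCompiler.builtInFunctionCast(ColumnType.SHORT, ColumnType.CHAR);     // true: CHAR/SHORT special case
    boolean d = SqlCompiler.builtInFunctionCast(ColumnType.LONG, ColumnType.TIMESTAMP); // true: TIMESTAMP->LONG special case
    boolean e = SqlCompiler.builtInFunctionCast(ColumnType.INT, ColumnType.DOUBLE);     // false: narrowing needs an explicit cast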
@Override
public void close() {
backupAgent.close();
......@@ -863,6 +875,15 @@ public class SqlCompiler implements Closeable {
@NotNull
public CompiledQuery compile(@NotNull CharSequence query, @NotNull SqlExecutionContext executionContext) throws SqlException {
CompiledQuery result = compile0(query, executionContext);
if (result.getType() != CompiledQuery.UPDATE || configuration.enableDevelopmentUpdates()) {
return result;
}
throw SqlException.$(0, "UPDATE statement is not supported yet");
}
@NotNull
private CompiledQuery compile0(@NotNull CharSequence query, @NotNull SqlExecutionContext executionContext) throws SqlException {
clear();
//
// these are quick executions that do not require building of a model
......@@ -1592,6 +1613,8 @@ public class SqlCompiler implements Closeable {
} else {
return lightlyValidateInsertModel(insertModel);
}
case ExecutionModel.UPDATE:
optimiser.optimiseUpdate((QueryModel) model, executionContext);
// fall through: the optimised UPDATE model is returned below
default:
return model;
}
......@@ -1627,6 +1650,10 @@ public class SqlCompiler implements Closeable {
final RenameTableModel rtm = (RenameTableModel) executionModel;
engine.rename(executionContext.getCairoSecurityContext(), path, GenericLexer.unquote(rtm.getFrom().token), renamePath, GenericLexer.unquote(rtm.getTo().token));
return compiledQuery.ofRenameTable();
case ExecutionModel.UPDATE:
final QueryModel updateQueryModel = (QueryModel) executionModel;
UpdateStatement updateStatement = generateUpdate(updateQueryModel, executionContext);
return compiledQuery.ofUpdate(updateStatement);
default:
InsertModel insertModel = (InsertModel) executionModel;
if (insertModel.getQueryModel() != null) {
......@@ -1963,6 +1990,73 @@ public class SqlCompiler implements Closeable {
throw SqlException.position(0).put("underlying cursor is extremely volatile");
}
UpdateStatement generateUpdate(QueryModel updateQueryModel, SqlExecutionContext executionContext) throws SqlException {
// Update QueryModel structure is
// QueryModel with SET column expressions
// |-- QueryModel of select-virtual or select-choose of data selected for update
final QueryModel selectQueryModel = updateQueryModel.getNestedModel();
// First generate plan for nested SELECT QueryModel
final RecordCursorFactory updateToCursorFactory = codeGenerator.generate(selectQueryModel, executionContext);
// And then generate plan for UPDATE top level QueryModel
final IntList tableColumnTypes = selectQueryModel.getUpdateTableColumnTypes();
final ObjList<CharSequence> tableColumnNames = selectQueryModel.getUpdateTableColumnNames();
final int tableId = selectQueryModel.getTableId();
final long tableVersion = selectQueryModel.getTableVersion();
return generateUpdateStatement(
updateQueryModel,
tableColumnTypes,
tableColumnNames,
tableId,
tableVersion,
updateToCursorFactory
);
}
private static UpdateStatement generateUpdateStatement(
@Transient QueryModel updateQueryModel,
@Transient IntList tableColumnTypes,
@Transient ObjList<CharSequence> tableColumnNames,
int tableId,
long tableVersion,
RecordCursorFactory updateToCursorFactory
) throws SqlException {
try {
String tableName = updateQueryModel.getUpdateTableName();
if (!updateToCursorFactory.supportsUpdateRowId(tableName)) {
throw SqlException.$(updateQueryModel.getModelPosition(), "Only simple UPDATE statements without joins are supported");
}
// Check that updateDataFactoryMetadata matches the column types of the table to be updated exactly
RecordMetadata updateDataFactoryMetadata = updateToCursorFactory.getMetadata();
for (int i = 0, n = updateDataFactoryMetadata.getColumnCount(); i < n; i++) {
int virtualColumnType = updateDataFactoryMetadata.getColumnType(i);
CharSequence updateColumnName = updateDataFactoryMetadata.getColumnName(i);
int tableColumnIndex = tableColumnNames.indexOf(updateColumnName);
int tableColumnType = tableColumnTypes.get(tableColumnIndex);
if (virtualColumnType != tableColumnType) {
// get column position
ExpressionNode setRhs = updateQueryModel.getNestedModel().getColumns().getQuick(i).getAst();
int position = setRhs.position;
throw SqlException.inconvertibleTypes(position, virtualColumnType, "", tableColumnType, updateColumnName);
}
}
return new UpdateStatement(
tableName,
tableId,
tableVersion,
updateToCursorFactory
);
} catch (Throwable e) {
Misc.free(updateToCursorFactory);
throw e;
}
}
RecordCursorFactory generate(QueryModel queryModel, SqlExecutionContext executionContext) throws SqlException {
return codeGenerator.generate(queryModel, executionContext);
}
......@@ -2534,7 +2628,7 @@ public class SqlCompiler implements Closeable {
function = CHAR_TO_STR_FUNCTION_FACTORY.newInstance(function);
// fall through to STRING
default:
function = GEO_HASH_FUNCTION_FACTORY.newInstance(functionPosition, columnType, function);
function = CastStrToGeoHashFunctionFactory.newInstance(functionPosition, columnType, function);
break;
}
}
......
......@@ -1380,6 +1380,20 @@ public class SqlKeywords {
&& (tok.charAt(i) | 32) == 'k';
}
public static boolean isUpdateKeyword(CharSequence tok) {
if (tok.length() != 6) {
return false;
}
int i = 0;
return (tok.charAt(i++) | 32) == 'u'
&& (tok.charAt(i++) | 32) == 'p'
&& (tok.charAt(i++) | 32) == 'd'
&& (tok.charAt(i++) | 32) == 'a'
&& (tok.charAt(i++) | 32) == 't'
&& (tok.charAt(i) | 32) == 'e';
}
public static boolean isValuesKeyword(CharSequence tok) {
if (tok.length() != 6) {
return false;
......
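A small aside on the pattern used above (illustration only, not from the commit): OR-ing an ASCII letter with 32 sets the lower-case bit, which is how the SqlKeywords checks match case-insensitively without allocating.

    assert ('U' | 32) == 'u';                        // 0x55 | 0x20 == 0x75
    assert SqlKeywords.isUpdateKeyword("UpDaTe");    // case-insensitive match
    assert !SqlKeywords.isUpdateKeyword("updates");  // rejected by the length check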
......@@ -688,12 +688,20 @@ class SqlOptimiser {
}
}
private void copyColumnTypesFromMetadata(QueryModel model, TableReaderMetadata m){
// TODO: optimise by copying only the indexes and types of the columns used in the UPDATE's SET clause
for (int i = 0, k = m.getColumnCount(); i < k; i++) {
model.addUpdateTableColumnMetadata(m.getColumnType(i), m.getColumnName(i));
}
}
private void copyColumnsFromMetadata(QueryModel model, RecordMetadata m, boolean cleanColumnNames) throws SqlException {
// column names are not allowed to have a dot
for (int i = 0, k = m.getColumnCount(); i < k; i++) {
CharSequence columnName = createColumnAlias(m.getColumnName(i), model, cleanColumnNames);
model.addField(queryColumnPool.next().of(columnName, expressionNodePool.next().of(LITERAL, columnName, 0, 0)));
QueryColumn column = queryColumnPool.next().of(columnName, expressionNodePool.next().of(LITERAL, columnName, 0, 0));
model.addField(column);
}
// validate explicitly defined timestamp, if it exists
......@@ -1362,6 +1370,9 @@ class SqlOptimiser {
final QueryModel nested = model.getNestedModel();
if (nested != null) {
enumerateTableColumns(nested, executionContext);
if (model.isUpdate()) {
model.copyUpdateTableMetadata(nested);
}
// copy columns of nested model onto parent one
// we must treat sub-query just like we do a table
// model.copyColumnsFrom(nested, queryColumnPool, expressionNodePool);
......@@ -1870,6 +1881,9 @@ class SqlOptimiser {
model.setTableVersion(r.getVersion());
model.setTableId(r.getMetadata().getId());
copyColumnsFromMetadata(model, r.getMetadata(), false);
if (model.isUpdate()) {
copyColumnTypesFromMetadata(model, r.getMetadata());
}
} catch (EntryLockedException e) {
throw SqlException.position(tableNamePosition).put("table is locked: ").put(tableLookupSequence);
} catch (CairoException e) {
......@@ -1877,6 +1891,74 @@ class SqlOptimiser {
}
}
void optimiseUpdate(QueryModel updateQueryModel, SqlExecutionContext sqlExecutionContext) throws SqlException {
final QueryModel selectQueryModel = updateQueryModel.getNestedModel();
selectQueryModel.setIsUpdate(true);
QueryModel optimisedNested = optimise(selectQueryModel, sqlExecutionContext);
assert optimisedNested.isUpdate();
updateQueryModel.setNestedModel(optimisedNested);
// Then validate the SET columns against the top-level UPDATE QueryModel
validateUpdateColumns(updateQueryModel, sqlExecutionContext, optimisedNested.getTableId(), optimisedNested.getTableVersion());
}
private void validateUpdateColumns(QueryModel updateQueryModel, SqlExecutionContext executionContext, int tableId, long tableVersion) throws SqlException {
try (
TableReader r = engine.getReader(
executionContext.getCairoSecurityContext(),
updateQueryModel.getTableName().token,
tableId,
tableVersion
)
) {
TableReaderMetadata metadata = r.getMetadata();
if (metadata.getPartitionBy() == PartitionBy.NONE) {
throw SqlException.$(updateQueryModel.getModelPosition(), "UPDATE query can only be executed on partitioned tables");
}
int timestampIndex = metadata.getTimestampIndex();
if (timestampIndex < 0) {
throw SqlException.$(updateQueryModel.getModelPosition(), "UPDATE query can only be executed on tables with Designated timestamp");
}
tempList.clear(metadata.getColumnCount());
tempList.setPos(metadata.getColumnCount());
int updateSetColumnCount = updateQueryModel.getUpdateExpressions().size();
for(int i = 0; i < updateSetColumnCount; i++) {
// SET left hand side expressions are stored in top level UPDATE QueryModel
ExpressionNode columnExpression = updateQueryModel.getUpdateExpressions().get(i);
int position = columnExpression.position;
int columnIndex = metadata.getColumnIndexQuiet(columnExpression.token);
// SET right hand side expressions are stored in the Nested SELECT QueryModel as columns
QueryColumn queryColumn = updateQueryModel.getNestedModel().getColumns().get(i);
if (columnIndex < 0) {
throw SqlException.invalidColumn(position, queryColumn.getName());
}
if (columnIndex == timestampIndex) {
throw SqlException.$(position, "Designated timestamp column cannot be updated");
}
if (tempList.getQuick(columnIndex) == 1) {
throw SqlException.$(position, "Duplicate column ").put(queryColumn.getName()).put(" in SET clause");
}
tempList.set(columnIndex, 1);
ExpressionNode rhs = queryColumn.getAst();
if (rhs.type == FUNCTION) {
if (functionParser.getFunctionFactoryCache().isGroupBy(rhs.token)) {
throw SqlException.$(rhs.position, "Unsupported function in SET clause");
}
}
}
// Save the update table name as a String so it does not have to be re-created from a CharSequence later
updateQueryModel.setUpdateTableName(r.getTableName());
} catch (EntryLockedException e) {
throw SqlException.position(updateQueryModel.getModelPosition()).put("table is locked: ").put(tableLookupSequence);
} catch (CairoException e) {
throw SqlException.position(updateQueryModel.getModelPosition()).put(e);
}
}
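For reference, a hedged summary (table and column names invented) of statements the validation above rejects, paired with the errors it raises:

    // Assuming: CREATE TABLE trades (sym SYMBOL, price DOUBLE, ts TIMESTAMP) timestamp(ts) PARTITION BY DAY
    // UPDATE not_partitioned SET price = 0    -> "UPDATE query can only be executed on partitioned tables"
    // UPDATE trades SET ts = now()            -> "Designated timestamp column cannot be updated"
    // UPDATE trades SET price = 0, price = 1  -> "Duplicate column price in SET clause"
    // UPDATE trades SET price = sum(price)    -> "Unsupported function in SET clause"
    // UPDATE trades SET nope = 0              -> invalid column error for "nope"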
QueryModel optimise(QueryModel model, SqlExecutionContext sqlExecutionContext) throws SqlException {
final QueryModel rewrittenModel;
try {
......@@ -3100,6 +3182,10 @@ class SqlOptimiser {
root.setUnionModel(model.getUnionModel());
root.setSetOperationType(model.getSetOperationType());
root.setModelPosition(model.getModelPosition());
if (model.isUpdate()) {
root.setIsUpdate(true);
root.copyUpdateTableMetadata(model);
}
}
return root;
}
......
......@@ -62,6 +62,7 @@ public final class SqlParser {
private final PostOrderTreeTraversalAlgo.Visitor rewriteCount0Ref = this::rewriteCount0;
private final PostOrderTreeTraversalAlgo.Visitor rewriteConcat0Ref = this::rewriteConcat0;
private final PostOrderTreeTraversalAlgo.Visitor rewriteTypeQualifier0Ref = this::rewriteTypeQualifier0;
private final LowerCaseCharSequenceObjHashMap<WithClauseModel> topLevelWithModel = new LowerCaseCharSequenceObjHashMap<>();
private boolean subQueryMode = false;
SqlParser(
......@@ -160,6 +161,7 @@ public final class SqlParser {
insertModelPool.clear();
expressionTreeBuilder.reset();
copyModelPool.clear();
topLevelWithModel.clear();
}
private CharSequence createColumnAlias(ExpressionNode node, QueryModel model) {
......@@ -378,6 +380,10 @@ public final class SqlParser {
return parseCreateStatement(lexer, executionContext);
}
if (isUpdateKeyword(tok)) {
return parseUpdate(lexer);
}
if (isRenameKeyword(tok)) {
return parseRenameStatement(lexer);
}
......@@ -390,6 +396,10 @@ public final class SqlParser {
return parseCopy(lexer);
}
if (isWithKeyword(tok)) {
return parseWith(lexer);
}
return parseSelect(lexer);
}
......@@ -755,12 +765,29 @@ public final class SqlParser {
return null;
}
@NotNull
private ExecutionModel parseWith(GenericLexer lexer) throws SqlException {
parseWithClauses(lexer, topLevelWithModel);
CharSequence tok = tok(lexer, "'select', 'update' or name expected");
if (!isUpdateKeyword(tok)) {
// SELECT
lexer.unparse();
return parseDml(lexer, null);
} else {
// UPDATE
return parseUpdate(lexer);
}
}
private QueryModel parseDml(GenericLexer lexer, @Nullable LowerCaseCharSequenceObjHashMap<WithClauseModel> withClauses) throws SqlException {
QueryModel model = null;
QueryModel prevModel = null;
while (true) {
QueryModel unionModel = parseDml0(lexer, prevModel != null ? prevModel.getWithClauses() : withClauses);
LowerCaseCharSequenceObjHashMap<WithClauseModel> parentWithClauses = prevModel != null ? prevModel.getWithClauses() : withClauses;
LowerCaseCharSequenceObjHashMap<WithClauseModel> topWithClauses = model == null ? topLevelWithModel : null;
QueryModel unionModel = parseDml0(lexer, parentWithClauses, topWithClauses);
if (prevModel == null) {
model = unionModel;
prevModel = model;
......@@ -800,21 +827,27 @@ public final class SqlParser {
}
@NotNull
private QueryModel parseDml0(GenericLexer lexer, @Nullable LowerCaseCharSequenceObjHashMap<WithClauseModel> parentWithClauses) throws SqlException {
private QueryModel parseDml0(
GenericLexer lexer,
@Nullable LowerCaseCharSequenceObjHashMap<WithClauseModel> parentWithClauses,
@Nullable LowerCaseCharSequenceObjHashMap<WithClauseModel> topWithClauses
) throws SqlException {
CharSequence tok;
final int modelPosition = lexer.getPosition();
QueryModel model = queryModelPool.next();
model.setModelPosition(modelPosition);
if (parentWithClauses != null) {
model.addWithClauses(parentWithClauses);
model.getWithClauses().putAll(parentWithClauses);
}
tok = tok(lexer, "'select', 'with' or table name expected");
if (isWithKeyword(tok)) {
parseWithClauses(lexer, model);
parseWithClauses(lexer, model.getWithClauses());
tok = tok(lexer, "'select' or table name expected");
} else if (topWithClauses != null) {
model.getWithClauses().putAll(topWithClauses);
}
// [select]
......@@ -861,6 +894,133 @@ public final class SqlParser {
return model;
}
private QueryModel parseDmlUpdate(GenericLexer lexer) throws SqlException {
// Update QueryModel structure is
// QueryModel with SET column expressions (updateQueryModel)
// |-- nested QueryModel of select-virtual or select-choose of data selected for update (fromModel)
// |-- nested QueryModel with selected data (nestedModel)
// |-- join QueryModels to represent FROM clause
CharSequence tok;
final int modelPosition = lexer.getPosition();
QueryModel updateQueryModel = queryModelPool.next();
updateQueryModel.setModelType(ExecutionModel.UPDATE);
updateQueryModel.setModelPosition(modelPosition);
QueryModel fromModel = queryModelPool.next();
fromModel.setModelPosition(modelPosition);
updateQueryModel.setIsUpdate(true);
fromModel.setIsUpdate(true);
tok = tok(lexer, "UPDATE, WITH or table name expected");
// [update]
if (isUpdateKeyword(tok)) {
// parse SET assignments: lhs columns go into updateQueryModel, rhs expressions into fromModel as select columns
parseUpdateClause(lexer, updateQueryModel, fromModel);
// create nestedModel QueryModel to source rowids for the update
QueryModel nestedModel = queryModelPool.next();
nestedModel.setTableName(fromModel.getTableName());
nestedModel.setAlias(updateQueryModel.getAlias());
nestedModel.setIsUpdate(true);
// nest nestedModel inside fromModel
fromModel.setTableName(null);
fromModel.setNestedModel(nestedModel);
// Add WITH clauses if they exist into fromModel
fromModel.getWithClauses().putAll(topLevelWithModel);
tok = optTok(lexer);
// [from]
if (tok != null && isFromKeyword(tok)) {
tok = ","; // FROM in Postgres UPDATE statement means cross join
int joinType;
int i = 0;
while (tok != null && (joinType = joinStartSet.get(tok)) != -1) {
if (i++ == 1) {
throw SqlException.$(lexer.lastTokenPosition(), "JOIN is not supported on UPDATE statement");
}
// expect multiple [[inner | outer | cross] join]
nestedModel.addJoinModel(parseJoin(lexer, tok, joinType, topLevelWithModel));
tok = optTok(lexer);
}
} else if (tok != null && !isWhereKeyword(tok)) {
throw SqlException.$(lexer.lastTokenPosition(), "FROM, WHERE or EOF expected");
}
// [where]
if (tok != null && isWhereKeyword(tok)) {
ExpressionNode expr = expr(lexer, fromModel);
if (expr != null) {
nestedModel.setWhereClause(expr);
} else {
throw SqlException.$((lexer.lastTokenPosition()), "empty where clause");
}
} else if (tok != null) {
throw errUnexpected(lexer, tok);
}
updateQueryModel.setNestedModel(fromModel);
}
return updateQueryModel;
}
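To tie the structure comment at the top of parseDmlUpdate to something concrete, a hedged sketch (statement and names invented) of the three models built for a single UPDATE:

    // For: UPDATE trades t SET price = price * 2 WHERE sym = 'A'
    //
    //   updateQueryModel    - model type UPDATE; table "trades", alias "t";
    //                         the SET lhs column "price" is stored in getUpdateExpressions()
    //     '- fromModel      - select model carrying the SET rhs expression: price * 2
    //        '- nestedModel - sources row ids from "trades" and carries WHERE sym = 'A'
    //                         (plus any FROM-clause join models)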
private void parseUpdateClause(GenericLexer lexer, QueryModel updateQueryModel, QueryModel fromModel) throws SqlException {
CharSequence tok = tok(lexer, "table name or alias");
CharSequence tableName = GenericLexer.immutableOf(tok);
ExpressionNode tableNameExpr = ExpressionNode.FACTORY.newInstance().of(ExpressionNode.LITERAL, tableName, 0, 0);
updateQueryModel.setTableName(tableNameExpr);
fromModel.setTableName(tableNameExpr);
tok = tok(lexer, "AS, SET or table alias expected");
if (isAsKeyword(tok)) {
tok = tok(lexer, "table alias expected");
if (isSetKeyword(tok)) {
throw SqlException.$(lexer.lastTokenPosition(), "table alias expected");
}
}
if (!isAsKeyword(tok) && !isSetKeyword(tok)) {
// This is table alias
CharSequence tableAlias = GenericLexer.immutableOf(tok);
ExpressionNode tableAliasExpr = ExpressionNode.FACTORY.newInstance().of(ExpressionNode.LITERAL, tableAlias, 0, 0);
updateQueryModel.setAlias(tableAliasExpr);
tok = tok(lexer, "SET expected");
}
if (!isSetKeyword(tok)) {
throw SqlException.$(lexer.lastTokenPosition(), "SET expected");
}
while (true) {
// Column
tok = tok(lexer, "column name");
CharSequence col = GenericLexer.immutableOf(GenericLexer.unquote(tok));
int colPosition = lexer.lastTokenPosition();
expectTok(lexer, "=");
// Value expression
ExpressionNode expr = expr(lexer, (QueryModel) null);
ExpressionNode setColumnExpression = expressionNodePool.next().of(ExpressionNode.LITERAL, col, 0, colPosition);
updateQueryModel.getUpdateExpressions().add(setColumnExpression);
QueryColumn valueColumn = queryColumnPool.next().of(col, expr);
fromModel.addBottomUpColumn(valueColumn);
tok = optTok(lexer);
if (tok == null) {
break;
}
if (tok.length() != 1 || tok.charAt(0) != ',') {
lexer.unparse();
break;
}
}
}
private void parseFromClause(GenericLexer lexer, QueryModel model, QueryModel masterModel) throws SqlException {
CharSequence tok = expectTableNameOrSubQuery(lexer);
// expect "(" in case of sub-query
......@@ -871,7 +1031,7 @@ public final class SqlParser {
tok = setModelAliasAndTimestamp(lexer, model);
} else {
lexer.unparse();
parseSelectFrom(lexer, model, masterModel);
parseSelectFrom(lexer, model, masterModel.getWithClauses());
tok = setModelAliasAndTimestamp(lexer, model);
// expect [latest by] (deprecated syntax)
......@@ -885,7 +1045,7 @@ public final class SqlParser {
int joinType;
while (tok != null && (joinType = joinStartSet.get(tok)) != -1) {
model.addJoinModel(parseJoin(lexer, tok, joinType, masterModel));
model.addJoinModel(parseJoin(lexer, tok, joinType, masterModel.getWithClauses()));
tok = optTok(lexer);
}
......@@ -1135,7 +1295,7 @@ public final class SqlParser {
throw err(lexer, "'select' or 'values' expected");
}
private QueryModel parseJoin(GenericLexer lexer, CharSequence tok, int joinType, QueryModel parent) throws SqlException {
private QueryModel parseJoin(GenericLexer lexer, CharSequence tok, int joinType, LowerCaseCharSequenceObjHashMap<WithClauseModel> parent) throws SqlException {
QueryModel joinModel = queryModelPool.next();
int errorPos = lexer.lastTokenPosition();
......@@ -1164,7 +1324,7 @@ public final class SqlParser {
tok = expectTableNameOrSubQuery(lexer);
if (Chars.equals(tok, '(')) {
joinModel.setNestedModel(parseAsSubQueryAndExpectClosingBrace(lexer, parent.getWithClauses()));
joinModel.setNestedModel(parseAsSubQueryAndExpectClosingBrace(lexer, parent));
} else {
lexer.unparse();
parseSelectFrom(lexer, joinModel, parent);
......@@ -1438,7 +1598,7 @@ public final class SqlParser {
}
}
private void parseSelectFrom(GenericLexer lexer, QueryModel model, QueryModel masterModel) throws SqlException {
private void parseSelectFrom(GenericLexer lexer, QueryModel model, LowerCaseCharSequenceObjHashMap<WithClauseModel> masterModel) throws SqlException {
final ExpressionNode expr = expr(lexer, model);
if (expr == null) {
throw SqlException.position(lexer.lastTokenPosition()).put("table name expected");
......@@ -1449,9 +1609,9 @@ public final class SqlParser {
case ExpressionNode.LITERAL:
case ExpressionNode.CONSTANT:
final ExpressionNode literal = literal(name, expr.position);
final WithClauseModel withClause = masterModel.getWithClause(name);
final WithClauseModel withClause = masterModel.get(name);
if (withClause != null) {
model.setNestedModel(parseWith(lexer, withClause, masterModel.getWithClauses()));
model.setNestedModel(parseWith(lexer, withClause, masterModel));
model.setAlias(literal);
} else {
model.setTableName(literal);
......@@ -1498,11 +1658,11 @@ public final class SqlParser {
return m;
}
private void parseWithClauses(GenericLexer lexer, QueryModel model) throws SqlException {
private void parseWithClauses(GenericLexer lexer, LowerCaseCharSequenceObjHashMap<WithClauseModel> model) throws SqlException {
do {
ExpressionNode name = expectLiteral(lexer);
if (model.getWithClause(name.token) != null) {
if (model.get(name.token) != null) {
throw SqlException.$(name.position, "duplicate name");
}
......@@ -1510,8 +1670,8 @@ public final class SqlParser {
expectTok(lexer, '(');
int lo = lexer.lastTokenPosition();
WithClauseModel wcm = withClauseModelPool.next();
wcm.of(lo + 1, parseAsSubQueryAndExpectClosingBrace(lexer, model.getWithClauses()));
model.addWithClause(name.token, wcm);
wcm.of(lo + 1, parseAsSubQueryAndExpectClosingBrace(lexer, model));
model.put(name.token, wcm);
CharSequence tok = optTok(lexer);
if (tok == null || !Chars.equals(tok, ',')) {
......@@ -1529,6 +1689,16 @@ public final class SqlParser {
return tok;
}
private ExecutionModel parseUpdate(GenericLexer lexer) throws SqlException {
lexer.unparse();
final QueryModel model = parseDmlUpdate(lexer);
final CharSequence tok = optTok(lexer);
if (tok == null || Chars.equals(tok, ';')) {
return model;
}
throw errUnexpected(lexer, tok);
}
private ExpressionNode rewriteCase(ExpressionNode parent) throws SqlException {
traversalAlgo.traverse(parent, rewriteCase0Ref);
return parent;
......@@ -1796,6 +1966,7 @@ public final class SqlParser {
tableAliasStop.add("group");
tableAliasStop.add("except");
tableAliasStop.add("intersect");
tableAliasStop.add("from");
//
columnAliasStop.add("from");
columnAliasStop.add(",");
......
......@@ -43,7 +43,7 @@ public abstract class AbstractVirtualFunctionRecordCursor implements RecordCurso
this.recordA = new VirtualRecord(functions);
this.recordB = new VirtualRecord(functions);
} else {
this.recordA = new VirtualRecordNoRowid(functions);
this.recordA = new VirtualRecord(functions);
this.recordB = null;
}
this.supportsRandomAccess = supportsRandomAccess;
......@@ -71,7 +71,7 @@ public abstract class AbstractVirtualFunctionRecordCursor implements RecordCurso
@Override
public Record getRecordB() {
if (recordB != null) {
if (supportsRandomAccess) {
return recordB;
}
throw new UnsupportedOperationException();
......
......@@ -140,7 +140,6 @@ public abstract class IntFunction implements ScalarFunction {
return Numbers.intToLong(getInt(rec));
}
@Override
public byte getGeoByte(Record rec) {
throw new UnsupportedOperationException();
......
......@@ -55,6 +55,10 @@ public class CastGeoHashToGeoHashFunctionFactory implements FunctionFactory {
final Function value = args.getQuick(0);
int srcType = value.getType();
int targetType = args.getQuick(1).getType();
return newInstance(position, value, srcType, targetType);
}
public static Function newInstance(int position, Function value, int srcType, int targetType) throws SqlException {
int srcBitsPrecision = ColumnType.getGeoHashBits(srcType);
int targetBitsPrecision = ColumnType.getGeoHashBits(targetType);
int shift = srcBitsPrecision - targetBitsPrecision;
......@@ -86,7 +90,7 @@ public class CastGeoHashToGeoHashFunctionFactory implements FunctionFactory {
.put("b)");
}
private Function castFunc(int shift, int targetType, Function value, int srcType) {
private static Function castFunc(int shift, int targetType, Function value, int srcType) {
switch (ColumnType.tagOf(srcType)) {
case ColumnType.GEOBYTE:
return new CastByteFunc(shift, targetType, value);
......
......@@ -59,7 +59,7 @@ public class CastStrToGeoHashFunctionFactory implements FunctionFactory {
return newInstance(argPosition, geoType, value);
}
public Function newInstance(int argPosition, int geoType, Function value) throws SqlException {
public static Function newInstance(int argPosition, int geoType, Function value) throws SqlException {
if (value.isConstant()) {
try {
final int bits = ColumnType.getGeoHashBits(geoType);
......
......@@ -74,6 +74,11 @@ public class CrossJoinRecordCursorFactory extends AbstractRecordCursorFactory {
return false;
}
@Override
public boolean supportsUpdateRowId(CharSequence tableName) {
return masterFactory.supportsUpdateRowId(tableName);
}
private static class CrossJoinRecordCursor implements NoRandomAccessRecordCursor {
private final JoinRecord record;
private final int columnSplit;
......
......@@ -98,6 +98,11 @@ public class HashJoinLightRecordCursorFactory extends AbstractRecordCursorFactor
return false;
}
@Override
public boolean supportsUpdateRowId(CharSequence tableName) {
return masterFactory.supportsUpdateRowId(tableName);
}
private void buildMapOfSlaveRecords(RecordCursor slaveCursor, SqlExecutionCircuitBreaker circuitBreaker) {
slaveChain.clear();
joinKeyMap.clear();
......
......@@ -96,6 +96,11 @@ public class HashJoinRecordCursorFactory extends AbstractRecordCursorFactory {
return false;
}
@Override
public boolean supportsUpdateRowId(CharSequence tableName) {
return masterFactory.supportsUpdateRowId(tableName);
}
private void buildMapOfSlaveRecords(RecordCursor slaveCursor, SqlExecutionCircuitBreaker circuitBreaker) {
HashOuterJoinRecordCursorFactory.buildMap(slaveCursor, slaveCursor.getRecord(), joinKeyMap, slaveKeySink, slaveChain, circuitBreaker);
}
......
......@@ -153,7 +153,12 @@ public class JoinRecord implements Record {
@Override
public long getRowId() {
throw new UnsupportedOperationException();
return master.getRowId();
}
@Override
public long getUpdateRowId() {
return master.getUpdateRowId();
}
@Override
......
......@@ -363,6 +363,11 @@ class CompiledFilterRecordCursor implements RecordCursor {
return Rows.toRowID(frameIndex, index);
}
@Override
public long getUpdateRowId() {
return pageAddressCache.toTableRowID(frameIndex, index);
}
@Override
public BinarySequence getBin(int columnIndex) {
final long dataPageAddress = pageAddressCache.getPageAddress(frameIndex, columnIndex);
......@@ -694,6 +699,7 @@ class CompiledFilterRecordCursor implements RecordCursor {
// Index page addresses and page sizes are stored only for variable length columns.
private LongList indexPageAddresses = new LongList();
private LongList pageSizes = new LongList();
private LongList pageRowIdOffsets = new LongList();
public PageAddressCache(CairoConfiguration configuration) {
cacheSizeThreshold = configuration.getSqlJitPageAddressCacheThreshold() / Long.BYTES;
......@@ -718,10 +724,12 @@ class CompiledFilterRecordCursor implements RecordCursor {
pageAddresses.clear();
indexPageAddresses.clear();
pageSizes.clear();
pageRowIdOffsets.clear();
} else {
pageAddresses = new LongList();
indexPageAddresses = new LongList();
pageSizes = new LongList();
pageRowIdOffsets = new LongList();
}
}
......@@ -737,6 +745,7 @@ class CompiledFilterRecordCursor implements RecordCursor {
pageSizes.add(frame.getPageSize(columnIndex));
}
}
pageRowIdOffsets.add(Rows.toRowID(frame.getPartitionIndex(), frame.getPartitionLo()));
}
public long getPageAddress(int frameIndex, int columnIndex) {
......@@ -767,5 +776,9 @@ class CompiledFilterRecordCursor implements RecordCursor {
}
return false;
}
public long toTableRowID(int frameIndex, long index) {
return pageRowIdOffsets.get(frameIndex) + index;
}
}
}
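A hedged numeric illustration of the mapping above (frame and row numbers invented): the cached offset is the packed row id of the first row of the page frame, so adding the frame-local record index recovers the physical table row id.

    // A frame starting at local row 1000 of partition 3 caches this offset:
    long frameOffset = Rows.toRowID(3, 1000);   // appended to pageRowIdOffsets when the frame is added
    // Record index 7 inside that frame maps back to the table row:
    long updateRowId = frameOffset + 7;         // == toTableRowID(frameIndex, 7)
    assert Rows.toPartitionIndex(updateRowId) == 3;
    assert Rows.toLocalRowID(updateRowId) == 1007;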
......@@ -96,6 +96,11 @@ public class CompiledFilterRecordCursorFactory implements RecordCursorFactory {
return true;
}
@Override
public boolean supportsUpdateRowId(CharSequence tableName) {
return factory.supportsUpdateRowId(tableName);
}
@Override
public boolean usesCompiledFilter() {
return true;
......
......@@ -127,6 +127,11 @@ public class DataFrameRecordCursorFactory extends AbstractDataFrameRecordCursorF
return cursor;
}
@Override
public boolean supportsUpdateRowId(CharSequence tableName) {
return dataFrameCursorFactory.supportTableRowId(tableName);
}
public static class TableReaderPageFrameCursor implements PageFrameCursor {
private final LongList columnPageNextAddress = new LongList();
private final LongList columnPageAddress = new LongList();
......
......@@ -72,6 +72,11 @@ public class FilteredRecordCursorFactory implements RecordCursorFactory {
return base.recordCursorSupportsRandomAccess();
}
@Override
public boolean supportsUpdateRowId(CharSequence tableName) {
return base.supportsUpdateRowId(tableName);
}
@Override
public boolean usesCompiledFilter() {
return base.usesCompiledFilter();
......
......@@ -108,6 +108,11 @@ class SelectedRecord implements Record {
return base.getRowId();
}
@Override
public long getUpdateRowId() {
return base.getUpdateRowId();
}
@Override
public short getShort(int col) {
return base.getShort(getColumnIndex(col));
......
......@@ -69,6 +69,11 @@ public class SelectedRecordCursorFactory extends AbstractRecordCursorFactory {
return base.usesCompiledFilter();
}
@Override
public boolean supportsUpdateRowId(CharSequence tableName) {
return base.supportsUpdateRowId(tableName);
}
public boolean hasDescendingOrder() {
return base.hasDescendingOrder();
}
......
......@@ -81,4 +81,9 @@ public class VirtualRecordCursorFactory extends AbstractRecordCursorFactory {
public boolean recordCursorSupportsRandomAccess() {
return supportsRandomAccess;
}
@Override
public boolean supportsUpdateRowId(CharSequence tableName) {
return baseFactory.supportsUpdateRowId(tableName);
}
}
......@@ -30,6 +30,7 @@ public interface ExecutionModel {
int RENAME_TABLE = 3;
int INSERT = 4;
int COPY = 5;
int UPDATE = 6;
int getModelType();
}
......@@ -86,7 +86,6 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
private final ArrayDeque<ExpressionNode> sqlNodeStack = new ArrayDeque<>();
private final LowerCaseCharSequenceIntHashMap orderHash = new LowerCaseCharSequenceIntHashMap(4, 0.5, -1);
private final ObjList<ExpressionNode> joinColumns = new ObjList<>(4);
private final LowerCaseCharSequenceObjHashMap<WithClauseModel> withClauses = new LowerCaseCharSequenceObjHashMap<>();
private final ObjList<ExpressionNode> sampleByFill = new ObjList<>();
private ExpressionNode sampleByTimezoneName = null;
private ExpressionNode sampleByOffset = null;
......@@ -120,6 +119,14 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
private int modelPosition = 0;
private int orderByAdviceMnemonic;
private int tableId;
private boolean isUpdateModel;
private final LowerCaseCharSequenceObjHashMap<WithClauseModel> withClauseModel = new LowerCaseCharSequenceObjHashMap<>();
private int modelType;
private final ObjList<ExpressionNode> updateSetColumns = new ObjList<>();
private final IntList updateTableColumnTypes = new IntList();
private final ObjList<CharSequence> updateTableColumnNames = new ObjList<>();
private QueryModel updateTableModel;
private String updateTableName;
private QueryModel() {
joinModels.add(this);
......@@ -153,6 +160,11 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
aliasToColumnMap.put(alias, column);
}
public void addUpdateTableColumnMetadata(int columnType, String columnName) {
updateTableColumnTypes.add(columnType);
updateTableColumnNames.add(columnName);
}
public void addGroupBy(ExpressionNode node) {
groupBy.add(node);
}
......@@ -189,14 +201,6 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
}
}
public void addWithClause(CharSequence name, WithClauseModel model) {
withClauses.put(name, model);
}
public void addWithClauses(LowerCaseCharSequenceObjHashMap<WithClauseModel> parentWithClauses) {
withClauses.putAll(parentWithClauses);
}
public void clear() {
bottomUpColumns.clear();
aliasToColumnNameMap.clear();
......@@ -230,7 +234,7 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
timestamp = null;
sqlNodeStack.clear();
joinColumns.clear();
withClauses.clear();
withClauseModel.clear();
selectModelType = SELECT_MODEL_NONE;
columnNameToAliasMap.clear();
tableNameFunction = null;
......@@ -246,6 +250,13 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
topDownColumns.clear();
topDownNameSet.clear();
aliasToColumnMap.clear();
isUpdateModel = false;
modelType = ExecutionModel.QUERY;
updateSetColumns.clear();
updateTableColumnTypes.clear();
updateTableColumnNames.clear();
updateTableModel = null;
updateTableName = null;
}
public void clearColumnMapStructs() {
......@@ -304,6 +315,12 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
this.orderByDirectionAdvice.addAll(orderByDirection);
}
public void copyUpdateTableMetadata(QueryModel updateTableModel) {
this.updateTableModel = updateTableModel;
this.tableId = updateTableModel.tableId;
this.tableVersion = updateTableModel.tableVersion;
}
public QueryColumn findBottomUpColumnByAst(ExpressionNode node) {
for (int i = 0, n = bottomUpColumns.size(); i < n; i++) {
QueryColumn qc = bottomUpColumns.getQuick(i);
......@@ -318,6 +335,30 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
return alias;
}
public IntList getUpdateTableColumnTypes() {
return this.updateTableModel != null ? this.updateTableModel.getUpdateTableColumnTypes() : updateTableColumnTypes;
}
public ObjList<CharSequence> getUpdateTableColumnNames() {
return this.updateTableModel != null ? this.updateTableModel.getUpdateTableColumnNames() : updateTableColumnNames;
}
public ObjList<ExpressionNode> getUpdateExpressions() {
return updateSetColumns;
}
public String getUpdateTableName() {
return updateTableName;
}
public LowerCaseCharSequenceObjHashMap<WithClauseModel> getWithClauses() {
return withClauseModel;
}
public boolean isUpdate() {
return isUpdateModel;
}
public ExpressionNode getSampleByUnit() {
return sampleByUnit;
}
......@@ -410,6 +451,10 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
return joinCriteria;
}
public void setIsUpdate(boolean isUpdate) {
this.isUpdateModel = isUpdate;
}
public void setJoinCriteria(ExpressionNode joinCriteria) {
this.joinCriteria = joinCriteria;
}
......@@ -464,7 +509,7 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
@Override
public int getModelType() {
return ExecutionModel.QUERY;
return modelType;
}
public CharSequence getName() {
......@@ -483,6 +528,10 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
return nestedModel;
}
public void setModelType(int modelType) {
this.modelType = modelType;
}
public void setNestedModel(QueryModel nestedModel) {
this.nestedModel = nestedModel;
}
......@@ -637,16 +686,12 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
return whereClause;
}
public void setWhereClause(ExpressionNode whereClause) {
this.whereClause = whereClause;
}
public WithClauseModel getWithClause(CharSequence name) {
return withClauses.get(name);
public void setUpdateTableName(String tableName) {
this.updateTableName = tableName;
}
public LowerCaseCharSequenceObjHashMap<WithClauseModel> getWithClauses() {
return withClauses;
public void setWhereClause(ExpressionNode whereClause) {
this.whereClause = whereClause;
}
public boolean isDistinct() {
......@@ -750,7 +795,38 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
@Override
public void toSink(CharSink sink) {
toSink0(sink, false);
if (modelType == ExecutionModel.QUERY) {
toSink0(sink, false);
} else if (modelType == ExecutionModel.UPDATE) {
updateToSink(sink);
}
}
private void updateToSink(CharSink sink) {
sink.put("update ");
tableName.toSink(sink);
if (alias != null) {
sink.put(" as");
aliasToSink(alias.token, sink);
}
sink.put(" set ");
for (int i = 0, n = getUpdateExpressions().size(); i < n; i++) {
if (i > 0) {
sink.put(',');
}
CharSequence columnExpr = getUpdateExpressions().get(i).token;
sink.put(columnExpr);
sink.put(" = ");
QueryColumn setColumn = getNestedModel().getColumns().getQuick(i);
setColumn.getAst().toSink(sink);
}
if (getNestedModel() != null) {
sink.put(" from (");
getNestedModel().toSink(sink);
sink.put(")");
}
}
@Override
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin.update;
import io.questdb.cairo.*;
import io.questdb.cairo.sql.*;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.vm.Vm;
import io.questdb.cairo.vm.api.MemoryCMARW;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
import io.questdb.std.str.Path;
import java.io.Closeable;
import static io.questdb.cairo.TableUtils.dFile;
public class InplaceUpdateExecution implements Closeable {
private static final Log LOG = LogFactory.getLog(InplaceUpdateExecution.class);
private final FilesFacade ff;
private final int rootLen;
private final IntList updateToColumnMap = new IntList();
private final ObjList<MemoryCMARW> updateMemory = new ObjList<>();
private final long dataAppendPageSize;
private Path path;
public InplaceUpdateExecution(CairoConfiguration configuration) {
ff = configuration.getFilesFacade();
path = new Path().of(configuration.getRoot());
rootLen = path.length();
dataAppendPageSize = configuration.getDataAppendPageSize();
}
@Override
public void close() {
path = Misc.free(path);
}
// This is a very hacky way to update in place, for testing only
// TODO: rewrite to production grade
public void executeUpdate(TableWriter tableWriter, UpdateStatement updateStatement, SqlExecutionContext executionContext) throws SqlException {
if (tableWriter.inTransaction()) {
LOG.info().$("committing current transaction before UPDATE execution [table=").$(tableWriter.getTableName()).I$();
tableWriter.commit();
}
TableWriterMetadata writerMetadata = tableWriter.getMetadata();
// Check that table structure hasn't changed between planning and executing the UPDATE
if (writerMetadata.getId() != updateStatement.getTableId() || tableWriter.getStructureVersion() != updateStatement.getTableVersion()) {
throw ReaderOutOfDateException.of(tableWriter.getTableName());
}
RecordMetadata updateToMetadata = updateStatement.getUpdateToDataCursorFactory().getMetadata();
int updateToColumnCount = updateToMetadata.getColumnCount();
// Map each column returned by the update statement's cursor to its column index in the table
updateToColumnMap.clear();
for (int i = 0; i < updateToColumnCount; i++) {
CharSequence columnName = updateToMetadata.getColumnName(i);
int tableColumnIndex = writerMetadata.getColumnIndex(columnName);
assert tableColumnIndex >= 0;
updateToColumnMap.add(tableColumnIndex);
}
// Create update memory list of all columns to be updated
initUpdateMemory(updateToColumnCount);
// Obtain the factory that supplies the rows and new values to apply
RecordCursorFactory updateStatementDataCursorFactory = updateStatement.getUpdateToDataCursorFactory();
// Track how many records were updated
long rowsUpdated = 0;
// Start row by row updates
try (RecordCursor recordCursor = updateStatementDataCursorFactory.getCursor(executionContext)) {
Record masterRecord = recordCursor.getRecord();
long currentPartitionIndex = -1L;
long lastRowId = -1;
while (recordCursor.hasNext()) {
long rowId = masterRecord.getUpdateRowId();
// Some joins expand the result set and return the same row multiple times
if (rowId == lastRowId) {
continue;
}
lastRowId = rowId;
int partitionIndex = Rows.toPartitionIndex(rowId);
long partitionRowId = Rows.toLocalRowID(rowId);
if (partitionIndex != currentPartitionIndex) {
// Map columns to be updated for RW
openPartitionColumnsForUpdate(tableWriter, updateMemory, partitionIndex, updateToColumnMap);
currentPartitionIndex = partitionIndex;
}
// Update values in-place
updateColumnValues(
writerMetadata,
updateToColumnMap,
updateToColumnCount,
updateMemory,
partitionRowId,
masterRecord);
rowsUpdated++;
}
} finally {
for (int i = 0; i < updateMemory.size(); i++) {
updateMemory.getQuick(i).close(false);
}
}
if (rowsUpdated > 0) {
tableWriter.commit();
}
}
private void updateColumnValues(
RecordMetadata metadata,
IntList updateToColumnMap,
int columnCount,
ObjList<MemoryCMARW> updateMemory,
long rowId,
Record record
) throws SqlException {
for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
int columnType = metadata.getColumnType(updateToColumnMap.get(columnIndex));
MemoryCMARW primaryColMem = updateMemory.get(columnIndex);
switch (ColumnType.tagOf(columnType)) {
case ColumnType.INT:
primaryColMem.putInt(rowId << 2, record.getInt(columnIndex));
break;
case ColumnType.FLOAT:
primaryColMem.putFloat(rowId << 2, record.getFloat(columnIndex));
break;
case ColumnType.LONG:
primaryColMem.putLong(rowId << 3, record.getLong(columnIndex));
break;
case ColumnType.TIMESTAMP:
primaryColMem.putLong(rowId << 3, record.getTimestamp(columnIndex));
break;
case ColumnType.DATE:
primaryColMem.putLong(rowId << 3, record.getDate(columnIndex));
break;
case ColumnType.DOUBLE:
primaryColMem.putDouble(rowId << 3, record.getDouble(columnIndex));
break;
case ColumnType.SHORT:
primaryColMem.putShort(rowId << 1, record.getShort(columnIndex));
break;
case ColumnType.CHAR:
primaryColMem.putChar(rowId << 1, record.getChar(columnIndex));
break;
case ColumnType.BYTE:
case ColumnType.BOOLEAN:
primaryColMem.putByte(rowId, record.getByte(columnIndex));
break;
case ColumnType.GEOBYTE:
primaryColMem.putByte(rowId, record.getGeoByte(columnIndex));
break;
case ColumnType.GEOSHORT:
primaryColMem.putShort(rowId << 1, record.getGeoShort(columnIndex));
break;
case ColumnType.GEOINT:
primaryColMem.putInt(rowId << 2, record.getGeoInt(columnIndex));
break;
case ColumnType.GEOLONG:
primaryColMem.putLong(rowId << 3, record.getGeoLong(columnIndex));
break;
default:
throw SqlException.$(0, "Column type ")
.put(ColumnType.nameOf(columnType))
.put(" not supported for updates");
}
}
}
private void initUpdateMemory(int columnCount) {
for (int i = updateMemory.size(); i < columnCount; i++) {
updateMemory.add(Vm.getCMARWInstance());
}
}
private void openPartitionColumnsForUpdate(TableWriter tableWriter, ObjList<MemoryCMARW> updateMemory, int partitionIndex, IntList columnMap) {
long partitionTimestamp = tableWriter.getPartitionTimestamp(partitionIndex);
RecordMetadata metadata = tableWriter.getMetadata();
try {
path.concat(tableWriter.getTableName());
TableUtils.setPathForPartition(path, tableWriter.getPartitionBy(), partitionTimestamp, false);
int pathTrimToLen = path.length();
for (int i = 0, n = columnMap.size(); i < n; i++) {
CharSequence name = metadata.getColumnName(columnMap.get(i));
MemoryCMARW colMem = updateMemory.get(i);
colMem.close(false);
colMem.of(ff, dFile(path.trimTo(pathTrimToLen), name), dataAppendPageSize, -1, MemoryTag.MMAP_TABLE_WRITER);
}
} finally {
path.trimTo(rootLen);
}
}
}
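A minimal standalone sketch (plain Java, not the QuestDB API; class and method names are illustrative only) of the byte-offset arithmetic updateColumnValues relies on: a fixed-width column is patched in place at offset = localRowId * sizeof(type), which the switch above expresses as a left shift.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class FixedWidthOffsetSketch {

    // Shift-based offsets, mirroring the cases in updateColumnValues
    static long offsetFor(long localRowId, int byteSize) {
        switch (byteSize) {
            case 1: return localRowId;        // BYTE, BOOLEAN, GEOBYTE
            case 2: return localRowId << 1;   // SHORT, CHAR, GEOSHORT
            case 4: return localRowId << 2;   // INT, FLOAT, GEOINT
            case 8: return localRowId << 3;   // LONG, TIMESTAMP, DATE, DOUBLE, GEOLONG
            default: throw new IllegalArgumentException("unsupported fixed width: " + byteSize);
        }
    }

    public static void main(String[] args) {
        // Pretend this buffer is the memory-mapped d-file of an INT column with 16 rows
        ByteBuffer intColumn = ByteBuffer.allocate(16 * Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN);
        long localRowId = 5;
        intColumn.putInt((int) offsetFor(localRowId, Integer.BYTES), 42); // overwrite row 5 in place
        System.out.println(intColumn.getInt((int) offsetFor(localRowId, Integer.BYTES))); // prints 42
    }
}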
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin.update;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.std.Misc;
import java.io.Closeable;
public class UpdateStatement implements Closeable {
private final String tableName;
private final int tableId;
private final long tableVersion;
private RecordCursorFactory updateToDataCursorFactory;
public UpdateStatement(
String tableName,
int tableId,
long tableVersion,
RecordCursorFactory updateToCursorFactory
) {
this.tableName = tableName;
this.tableId = tableId;
this.tableVersion = tableVersion;
this.updateToDataCursorFactory = updateToCursorFactory;
}
@Override
public void close() {
updateToDataCursorFactory = Misc.free(updateToDataCursorFactory);
}
public int getTableId() {
return tableId;
}
public CharSequence getTableName() {
return tableName;
}
public long getTableVersion() {
return tableVersion;
}
public RecordCursorFactory getUpdateToDataCursorFactory() {
return updateToDataCursorFactory;
}
}
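UpdateStatement carries the table id and structure version captured when the UPDATE was compiled; executeUpdate above compares them against the live writer and aborts if either changed. Below is a standalone sketch of that guard, with hypothetical names and IllegalStateException standing in for ReaderOutOfDateException.
public class UpdatePlanGuardSketch {

    // Fails if the table was recreated (id change) or its columns changed (structure version bump)
    static void checkPlanStillValid(int plannedTableId, long plannedStructureVersion,
                                    int currentTableId, long currentStructureVersion) {
        if (plannedTableId != currentTableId || plannedStructureVersion != currentStructureVersion) {
            throw new IllegalStateException("table structure changed since the UPDATE was compiled");
        }
    }

    public static void main(String[] args) {
        checkPlanStillValid(7, 3L, 7, 3L); // unchanged: execution proceeds
        try {
            checkPlanStillValid(7, 3L, 7, 4L); // a column was added or dropped in between
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // the caller would re-plan the statement
        }
    }
}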
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin.update;
import io.questdb.cairo.sql.Record;
import java.io.Closeable;
public interface UpdateStatementMasterCursor extends Closeable {
void setMaster(Record master);
Record getRecord();
boolean hasNext();
@Override
void close();
}
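A plain-Java analogue of the master-cursor contract above, using simplified hypothetical types instead of io.questdb.cairo.sql.Record: the cursor is repositioned per master row via setMaster and then iterated to produce the matching rows.
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class MasterCursorSketch {

    // Simplified stand-in for UpdateStatementMasterCursor; long keys replace Record
    interface SimpleMasterCursor extends AutoCloseable {
        void setMaster(long masterKey); // analogue of setMaster(Record)
        boolean hasNext();
        long next();                    // analogue of getRecord()
        @Override void close();
    }

    // Backed by an in-memory index: master key -> row ids to update
    static final class MapBackedCursor implements SimpleMasterCursor {
        private final Map<Long, List<Long>> index;
        private Iterator<Long> current = Collections.emptyIterator();

        MapBackedCursor(Map<Long, List<Long>> index) { this.index = index; }
        @Override public void setMaster(long masterKey) {
            current = index.getOrDefault(masterKey, List.of()).iterator();
        }
        @Override public boolean hasNext() { return current.hasNext(); }
        @Override public long next() { return current.next(); }
        @Override public void close() { /* nothing to release in this sketch */ }
    }

    public static void main(String[] args) {
        MapBackedCursor cursor = new MapBackedCursor(Map.of(1L, List.of(10L, 11L)));
        cursor.setMaster(1L);
        while (cursor.hasNext()) {
            System.out.println("update row id: " + cursor.next());
        }
        cursor.close();
    }
}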
......@@ -89,6 +89,8 @@ open module io.questdb {
exports io.questdb.metrics;
exports io.questdb.cairo.vm.api;
exports io.questdb.cairo.mig;
exports io.questdb.griffin.engine.join;
exports io.questdb.griffin.update;
provides FunctionFactory with
// test functions
......
......@@ -160,6 +160,11 @@ public class AbstractCairoTest {
return writerAsyncCommandMaxTimeout < 0 ? super.getWriterAsyncCommandMaxTimeout() : writerAsyncCommandMaxTimeout;
}
@Override
public boolean enableDevelopmentUpdates() {
return true;
}
@Override
public int getSqlJitMode() {
// JIT compiler is a beta feature and thus is disabled by default,
......
......@@ -433,12 +433,6 @@ public class AbstractGriffinTest extends AbstractCairoTest {
TestUtils.assertEquals(expected, sink);
}
} else {
try {
record.getRowId();
Assert.fail();
} catch (UnsupportedOperationException ignore) {
}
try {
cursor.getRecordB();
Assert.fail();
......@@ -931,8 +925,8 @@ public class AbstractGriffinTest extends AbstractCairoTest {
compile(query, sqlExecutionContext);
Assert.fail();
} catch (SqlException e) {
Assert.assertEquals(Chars.toString(query), expectedPosition, e.getPosition());
TestUtils.assertContains(e.getFlyweightMessage(), expectedMessage);
Assert.assertEquals(Chars.toString(query), expectedPosition, e.getPosition());
}
Assert.assertEquals(0, engine.getBusyReaderCount());
Assert.assertEquals(0, engine.getBusyWriterCount());
......
......@@ -54,8 +54,8 @@ public class AbstractSqlParserTest extends AbstractGriffinTest {
compiler.compile(query, sqlExecutionContext);
Assert.fail("Exception expected");
} catch (SqlException e) {
Assert.assertEquals(position, e.getPosition());
TestUtils.assertContains(e.getFlyweightMessage(), contains);
Assert.assertEquals(position, e.getPosition());
}
});
} finally {
......@@ -143,7 +143,7 @@ public class AbstractSqlParserTest extends AbstractGriffinTest {
ExecutionModel model = compiler.testCompileModel(query, sqlExecutionContext);
Assert.assertEquals(model.getModelType(), modelType);
((Sinkable) model).toSink(sink);
if (model instanceof QueryModel) {
if (model instanceof QueryModel && model.getModelType() == ExecutionModel.QUERY) {
validateTopDownColumns((QueryModel) model);
}
TestUtils.assertEquals(expected, sink);
......@@ -154,6 +154,10 @@ public class AbstractSqlParserTest extends AbstractGriffinTest {
assertModel(expected, query, ExecutionModel.QUERY, tableModels);
}
protected void assertUpdate(String expected, String query, TableModel... tableModels) throws SqlException {
assertModel(expected, query, ExecutionModel.UPDATE, tableModels);
}
private void createModelsAndRun(SqlParserTest.CairoAware runnable, TableModel... tableModels) throws SqlException {
try {
for (int i = 0, n = tableModels.length; i < n; i++) {
......
......@@ -5003,7 +5003,7 @@ public class SqlCodeGeneratorTest extends AbstractGriffinTest {
" timestamp_sequence(0, 100000000000) k" +
" from long_sequence(20)" +
")",
5,
"with tab as (x where b in ('BB')) tab ".length(),
"latest by query does not provide dedicated TIMESTAMP column"
);
}
......
......@@ -3174,7 +3174,7 @@ public class SqlCompilerTest extends AbstractGriffinTest {
@Test
public void testInsertAsSelectInconvertibleList3() throws Exception {
testInsertAsSelectError("create table x (a SHORT, b INT, n TIMESTAMP)",
testInsertAsSelectError("create table x (a BYTE, b INT, n TIMESTAMP)",
"insert into x (b,a)" +
"select" +
" rnd_int()," +
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package io.questdb.griffin;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.PartitionBy;
import io.questdb.cairo.TableModel;
import org.junit.Test;
public class SqlParserUpdateTest extends AbstractSqlParserTest {
@Test
public void testUpdateAmbiguousColumnFails() throws Exception {
assertSyntaxError(
"update tblx set y = y from tbly y where tblx.x = tbly.y and tblx.x > 10",
"update tblx set y = ".length(),
"Ambiguous column name",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("y", ColumnType.INT)
.timestamp(),
partitionedModelOf("tbly").col("t", ColumnType.TIMESTAMP).col("y", ColumnType.INT)
);
}
@Test
public void testUpdateDesignatedTimestampFails() throws Exception {
assertSyntaxError("update x set tt = 1",
13,
"Designated timestamp column cannot be updated",
partitionedModelOf("x")
.col("t", ColumnType.TIMESTAMP)
.timestamp("tt"));
}
@Test
public void testUpdateNonPartitionedTableFails() throws Exception {
assertSyntaxError(
"update tblx set x = 1",
7,
"UPDATE query can only be executed on partitioned tables",
modelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("s", ColumnType.SYMBOL)
);
}
@Test
public void testUpdateNoSetFails() throws Exception {
assertSyntaxError(
"update tblx x = 1",
"update tblx x ".length(),
"SET expected",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("s", ColumnType.SYMBOL)
.timestamp("tt")
);
}
@Test
public void testUpdateNoTimestampFails() throws Exception {
assertSyntaxError(
"update tblx set x = 1",
7,
"PDATE query can only be executed on tables with Designated timestamp",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("s", ColumnType.SYMBOL)
);
}
@Test
public void testUpdateSameColumnTwiceFails() throws Exception {
assertSyntaxError(
"update tblx set x = 1, s = 'abc', x = 2",
"update tblx set x = 1, s = 'abc', ".length(),
"Duplicate column x in SET clause",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("s", ColumnType.SYMBOL)
.timestamp()
);
}
@Test
public void testUpdateSingleTableToBindVariable() throws Exception {
assertUpdate("update x set tt = $1 from (select-virtual $1 tt from (x timestamp (timestamp)))",
"update x set tt = $1",
partitionedModelOf("x")
.col("t", ColumnType.TIMESTAMP)
.col("tt", ColumnType.TIMESTAMP)
.timestamp());
}
@Test
public void testUpdateSingleTableToConst() throws Exception {
assertUpdate("update x set tt = 1 from (select-virtual 1 tt from (x timestamp (timestamp)))",
"update x set tt = 1",
partitionedModelOf("x")
.col("t", ColumnType.TIMESTAMP)
.col("tt", ColumnType.TIMESTAMP)
.timestamp());
}
@Test
public void testUpdateSingleTableWithAlias() throws Exception {
assertUpdate("update tblx as x set tt = tt + 1 from (select-virtual tt + 1 tt from (select [tt, t] from tblx x timestamp (timestamp) where t = NULL))",
"update tblx x set tt = tt + 1 WHERE x.t = NULL",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("tt", ColumnType.TIMESTAMP)
.timestamp());
}
@Test
public void testUpdateSingleTableEndsSemicolon() throws Exception {
assertUpdate("update tblx set tt = tt + 1 from (select-virtual tt + 1 tt from (select [tt, t] from tblx timestamp (timestamp) where t = NULL))",
"update tblx set tt = tt + 1 WHERE t = NULL;",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("tt", ColumnType.TIMESTAMP)
.timestamp());
}
@Test
public void testUpdateSingleTableWithJoinInFrom() throws Exception {
assertUpdate("update tblx set tt = tt + 1 from (select-virtual tt + 1 tt from (select [tt, x] from tblx timestamp (timestamp) join select [y] from tbly y on y = x where x > 10))",
"update tblx set tt = tt + 1 from tbly y where x = y and x > 10",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("tt", ColumnType.INT)
.timestamp(),
partitionedModelOf("tbly").col("t", ColumnType.TIMESTAMP).col("y", ColumnType.INT));
}
@Test
public void testUpdateSingleTableWithJoinAndFiltering() throws Exception {
assertUpdate("update tblx set tt = 1 from (select-virtual 1 tt from (select [x] from tblx timestamp (timestamp) join (select [y, t] from tbly y where t > 100) y on y = x where x > 10))",
"update tblx set tt = 1 from tbly y where x = y and x > 10 and y.t > 100",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("tt", ColumnType.INT)
.timestamp(),
partitionedModelOf("tbly").col("t", ColumnType.TIMESTAMP).col("y", ColumnType.INT));
}
@Test
public void testUpdateSingleTableWithJoinAndConstFiltering() throws Exception {
assertUpdate("update tblx set tt = 1 from (select-virtual 1 tt from (select [x] from tblx timestamp (timestamp) join select [y] from tbly y on y = x where x > 10 const-where 100 > 100))",
"update tblx set tt = 1 from tbly y where x = y and x > 10 and 100 > 100",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("tt", ColumnType.INT)
.timestamp(),
partitionedModelOf("tbly").col("t", ColumnType.TIMESTAMP).col("y", ColumnType.INT));
}
@Test
public void testUpdateSingleTableWithJoinAndNestedSampleBy() throws Exception {
assertUpdate("update tblx set tt = 1 from (select-virtual 1 tt from (select [x] from tblx timestamp (timestamp) join select [y] from (select-group-by [first(y) y, ts] ts, first(y) y from (select [y, ts] from tbly timestamp (ts)) sample by 1h) y on y = x))",
"update tblx set tt = 1 from (select ts, first(y) as y from tbly SAMPLE BY 1h) y where x = y",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("tt", ColumnType.INT)
.timestamp(),
partitionedModelOf("tbly")
.col("t", ColumnType.TIMESTAMP)
.col("y", ColumnType.INT)
.timestamp("ts")
);
}
@Test
public void testUpdateWithJoinKeywordFails() throws Exception {
assertSyntaxError(
"update tblx set tt = 1 from tblx join tbly where x = y and x > 10",
"update tblx set tt = 1 from tblx ".length(),
"JOIN is not supported on UPDATE statement",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("tt", ColumnType.INT)
.timestamp(),
partitionedModelOf("tbly").col("t", ColumnType.TIMESTAMP).col("y", ColumnType.INT)
);
}
@Test
public void testUpdateJoinTableWithDoubleFromFails() throws Exception {
assertSyntaxError(
"update tblx set tt = 1 from tblx from tbly where x = y and x > 10",
"update tblx set tt = 1 from tblx ".length(),
"unexpected token: from",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("tt", ColumnType.INT)
.timestamp(),
partitionedModelOf("tbly").col("t", ColumnType.TIMESTAMP).col("y", ColumnType.INT));
}
@Test
public void testUpdateJoinInvalidSyntaxFails() throws Exception {
assertSyntaxError(
"update tblx set tt = 1 join tblx on x = y and x > 10",
"update tblx set tt = 1 ".length(),
"FROM, WHERE or EOF expected",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("tt", ColumnType.INT)
.timestamp(),
partitionedModelOf("tbly").col("t", ColumnType.TIMESTAMP).col("y", ColumnType.INT));
}
@Test
public void testUpdateEmptyWhereFails() throws Exception {
assertSyntaxError(
"update tblx set tt = 1 where ",
"update tblx set tt = 1 where".length(),
"empty where clause",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("tt", ColumnType.INT)
.timestamp());
}
@Test
public void testUpdateSingleTableWithWhere() throws Exception {
assertUpdate("update x set tt = t from (select-choose t tt from (select [t] from x timestamp (timestamp) where t > '2005-04-02T12:00:00'))",
"update x set tt = t where t > '2005-04-02T12:00:00'",
partitionedModelOf("x")
.col("t", ColumnType.TIMESTAMP)
.col("tt", ColumnType.TIMESTAMP)
.timestamp()
);
}
@Test
public void testUpdateTwoColumnsToConst() throws Exception {
assertUpdate("update x set tt = 1,x = 2 from (select-virtual 1 tt, 2 x from (x timestamp (timestamp)))",
"update x set tt = 1, x = 2",
partitionedModelOf("x")
.col("t", ColumnType.TIMESTAMP)
.col("tt", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.timestamp());
}
@Test
public void testUpdateWithAggregatesFails() throws Exception {
assertSyntaxError(
"update tblx as xx set tt = count() where x > 10",
"update tblx as xx set tt = ".length(),
"Unsupported function in SET clause",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("tt", ColumnType.INT)
.col("s", ColumnType.SYMBOL)
.timestamp()
);
}
@Test
public void testUpdateWithInvalidColumnInSetLeftFails() throws Exception {
assertSyntaxError(
"update tblx as xx set invalidcol = t where x > 10",
"update tblx as xx set ".length(),
"Invalid column: invalidcol",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("s", ColumnType.SYMBOL)
.timestamp()
);
}
@Test
public void testUpdateWithInvalidColumnInSetRightFails() throws Exception {
assertSyntaxError(
"update tblx as xx set t = invalidcol where x > 10",
"update tblx as xx set t = ".length(),
"Invalid column: invalidcol",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("s", ColumnType.SYMBOL)
.timestamp()
);
}
@Test
public void testUpdateWithInvalidColumnInWhereFails() throws Exception {
assertSyntaxError(
"update tblx as xx set tt = t + 1 where invalidcol > 10",
"update tblx as xx set tt = t + 1 where ".length(),
"Invalid column: invalidcol",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("s", ColumnType.SYMBOL)
.timestamp()
);
}
@Test
public void testUpdateWithJoinAndTableAlias() throws Exception {
assertUpdate("update tblx as xx set tt = 1 from (select-virtual 1 tt from (select [x] from tblx xx timestamp (timestamp) join select [y] from tbly y on y = xx.x where x > 10))",
"update tblx as xx set tt = 1 from tbly y where xx.x = y and x > 10",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("tt", ColumnType.INT)
.timestamp(),
partitionedModelOf("tbly").col("t", ColumnType.TIMESTAMP).col("y", ColumnType.INT));
}
@Test
public void testUpdateWithLatestByFails() throws Exception {
assertSyntaxError(
"update tblx as xx set tt = 1 where x > 10 LATEST BY s",
"update tblx as xx set tt = 1 where x > 10 ".length(),
"unexpected token: LATEST",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("s", ColumnType.SYMBOL)
.timestamp()
);
}
@Test
public void testUpdateWithLimitFails() throws Exception {
assertSyntaxError(
"update tblx as xx set tt = 1 from tbly y where xx.x = y and x > 10 LIMIT 10",
"update tblx as xx set tt = 1 from tbly y where xx.x = y and x > 10 ".length(),
"unexpected token: LIMIT",
partitionedModelOf("tblx").col("t", ColumnType.TIMESTAMP).col("x", ColumnType.INT),
partitionedModelOf("tbly").col("t", ColumnType.TIMESTAMP).col("y", ColumnType.INT));
}
@Test
public void testUpdateWithLimitInJoin() throws Exception {
assertUpdate("update tblx as xx set tt = 1 from (select-virtual 1 tt from (select [x] from tblx xx timestamp (timestamp) join select [y] from (select-choose [y] t, y from (select [y] from tbly) limit 10) y on y = xx.x where x > 10))",
"update tblx as xx set tt = 1 from (tbly LIMIT 10) y where xx.x = y and x > 10",
partitionedModelOf("tblx")
.col("t", ColumnType.TIMESTAMP)
.col("x", ColumnType.INT)
.col("tt", ColumnType.INT)
.timestamp(),
partitionedModelOf("tbly").col("t", ColumnType.TIMESTAMP).col("y", ColumnType.INT));
}
@Test
public void testUpdateWithSampleByFails() throws Exception {
assertSyntaxError(
"update tblx as xx set tt = 1 where x > 10 SAMPLE BY 1h",
"update tblx as xx set tt = 1 where x > 10 ".length(),
"unexpected token: SAMPLE",
partitionedModelOf("tblx").col("t", ColumnType.TIMESTAMP).col("x", ColumnType.INT),
partitionedModelOf("tbly").col("t", ColumnType.TIMESTAMP).col("y", ColumnType.INT));
}
private static TableModel partitionedModelOf(String tableName) {
return new TableModel(configuration, tableName, PartitionBy.DAY);
}
}
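A small note on the position convention used by assertSyntaxError in these tests: the expected position is the 0-based character offset of the offending token, computed as the length of the query prefix that precedes it. A plain-Java illustration of the arithmetic (illustrative class name only):
public class ErrorPositionSketch {
    public static void main(String[] args) {
        String query = "update tblx set y = y from tbly y where tblx.x = tbly.y and tblx.x > 10";
        int position = "update tblx set y = ".length(); // offset of the right-hand 'y'
        System.out.println(position + " -> '" + query.charAt(position) + "'"); // prints 20 -> 'y'
    }
}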