提交 a175dcf7，作者：Vlad Ilyushchenko

GRIFFIN: support multiple columns in 'LATEST BY' clause

上级 a5368b49
......@@ -46,6 +46,14 @@ import static io.questdb.griffin.model.ExpressionNode.FUNCTION;
public class SqlCodeGenerator {
private static final IntHashSet limitTypes = new IntHashSet();
static {
limitTypes.add(ColumnType.LONG);
limitTypes.add(ColumnType.BYTE);
limitTypes.add(ColumnType.SHORT);
limitTypes.add(ColumnType.INT);
}
private final WhereClauseParser filterAnalyser = new WhereClauseParser();
private final FunctionParser functionParser;
private final CairoEngine engine;
......@@ -53,7 +61,6 @@ public class SqlCodeGenerator {
// this list is used to generate record sinks
private final ListColumnFilter listColumnFilterA = new ListColumnFilter();
private final ListColumnFilter listColumnFilterB = new ListColumnFilter();
private final SingleColumnType singleColumnType = new SingleColumnType();
private final CairoConfiguration configuration;
private final RecordComparatorCompiler recordComparatorCompiler;
private final IntHashSet intHashSet = new IntHashSet();
......@@ -61,6 +68,7 @@ public class SqlCodeGenerator {
private final ArrayColumnTypes valueTypes = new ArrayColumnTypes();
private final EntityColumnFilter entityColumnFilter = new EntityColumnFilter();
private boolean fullFatJoins = false;
public SqlCodeGenerator(
CairoEngine engine,
CairoConfiguration configuration,
......@@ -569,13 +577,11 @@ public class SqlCodeGenerator {
QueryModel model,
TableReader reader,
RecordMetadata metadata,
int latestByIndex,
String tableName,
IntrinsicModel intrinsicModel,
Function filter,
SqlExecutionContext executionContext
) throws SqlException {
final boolean indexed = metadata.isColumnIndexed(latestByIndex);
final DataFrameCursorFactory dataFrameCursorFactory;
if (intrinsicModel.intervals != null) {
dataFrameCursorFactory = new IntervalBwdDataFrameCursorFactory(engine, tableName, model.getTableVersion(), intrinsicModel.intervals);
......@@ -583,130 +589,133 @@ public class SqlCodeGenerator {
dataFrameCursorFactory = new FullBwdDataFrameCursorFactory(engine, tableName, model.getTableVersion());
}
if (intrinsicModel.keyColumn != null) {
// key column must always be the same as latest by column
assert latestByIndex == metadata.getColumnIndexQuiet(intrinsicModel.keyColumn);
if (listColumnFilterA.size() == 1) {
final int latestByIndex = listColumnFilterA.getColumnIndex(0);
final boolean indexed = metadata.isColumnIndexed(latestByIndex);
if (intrinsicModel.keySubQuery != null) {
if (intrinsicModel.keyColumn != null) {
// key column must always be the same as latest by column
assert latestByIndex == metadata.getColumnIndexQuiet(intrinsicModel.keyColumn);
final RecordCursorFactory rcf = generate(intrinsicModel.keySubQuery, executionContext);
final int firstColumnType = validateSubQueryColumnAndGetType(intrinsicModel, rcf.getMetadata());
if (intrinsicModel.keySubQuery != null) {
return new LatestBySubQueryRecordCursorFactory(
configuration,
metadata,
dataFrameCursorFactory,
latestByIndex,
rcf,
filter,
indexed,
firstColumnType
);
}
final RecordCursorFactory rcf = generate(intrinsicModel.keySubQuery, executionContext);
final int firstColumnType = validateSubQueryColumnAndGetType(intrinsicModel, rcf.getMetadata());
final int nKeyValues = intrinsicModel.keyValues.size();
if (indexed) {
return new LatestBySubQueryRecordCursorFactory(
configuration,
metadata,
dataFrameCursorFactory,
latestByIndex,
rcf,
filter,
indexed,
firstColumnType
);
}
assert nKeyValues > 0;
// deal with key values as a list
// 1. resolve each value of the list to "int"
// 2. get first row in index for each value (stream)
final int nKeyValues = intrinsicModel.keyValues.size();
if (indexed) {
final SymbolMapReader symbolMapReader = reader.getSymbolMapReader(latestByIndex);
final RowCursorFactory rcf;
if (nKeyValues == 1) {
final CharSequence symbolValue = intrinsicModel.keyValues.get(0);
final int symbol = symbolMapReader.getQuick(symbolValue);
assert nKeyValues > 0;
// deal with key values as a list
// 1. resolve each value of the list to "int"
// 2. get first row in index for each value (stream)
if (filter == null) {
if (symbol == SymbolTable.VALUE_NOT_FOUND) {
rcf = new LatestByValueDeferredIndexedRowCursorFactory(latestByIndex, Chars.toString(symbolValue), false);
} else {
rcf = new LatestByValueIndexedRowCursorFactory(latestByIndex, symbol, false);
final SymbolMapReader symbolMapReader = reader.getSymbolMapReader(latestByIndex);
final RowCursorFactory rcf;
if (nKeyValues == 1) {
final CharSequence symbolValue = intrinsicModel.keyValues.get(0);
final int symbol = symbolMapReader.getQuick(symbolValue);
if (filter == null) {
if (symbol == SymbolTable.VALUE_NOT_FOUND) {
rcf = new LatestByValueDeferredIndexedRowCursorFactory(latestByIndex, Chars.toString(symbolValue), false);
} else {
rcf = new LatestByValueIndexedRowCursorFactory(latestByIndex, symbol, false);
}
return new DataFrameRecordCursorFactory(copyMetadata(metadata), dataFrameCursorFactory, rcf, null);
}
return new DataFrameRecordCursorFactory(copyMetadata(metadata), dataFrameCursorFactory, rcf, null);
}
if (symbol == SymbolTable.VALUE_NOT_FOUND) {
return new LatestByValueDeferredIndexedFilteredRecordCursorFactory(
if (symbol == SymbolTable.VALUE_NOT_FOUND) {
return new LatestByValueDeferredIndexedFilteredRecordCursorFactory(
copyMetadata(metadata),
dataFrameCursorFactory,
latestByIndex,
Chars.toString(symbolValue),
filter);
}
return new LatestByValueIndexedFilteredRecordCursorFactory(
copyMetadata(metadata),
dataFrameCursorFactory,
latestByIndex,
Chars.toString(symbolValue),
symbol,
filter);
}
return new LatestByValueIndexedFilteredRecordCursorFactory(
return new LatestByValuesIndexedFilteredRecordCursorFactory(
configuration,
copyMetadata(metadata),
dataFrameCursorFactory,
latestByIndex,
symbol,
filter);
intrinsicModel.keyValues,
symbolMapReader,
filter
);
}
return new LatestByValuesIndexedFilteredRecordCursorFactory(
configuration,
copyMetadata(metadata),
dataFrameCursorFactory,
latestByIndex,
intrinsicModel.keyValues,
symbolMapReader,
filter
);
}
assert nKeyValues > 0;
assert nKeyValues > 0;
// we have "latest by" column values, but no index
final SymbolMapReader symbolMapReader = reader.getSymbolMapReader(latestByIndex);
// we have "latest by" column values, but no index
final SymbolMapReader symbolMapReader = reader.getSymbolMapReader(latestByIndex);
if (nKeyValues > 1) {
return new LatestByValuesFilteredRecordCursorFactory(
configuration,
copyMetadata(metadata),
dataFrameCursorFactory,
latestByIndex,
intrinsicModel.keyValues,
symbolMapReader,
filter
);
}
if (nKeyValues > 1) {
return new LatestByValuesFilteredRecordCursorFactory(
configuration,
copyMetadata(metadata),
dataFrameCursorFactory,
latestByIndex,
intrinsicModel.keyValues,
symbolMapReader,
filter
);
// we have a single symbol key
int symbolKey = symbolMapReader.getQuick(intrinsicModel.keyValues.get(0));
if (symbolKey == SymbolTable.VALUE_NOT_FOUND) {
return new LatestByValueDeferredFilteredRecordCursorFactory(
copyMetadata(metadata),
dataFrameCursorFactory,
latestByIndex,
Chars.toString(intrinsicModel.keyValues.get(0)),
filter
);
}
return new LatestByValueFilteredRecordCursorFactory(copyMetadata(metadata), dataFrameCursorFactory, latestByIndex, symbolKey, filter);
}
// we select all values of "latest by" column
assert intrinsicModel.keyValues.size() == 0;
// get latest rows for all values of "latest by" column
// we have a single symbol key
int symbolKey = symbolMapReader.getQuick(intrinsicModel.keyValues.get(0));
if (symbolKey == SymbolTable.VALUE_NOT_FOUND) {
return new LatestByValueDeferredFilteredRecordCursorFactory(
if (indexed) {
return new LatestByAllIndexedFilteredRecordCursorFactory(
configuration,
copyMetadata(metadata),
dataFrameCursorFactory,
latestByIndex,
Chars.toString(intrinsicModel.keyValues.get(0)),
filter
);
filter);
}
return new LatestByValueFilteredRecordCursorFactory(copyMetadata(metadata), dataFrameCursorFactory, latestByIndex, symbolKey, filter);
}
// we select all values of "latest by" column
assert intrinsicModel.keyValues.size() == 0;
// get latest rows for all values of "latest by" column
if (indexed) {
return new LatestByAllIndexedFilteredRecordCursorFactory(
configuration,
copyMetadata(metadata),
dataFrameCursorFactory,
latestByIndex,
filter);
}
listColumnFilterA.clear();
listColumnFilterA.add(latestByIndex);
return new LatestByAllFilteredRecordCursorFactory(
copyMetadata(metadata),
configuration,
dataFrameCursorFactory,
RecordSinkFactory.getInstance(asm, metadata, listColumnFilterA, false),
singleColumnType.of(metadata.getColumnType(latestByIndex)),
keyTypes,
filter
);
}
......@@ -1231,7 +1240,7 @@ public class SqlCodeGenerator {
QueryModel model,
SqlExecutionContext executionContext
) throws SqlException {
final ExpressionNode latestBy = model.getLatestBy();
final ObjList<ExpressionNode> latestBy = model.getLatestBy();
final ExpressionNode whereClause = model.getWhereClause();
try (TableReader reader = engine.getReader(
......@@ -1250,24 +1259,31 @@ public class SqlCodeGenerator {
timestampIndex = -1;
}
final int latestByIndex;
if (latestBy != null) {
listColumnFilterA.clear();
final int latestByColumnCount = latestBy.size();
if (latestBy.size() > 0) {
// validate latest by against current reader
// first check if column is valid
latestByIndex = metadata.getColumnIndexQuiet(latestBy.token);
if (latestByIndex == -1) {
throw SqlException.invalidColumn(latestBy.position, latestBy.token);
for (int i = 0; i < latestByColumnCount; i++) {
final int index = metadata.getColumnIndexQuiet(latestBy.getQuick(i).token);
if (index == -1) {
throw SqlException.invalidColumn(latestBy.getQuick(i).position, latestBy.getQuick(i).token);
}
// we are reusing collections which leads to confusing naming for this method
// keyTypes are types of columns we collect 'latest by' for
keyTypes.add(metadata.getColumnType(index));
// columnFilterA are indexes of columns we collect 'latest by' for
listColumnFilterA.add(index);
}
} else {
latestByIndex = -1;
}
final String tableName = Chars.toString(model.getTableName().token);
if (whereClause != null) {
final IntrinsicModel intrinsicModel = filterAnalyser.extract(model, whereClause, metadata, latestBy != null ? latestBy.token : null, timestampIndex);
final IntrinsicModel intrinsicModel = filterAnalyser.extract(model, whereClause, metadata, latestByColumnCount > 0 ? latestBy.getQuick(0).token : null, timestampIndex);
if (intrinsicModel.intrinsicValue == IntrinsicModel.FALSE) {
return new EmptyTableRecordCursorFactory(metadata);
......@@ -1297,12 +1313,11 @@ public class SqlCodeGenerator {
DataFrameCursorFactory dfcFactory;
if (latestByIndex > -1) {
if (latestByColumnCount > 0) {
return generateLatestByQuery(
model,
reader,
metadata,
latestByIndex,
tableName,
intrinsicModel,
filter,
......@@ -1377,28 +1392,26 @@ public class SqlCodeGenerator {
}
// no where clause
if (latestByIndex == -1) {
if (latestByColumnCount == 0) {
return new TableReaderRecordCursorFactory(copyMetadata(metadata), engine, tableName, model.getTableVersion());
}
if (metadata.isColumnIndexed(latestByIndex)) {
if (latestByColumnCount == 1 && metadata.isColumnIndexed(listColumnFilterA.getQuick(0))) {
return new LatestByAllIndexedFilteredRecordCursorFactory(
configuration,
copyMetadata(metadata),
new FullBwdDataFrameCursorFactory(engine, tableName, model.getTableVersion()),
latestByIndex,
listColumnFilterA.getQuick(0),
null
);
}
listColumnFilterA.clear();
listColumnFilterA.add(latestByIndex);
return new LatestByAllFilteredRecordCursorFactory(
copyMetadata(metadata),
configuration,
new FullBwdDataFrameCursorFactory(engine, tableName, model.getTableVersion()),
RecordSinkFactory.getInstance(asm, metadata, listColumnFilterA, false),
singleColumnType.of(metadata.getColumnType(latestByIndex)),
keyTypes,
null
);
}
......@@ -1518,11 +1531,4 @@ public class SqlCodeGenerator {
return firstColumnType;
}
// Registers the four integer column types (LONG, BYTE, SHORT, INT) in limitTypes.
// NOTE(review): an initializer with the same four adds also appears right after the
// limitTypes declaration at the top of the class; one of the two copies looks redundant - confirm.
static {
    for (int type : new int[]{ColumnType.LONG, ColumnType.BYTE, ColumnType.SHORT, ColumnType.INT}) {
        limitTypes.add(type);
    }
}
}
......@@ -913,7 +913,15 @@ public final class SqlParser {
/**
 * Parses the column list of a 'latest by' clause.
 * <p>
 * Expects the token {@code by} followed by one or more literals separated by commas. Each
 * literal is appended to the model through {@code addLatestBy}. The first token that is not a
 * comma is handed back to the lexer so that parsing can resume from it.
 *
 * @param lexer token source
 * @param model query model that receives the 'latest by' columns
 * @throws SqlException propagated from {@code expectTok} / {@code expectLiteral}
 */
private void parseLatestBy(GenericLexer lexer, QueryModel model) throws SqlException {
    expectTok(lexer, "by");

    CharSequence tok;
    do {
        // only plain literals are accepted here; "x+1" must not be swallowed as an expression
        model.addLatestBy(expectLiteral(lexer));
        tok = SqlUtil.fetchNext(lexer);
    } while (Chars.equalsNc(tok, ','));

    // a null token means the lexer returned nothing further, so there is nothing to push back
    if (tok != null) {
        lexer.unparse();
    }
}
private ExecutionModel parseRenameStatement(GenericLexer lexer) throws SqlException {
......
......@@ -92,7 +92,7 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
private long tableVersion;
private Function tableNameFunction;
private ExpressionNode alias;
private ExpressionNode latestBy;
private ObjList<ExpressionNode> latestBy = new ObjList<>();
private ExpressionNode timestamp;
private ExpressionNode sampleBy;
private JoinContext context;
......@@ -225,7 +225,7 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
nestedModel = null;
tableName = null;
alias = null;
latestBy = null;
latestBy.clear();
joinCriteria = null;
joinType = JOIN_INNER;
joinKeywordPosition = 0;
......@@ -388,12 +388,12 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
this.joinKeywordPosition = position;
}
public ExpressionNode getLatestBy() {
public ObjList<ExpressionNode> getLatestBy() {
return latestBy;
}
public void setLatestBy(ExpressionNode latestBy) {
this.latestBy = latestBy;
public void addLatestBy(ExpressionNode latestBy) {
this.latestBy.add(latestBy);
}
public ExpressionNode getLimitHi() {
......@@ -685,9 +685,11 @@ public class QueryModel implements Mutable, ExecutionModel, AliasTranslator, Sin
sink.put(')');
}
// render the 'latest by' clause only when at least one column is present
if (getLatestBy().size() > 0) {
    sink.put(" latest by ");
    for (int i = 0, n = getLatestBy().size(); i < n; i++) {
        if (i > 0) {
            // separate columns; without this "latest by a, b" would render as "latest by ab"
            sink.put(',');
        }
        getLatestBy().getQuick(i).toSink(sink);
    }
}
if (orderedJoinModels.size() > 1) {
......
......@@ -1104,6 +1104,42 @@ public class SqlCodeGeneratorTest extends AbstractGriffinTest {
"24.593452776060\tfalse\t2019-01-01T00:00:00.000000Z\n");
}
/**
 * Exercises 'latest by' with two columns ({@code cust_id, balance_ccy}) against a
 * {@code balances} table filled with 150 generated rows. The final expectation lists twelve
 * rows, one for each (cust_id, balance_ccy) pair.
 * <p>
 * NOTE(review): the roles of the assertQuery arguments (expected output before the insert,
 * query, DDL, timestamp column, second statement, expected output after it) are inferred from
 * the values passed - confirm against the test base class.
 */
@Test
public void testLatestByMultipleColumns() throws Exception {
    final String header = "cust_id\tbalance_ccy\tbalance\tstatus\ttimestamp\n";

    final String query = "select * from balances latest by cust_id, balance_ccy";

    final String createTable = "create table balances (\n" +
            "\tcust_id int, \n" +
            "\tbalance_ccy symbol, \n" +
            "\tbalance double, \n" +
            "\tstatus byte, \n" +
            "\ttimestamp timestamp\n" +
            ") timestamp(timestamp)";

    final String populate = "insert into balances select * from (" +
            " select" +
            " abs(rnd_int()) % 4," +
            " rnd_str('USD', 'GBP', 'EUR')," +
            " rnd_double()," +
            " rnd_byte(0,1)," +
            " to_timestamp(0) timestamp" +
            " from long_sequence(150)" +
            ") timestamp (timestamp)";

    final String expectedAfterPopulate = header +
            "3\tUSD\t0.879641346857\t0\t1970-01-01T00:00:00.000000Z\n" +
            "3\tEUR\t0.011099265672\t0\t1970-01-01T00:00:00.000000Z\n" +
            "1\tEUR\t0.107475118336\t1\t1970-01-01T00:00:00.000000Z\n" +
            "1\tGBP\t0.152748580781\t1\t1970-01-01T00:00:00.000000Z\n" +
            "0\tGBP\t0.073834641749\t1\t1970-01-01T00:00:00.000000Z\n" +
            "2\tEUR\t0.300620110525\t0\t1970-01-01T00:00:00.000000Z\n" +
            "1\tUSD\t0.124540547653\t0\t1970-01-01T00:00:00.000000Z\n" +
            "0\tUSD\t0.312445801061\t0\t1970-01-01T00:00:00.000000Z\n" +
            "2\tUSD\t0.794318576750\t1\t1970-01-01T00:00:00.000000Z\n" +
            "2\tGBP\t0.438886409177\t1\t1970-01-01T00:00:00.000000Z\n" +
            "0\tEUR\t0.592145777030\t1\t1970-01-01T00:00:00.000000Z\n" +
            "3\tGBP\t0.318618433941\t1\t1970-01-01T00:00:00.000000Z\n";

    assertQuery(header, query, createTable, "timestamp", populate, expectedAfterPopulate);
}
@Test
public void testLatestByAllConstantFilter() throws Exception {
final String expected = "a\tb\tk\n" +
......
......@@ -3338,6 +3338,33 @@ public class SqlParserTest extends AbstractGriffinTest {
);
}
/**
 * A trailing comma after the last 'latest by' column must be reported as
 * "literal expected" at position 30.
 */
@Test
public void testLatestBySyntax2() {
    final String query = "select * from tab latest by x, ";
    final int errorPosition = 30;
    assertSyntaxError(query, errorPosition, "literal expected");
}
/**
 * A 'latest by' clause without any column must be reported as
 * "literal expected" at position 27.
 */
@Test
public void testLatestBySyntax3() {
    final String query = "select * from tab latest by";
    final int errorPosition = 27;
    assertSyntaxError(query, errorPosition, "literal expected");
}
/**
 * An expression in place of a plain column name must be rejected:
 * the '+' at position 29 is reported as an unexpected token.
 */
@Test
public void testLatestBySyntax4() {
    final String query = "select * from tab latest by x+1";
    final int errorPosition = 29;
    assertSyntaxError(query, errorPosition, "unexpected token: +");
}
@Test
public void testLexerReset() {
for (int i = 0; i < 10; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册