提交 2c2481ff 编写于 作者: V Vlad Ilyushchenko

CAIRO: refactored exceptions

GRIFFIN: refactored insert statement implementation and tested exhaustively
上级 7f018ec7
......@@ -26,6 +26,7 @@ package io.questdb.cairo;
import io.questdb.cairo.pool.PoolListener;
import io.questdb.cairo.pool.ReaderPool;
import io.questdb.cairo.pool.WriterPool;
import io.questdb.cairo.sql.ReaderOutOfDateException;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.SynchronizedJob;
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package io.questdb.cairo.sql;
import java.io.Closeable;
public interface InsertMethod extends Closeable {
void execute();
void commit();
}
......@@ -23,13 +23,13 @@
package io.questdb.cairo.sql;
import io.questdb.cairo.TableWriter;
import io.questdb.cairo.CairoEngine;
import io.questdb.griffin.SqlExecutionContext;
public interface InsertStatement {
void execute(TableWriter tableWriter, SqlExecutionContext executionContext);
CharSequence getTableName();
long getStructureVersion();
InsertMethod createMethod(CairoEngine engine, SqlExecutionContext executionContext);
}
......@@ -21,7 +21,7 @@
*
******************************************************************************/
package io.questdb.cairo;
package io.questdb.cairo.sql;
public class ReaderOutOfDateException extends RuntimeException {
public static final ReaderOutOfDateException INSTANCE = new ReaderOutOfDateException();
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package io.questdb.cairo.sql;
public class WriterOutOfDateException extends RuntimeException {
public static WriterOutOfDateException INSTANCE = new WriterOutOfDateException();
}
......@@ -29,6 +29,7 @@ import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.griffin.engine.functions.CursorFunction;
import io.questdb.griffin.engine.functions.bind.BindVariableService;
import io.questdb.griffin.engine.functions.bind.IndexedParameterLinkFunction;
import io.questdb.griffin.engine.functions.bind.NamedParameterLinkFunction;
import io.questdb.griffin.engine.functions.columns.*;
......@@ -37,6 +38,7 @@ import io.questdb.griffin.model.ExpressionNode;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.*;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayDeque;
......@@ -200,7 +202,7 @@ public class FunctionParser implements PostOrderTreeTraversalAlgo.Visitor {
}
public Function createIndexParameter(int variableIndex, ExpressionNode node) throws SqlException {
Function function = sqlExecutionContext.getBindVariableService().getFunction(variableIndex);
Function function = getBindVariableService().getFunction(variableIndex);
if (function == null) {
throw SqlException.position(node.position).put("no bind variable defined at index ").put(variableIndex);
}
......@@ -208,13 +210,22 @@ public class FunctionParser implements PostOrderTreeTraversalAlgo.Visitor {
}
public Function createNamedParameter(ExpressionNode node) throws SqlException {
Function function = sqlExecutionContext.getBindVariableService().getFunction(node.token);
Function function = getBindVariableService().getFunction(node.token);
if (function == null) {
throw SqlException.position(node.position).put("undefined bind variable: ").put(node.token);
}
return new NamedParameterLinkFunction(Chars.toString(node.token), function.getType(), node.position);
}
@NotNull
private BindVariableService getBindVariableService() throws SqlException {
final BindVariableService bindVariableService = sqlExecutionContext.getBindVariableService();
if (bindVariableService == null) {
throw SqlException.$(0, "bind variable service is not provided");
}
return bindVariableService;
}
public boolean isGroupBy(CharSequence name) {
return groupByFunctionNames.contains(name);
}
......
......@@ -23,10 +23,10 @@
package io.questdb.griffin;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.TableWriter;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.InsertStatement;
import io.questdb.cairo.sql.VirtualRecord;
import io.questdb.cairo.sql.*;
import io.questdb.std.Misc;
import io.questdb.std.ObjList;
public class InsertStatementImpl implements InsertStatement {
......@@ -36,6 +36,7 @@ public class InsertStatementImpl implements InsertStatement {
private final RowFactory rowFactory;
private final long structureVersion;
private final String tableName;
private final InsertMethodImpl insertMethod = new InsertMethodImpl();
private SqlExecutionContext lastUsedContext;
// todo: recycle these
......@@ -68,17 +69,6 @@ public class InsertStatementImpl implements InsertStatement {
return tableName;
}
@Override
public void execute(TableWriter tableWriter, SqlExecutionContext executionContext) {
if (lastUsedContext != executionContext) {
initContext(executionContext);
}
final TableWriter.Row row = rowFactory.getRow(tableWriter);
copier.copy(virtualRecord, row);
row.append();
}
private TableWriter.Row getRowWithTimestamp(TableWriter tableWriter) {
return tableWriter.newRow(timestampFunction.getTimestamp(null));
}
......@@ -98,8 +88,44 @@ public class InsertStatementImpl implements InsertStatement {
}
}
@Override
public InsertMethod createMethod(CairoEngine engine, SqlExecutionContext executionContext) {
if (lastUsedContext != executionContext) {
initContext(executionContext);
}
final TableWriter writer = engine.getWriter(executionContext.getCairoSecurityContext(), tableName);
if (writer.getStructureVersion() != getStructureVersion()) {
writer.close();
throw WriterOutOfDateException.INSTANCE;
}
insertMethod.writer = writer;
return insertMethod;
}
@FunctionalInterface
private interface RowFactory {
TableWriter.Row getRow(TableWriter tableWriter);
}
private class InsertMethodImpl implements InsertMethod {
private TableWriter writer = null;
@Override
public void execute() {
final TableWriter.Row row = rowFactory.getRow(writer);
copier.copy(virtualRecord, row);
row.append();
}
@Override
public void commit() {
writer.commit();
}
@Override
public void close() {
writer = Misc.free(writer);
}
}
}
......@@ -1127,6 +1127,7 @@ public class SqlCompiler implements Closeable {
final ExpressionNode name = model.getTableName();
tableExistsOrFail(name.position, name.token, executionContext);
ObjList<Function> valueFunctions = null;
try (TableReader reader = engine.getReader(executionContext.getCairoSecurityContext(), name.token, TableUtils.ANY_TABLE_VERSION)) {
final long structureVersion = reader.getVersion();
final RecordMetadata metadata = reader.getMetadata();
......@@ -1134,7 +1135,6 @@ public class SqlCompiler implements Closeable {
final CharSequenceHashSet columnSet = model.getColumnSet();
final int columnSetSize = columnSet.size();
final ColumnFilter columnFilter;
final ObjList<Function> valueFunctions;
Function timestampFunction = null;
if (columnSetSize > 0) {
listColumnFilter.clear();
......@@ -1164,7 +1164,7 @@ public class SqlCompiler implements Closeable {
columnFilter = entityColumnFilter;
valueFunctions = new ObjList<>(columnCount);
for (int i = 0; i < columnCount; i++) {
Function function = functionParser.parseFunction(model.getColumnValues().getQuick(i), metadata, executionContext);
Function function = functionParser.parseFunction(model.getColumnValues().getQuick(i), EmptyRecordMetadata.INSTANCE, executionContext);
if (!isAssignableFrom(metadata.getColumnType(i), function.getType())) {
throw SqlException.$(model.getColumnValues().getQuick(i).position, "inconvertible types: ").put(ColumnType.nameOf(function.getType())).put(" -> ").put(ColumnType.nameOf(metadata.getColumnType(i)));
}
......@@ -1184,6 +1184,9 @@ public class SqlCompiler implements Closeable {
VirtualRecord record = new VirtualRecord(valueFunctions);
RecordToRowCopier copier = assembleRecordToRowCopier(asm, record, metadata, columnFilter);
return compiledQuery.ofInsert(new InsertStatementImpl(Chars.toString(name.token), record, copier, timestampFunction, structureVersion));
} catch (SqlException e) {
Misc.freeObjList(valueFunctions);
throw e;
}
}
......
......@@ -25,6 +25,7 @@ package io.questdb.cairo;
import io.questdb.cairo.pool.PoolListener;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.sql.ReaderOutOfDateException;
import io.questdb.mp.Job;
import io.questdb.mp.RingQueue;
import io.questdb.mp.Sequence;
......
......@@ -26,6 +26,7 @@ package io.questdb.cairo;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.sql.DataFrame;
import io.questdb.cairo.sql.DataFrameCursor;
import io.questdb.cairo.sql.ReaderOutOfDateException;
import io.questdb.std.Rnd;
import io.questdb.std.microtime.DateFormatUtils;
import io.questdb.test.tools.TestUtils;
......
......@@ -26,6 +26,7 @@ package io.questdb.cairo;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.sql.DataFrame;
import io.questdb.cairo.sql.DataFrameCursor;
import io.questdb.cairo.sql.ReaderOutOfDateException;
import io.questdb.std.Rnd;
import io.questdb.test.tools.TestUtils;
import org.junit.Assert;
......
......@@ -24,10 +24,7 @@
package io.questdb.cairo;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.sql.DataFrame;
import io.questdb.cairo.sql.DataFrameCursor;
import io.questdb.cairo.sql.RowCursor;
import io.questdb.cairo.sql.SymbolTable;
import io.questdb.cairo.sql.*;
import io.questdb.std.LongList;
import io.questdb.std.Rnd;
import io.questdb.std.microtime.DateFormatUtils;
......
......@@ -24,10 +24,7 @@
package io.questdb.cairo;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.sql.DataFrame;
import io.questdb.cairo.sql.DataFrameCursor;
import io.questdb.cairo.sql.RowCursor;
import io.questdb.cairo.sql.SymbolTable;
import io.questdb.cairo.sql.*;
import io.questdb.std.LongList;
import io.questdb.std.Rnd;
import io.questdb.std.microtime.DateFormatUtils;
......
......@@ -23,10 +23,17 @@
package io.questdb.griffin;
import io.questdb.cairo.*;
import io.questdb.cairo.CairoTestUtils;
import io.questdb.cairo.PartitionBy;
import io.questdb.cairo.TableReader;
import io.questdb.cairo.TableReaderRecordCursor;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cairo.sql.InsertMethod;
import io.questdb.cairo.sql.InsertStatement;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.WriterOutOfDateException;
import io.questdb.griffin.engine.TestBinarySequence;
import io.questdb.griffin.engine.functions.bind.BindVariableService;
import io.questdb.std.BinarySequence;
import io.questdb.std.Long256;
import io.questdb.std.Rnd;
......@@ -81,6 +88,86 @@ public class InsertTest extends AbstractGriffinTest {
});
}
@Test
public void testInsertValueCannotReferenceTableColumn() throws Exception {
TestUtils.assertMemoryLeak(() -> {
compiler.compile("create table balances(cust_id int, ccy symbol, balance double)");
try {
compiler.compile("insert into balances values (1, ccy, 356.12)");
Assert.fail();
} catch (SqlException e) {
Assert.assertEquals(32, e.getPosition());
}
engine.releaseAllWriters();
engine.releaseAllReaders();
});
}
@Test
public void testInsertExecutionAfterStructureChange() throws Exception {
TestUtils.assertMemoryLeak(() -> {
compiler.compile("create table balances(cust_id int, ccy symbol, balance double)");
try {
CompiledQuery cq = compiler.compile("insert into balances values (1, 'GBP', 356.12)");
Assert.assertEquals(CompiledQuery.INSERT, cq.getType());
InsertStatement insertStatement = cq.getInsertStatement();
compiler.compile("alter table balances drop column ccy");
insertStatement.createMethod(engine, sqlExecutionContext);
} catch (WriterOutOfDateException ignored) {
}
engine.releaseAllWriters();
engine.releaseAllReaders();
});
}
@Test
public void testInsertContextSwitch() throws Exception {
TestUtils.assertMemoryLeak(() -> {
compiler.compile("create table balances(cust_id int, ccy symbol, balance double)");
try {
sqlExecutionContext.getBindVariableService().setDouble("bal", 150.4);
CompiledQuery cq = compiler.compile("insert into balances values (1, 'GBP', :bal)", sqlExecutionContext);
Assert.assertEquals(CompiledQuery.INSERT, cq.getType());
InsertStatement insertStatement = cq.getInsertStatement();
try (InsertMethod method = insertStatement.createMethod(engine, sqlExecutionContext)) {
method.execute();
method.commit();
}
BindVariableService bindVariableService = new BindVariableService();
SqlExecutionContext sqlExecutionContext = new SqlExecutionContextImpl().with(AllowAllCairoSecurityContext.INSTANCE, bindVariableService);
bindVariableService.setDouble("bal", 56.4);
try (InsertMethod method = insertStatement.createMethod(engine, sqlExecutionContext)) {
method.execute();
method.commit();
}
try (TableReader reader = engine.getReader(sqlExecutionContext.getCairoSecurityContext(), insertStatement.getTableName())) {
printer.print(reader.getCursor(), reader.getMetadata(), true);
}
TestUtils.assertEquals("cust_id\tccy\tbalance\n" +
"1\tGBP\t150.400000000000\n" +
"1\tGBP\t56.400000000000\n",
sink
);
} catch (WriterOutOfDateException ignored) {
}
engine.releaseAllWriters();
engine.releaseAllReaders();
});
}
@Test
public void testInsertNoTimestamp() throws Exception {
TestUtils.assertMemoryLeak(() -> {
......@@ -88,10 +175,9 @@ public class InsertTest extends AbstractGriffinTest {
CompiledQuery cq = compiler.compile("insert into balances values (1, 'USD', 356.12)");
Assert.assertEquals(CompiledQuery.INSERT, cq.getType());
InsertStatement insert = cq.getInsertStatement();
try (TableWriter writer = engine.getWriter(sqlExecutionContext.getCairoSecurityContext(), insert.getTableName())) {
insert.execute(writer, sqlExecutionContext);
Assert.assertTrue(writer.inTransaction());
writer.commit();
try (InsertMethod method = insert.createMethod(engine, sqlExecutionContext)) {
method.execute();
method.commit();
}
String expected = "cust_id\tccy\tbalance\n" +
......@@ -172,8 +258,8 @@ public class InsertTest extends AbstractGriffinTest {
Assert.assertEquals(CompiledQuery.INSERT, cq.getType());
InsertStatement insert = cq.getInsertStatement();
try (TableWriter writer = engine.getWriter(sqlExecutionContext.getCairoSecurityContext(), insert.getTableName())) {
Assert.assertEquals(writer.getStructureVersion(), insert.getStructureVersion());
try (InsertMethod method = insert.createMethod(engine, sqlExecutionContext)) {
// Assert.assertEquals(writer.getStructureVersion(), insert.getStructureVersion());
for (int i = 0; i < 1_000_000; i++) {
bindVariableService.setInt(0, rnd.nextInt());
bindVariableService.setShort(1, rnd.nextShort());
......@@ -190,9 +276,9 @@ public class InsertTest extends AbstractGriffinTest {
bindVariableService.setLong256(11, rnd.nextLong(), rnd.nextLong(), rnd.nextLong(), rnd.nextLong());
bindVariableService.setChar(12, rnd.nextChar());
bindVariableService.setTimestamp(13, timestampFunction.getTimestamp());
cq.getInsertStatement().execute(writer, sqlExecutionContext);
method.execute();
}
writer.commit();
method.commit();
}
rnd.reset();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册