Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
jobily
Questdb
提交
a17edcbc
Q
Questdb
项目概览
jobily
/
Questdb
11 个月 前同步成功
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Q
Questdb
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
a17edcbc
编写于
11月 24, 2021
作者:
M
mkaruza
提交者:
GitHub
11月 24, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(sql) SQL for bulk insert (#1603)
上级
ce5977cb
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
408 addition
and
163 deletion
+408
-163
core/src/main/java/io/questdb/cairo/sql/InsertStatement.java
core/src/main/java/io/questdb/cairo/sql/InsertStatement.java
+3
-0
core/src/main/java/io/questdb/griffin/InsertRowImpl.java
core/src/main/java/io/questdb/griffin/InsertRowImpl.java
+99
-0
core/src/main/java/io/questdb/griffin/InsertStatementImpl.java
...src/main/java/io/questdb/griffin/InsertStatementImpl.java
+11
-52
core/src/main/java/io/questdb/griffin/SqlCompiler.java
core/src/main/java/io/questdb/griffin/SqlCompiler.java
+86
-70
core/src/main/java/io/questdb/griffin/SqlParser.java
core/src/main/java/io/questdb/griffin/SqlParser.java
+16
-9
core/src/main/java/io/questdb/griffin/model/InsertModel.java
core/src/main/java/io/questdb/griffin/model/InsertModel.java
+27
-20
core/src/test/java/io/questdb/griffin/AlterTableAlterSymbolColumnCacheFlagTest.java
...tdb/griffin/AlterTableAlterSymbolColumnCacheFlagTest.java
+9
-9
core/src/test/java/io/questdb/griffin/InsertTest.java
core/src/test/java/io/questdb/griffin/InsertTest.java
+155
-1
core/src/test/java/io/questdb/griffin/SimulatedDeleteTest.java
...src/test/java/io/questdb/griffin/SimulatedDeleteTest.java
+1
-1
core/src/test/java/io/questdb/griffin/SqlParserTest.java
core/src/test/java/io/questdb/griffin/SqlParserTest.java
+1
-1
未找到文件。
core/src/main/java/io/questdb/cairo/sql/InsertStatement.java
浏览文件 @
a17edcbc
...
...
@@ -25,6 +25,7 @@
package
io.questdb.cairo.sql
;
import
io.questdb.cairo.pool.WriterSource
;
import
io.questdb.griffin.InsertRowImpl
;
import
io.questdb.griffin.SqlException
;
import
io.questdb.griffin.SqlExecutionContext
;
...
...
@@ -43,4 +44,6 @@ public interface InsertStatement extends Closeable {
CharSequence
getTableName
();
void
detachWriter
();
void
addInsertRow
(
InsertRowImpl
row
);
}
core/src/main/java/io/questdb/griffin/InsertRowImpl.java
0 → 100644
浏览文件 @
a17edcbc
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package
io.questdb.griffin
;
import
io.questdb.cairo.CairoException
;
import
io.questdb.cairo.ColumnType
;
import
io.questdb.cairo.TableWriter
;
import
io.questdb.cairo.sql.Function
;
import
io.questdb.cairo.sql.VirtualRecord
;
import
io.questdb.griffin.model.IntervalUtils
;
import
io.questdb.std.NumericException
;
import
io.questdb.std.ObjList
;
public
class
InsertRowImpl
{
private
final
VirtualRecord
virtualRecord
;
private
final
SqlCompiler
.
RecordToRowCopier
copier
;
private
final
Function
timestampFunction
;
private
final
RowFactory
rowFactory
;
public
InsertRowImpl
(
VirtualRecord
virtualRecord
,
SqlCompiler
.
RecordToRowCopier
copier
,
Function
timestampFunction
)
{
this
.
virtualRecord
=
virtualRecord
;
this
.
copier
=
copier
;
this
.
timestampFunction
=
timestampFunction
;
if
(
timestampFunction
!=
null
)
{
if
(!
ColumnType
.
isString
(
timestampFunction
.
getType
()))
{
rowFactory
=
this
::
getRowWithTimestamp
;
}
else
{
rowFactory
=
this
::
getRowWithStringTimestamp
;
}
}
else
{
rowFactory
=
this
::
getRowWithoutTimestamp
;
}
}
private
TableWriter
.
Row
getRowWithTimestamp
(
TableWriter
tableWriter
)
{
long
timestamp
=
timestampFunction
.
getTimestamp
(
null
);
return
tableWriter
.
newRow
(
timestamp
);
}
private
TableWriter
.
Row
getRowWithStringTimestamp
(
TableWriter
tableWriter
)
{
CharSequence
tsStr
=
timestampFunction
.
getStr
(
null
);
try
{
long
timestamp
=
IntervalUtils
.
parseFloorPartialDate
(
tsStr
);
return
tableWriter
.
newRow
(
timestamp
);
}
catch
(
NumericException
e
)
{
throw
CairoException
.
instance
(
0
).
put
(
"Invalid timestamp: "
).
put
(
tsStr
);
}
}
private
TableWriter
.
Row
getRowWithoutTimestamp
(
TableWriter
tableWriter
)
{
return
tableWriter
.
newRow
();
}
public
void
initContext
(
SqlExecutionContext
executionContext
)
throws
SqlException
{
final
ObjList
<?
extends
Function
>
functions
=
virtualRecord
.
getFunctions
();
Function
.
init
(
functions
,
null
,
executionContext
);
if
(
timestampFunction
!=
null
)
{
timestampFunction
.
init
(
null
,
executionContext
);
}
}
public
void
append
(
TableWriter
writer
)
{
final
TableWriter
.
Row
row
=
rowFactory
.
getRow
(
writer
);
copier
.
copy
(
virtualRecord
,
row
);
row
.
append
();
}
@FunctionalInterface
private
interface
RowFactory
{
TableWriter
.
Row
getRow
(
TableWriter
tableWriter
);
}
}
core/src/main/java/io/questdb/griffin/InsertStatementImpl.java
浏览文件 @
a17edcbc
...
...
@@ -25,48 +25,26 @@
package
io.questdb.griffin
;
import
io.questdb.cairo.CairoEngine
;
import
io.questdb.cairo.CairoException
;
import
io.questdb.cairo.ColumnType
;
import
io.questdb.cairo.TableWriter
;
import
io.questdb.cairo.pool.WriterSource
;
import
io.questdb.cairo.sql.*
;
import
io.questdb.griffin.model.IntervalUtils
;
import
io.questdb.std.Misc
;
import
io.questdb.std.NumericException
;
import
io.questdb.std.ObjList
;
public
class
InsertStatementImpl
implements
InsertStatement
{
private
final
VirtualRecord
virtualRecord
;
private
final
SqlCompiler
.
RecordToRowCopier
copier
;
private
final
Function
timestampFunction
;
private
final
RowFactory
rowFactory
;
private
final
long
structureVersion
;
private
final
String
tableName
;
private
final
InsertMethodImpl
insertMethod
=
new
InsertMethodImpl
();
private
final
ObjList
<
InsertRowImpl
>
insertRows
=
new
ObjList
<>();
private
final
CairoEngine
engine
;
public
InsertStatementImpl
(
CairoEngine
engine
,
String
tableName
,
VirtualRecord
virtualRecord
,
SqlCompiler
.
RecordToRowCopier
copier
,
Function
timestampFunction
,
long
structureVersion
)
{
this
.
engine
=
engine
;
this
.
tableName
=
tableName
;
this
.
virtualRecord
=
virtualRecord
;
this
.
copier
=
copier
;
this
.
timestampFunction
=
timestampFunction
;
if
(
timestampFunction
!=
null
)
{
if
(!
ColumnType
.
isString
(
timestampFunction
.
getType
()))
{
rowFactory
=
this
::
getRowWithTimestamp
;
}
else
{
rowFactory
=
this
::
getRowWithStringTimestamp
;
}
}
else
{
rowFactory
=
this
::
getRowWithoutTimestamp
;
}
this
.
structureVersion
=
structureVersion
;
}
@Override
...
...
@@ -108,46 +86,27 @@ public class InsertStatementImpl implements InsertStatement {
insertMethod
.
close
();
}
private
TableWriter
.
Row
getRowWithTimestamp
(
TableWriter
tableWriter
)
{
long
timestamp
=
timestampFunction
.
getTimestamp
(
null
);
return
tableWriter
.
newRow
(
timestamp
);
}
private
TableWriter
.
Row
getRowWithStringTimestamp
(
TableWriter
tableWriter
)
{
CharSequence
tsStr
=
timestampFunction
.
getStr
(
null
);
try
{
long
timestamp
=
IntervalUtils
.
parseFloorPartialDate
(
tsStr
);
return
tableWriter
.
newRow
(
timestamp
);
}
catch
(
NumericException
e
)
{
throw
CairoException
.
instance
(
0
).
put
(
"Invalid timestamp: "
).
put
(
tsStr
);
}
}
private
TableWriter
.
Row
getRowWithoutTimestamp
(
TableWriter
tableWriter
)
{
return
tableWriter
.
newRow
();
@Override
public
void
addInsertRow
(
InsertRowImpl
row
)
{
insertRows
.
add
(
row
);
}
private
void
initContext
(
SqlExecutionContext
executionContext
)
throws
SqlException
{
final
ObjList
<?
extends
Function
>
functions
=
virtualRecord
.
getFunctions
();
Function
.
init
(
functions
,
null
,
executionContext
);
if
(
timestampFunction
!=
null
)
{
timestampFunction
.
init
(
null
,
executionContext
);
for
(
int
i
=
0
,
n
=
insertRows
.
size
();
i
<
n
;
i
++)
{
InsertRowImpl
row
=
insertRows
.
get
(
i
);
row
.
initContext
(
executionContext
);
}
}
@FunctionalInterface
private
interface
RowFactory
{
TableWriter
.
Row
getRow
(
TableWriter
tableWriter
);
}
private
class
InsertMethodImpl
implements
InsertMethod
{
private
TableWriter
writer
=
null
;
@Override
public
long
execute
()
{
final
TableWriter
.
Row
row
=
rowFactory
.
getRow
(
writer
);
copier
.
copy
(
virtualRecord
,
row
);
row
.
append
();
for
(
int
i
=
0
,
n
=
insertRows
.
size
();
i
<
n
;
i
++)
{
InsertRowImpl
row
=
insertRows
.
get
(
i
);
row
.
append
(
writer
);
}
return
1
;
}
...
...
core/src/main/java/io/questdb/griffin/SqlCompiler.java
浏览文件 @
a17edcbc
...
...
@@ -1452,8 +1452,9 @@ public class SqlCompiler implements Closeable {
configuration
.
getCreateAsSelectRetryCount
(),
executionContext
);
}
else
{
return
insert
(
executionModel
,
executionContext
);
}
return
insert
(
executionModel
,
executionContext
);
}
}
...
...
@@ -1796,84 +1797,92 @@ public class SqlCompiler implements Closeable {
))
{
final
long
structureVersion
=
reader
.
getVersion
();
final
RecordMetadata
metadata
=
reader
.
getMetadata
();
final
InsertStatementImpl
insertStatement
=
new
InsertStatementImpl
(
engine
,
reader
.
getTableName
(),
structureVersion
);
final
int
writerTimestampIndex
=
metadata
.
getTimestampIndex
();
final
CharSequenceHashSet
columnSet
=
model
.
getColumnSet
();
final
int
columnSetSize
=
columnSet
.
size
();
Function
timestampFunction
=
null
;
listColumnFilter
.
clear
();
if
(
columnSetSize
>
0
)
{
for
(
int
t
=
0
,
n
=
model
.
getRowTupleCount
();
t
<
n
;
t
++)
{
Function
timestampFunction
=
null
;
listColumnFilter
.
clear
();
valueFunctions
=
new
ObjList
<>(
columnSetSize
);
for
(
int
i
=
0
;
i
<
columnSetSize
;
i
++)
{
int
index
=
metadata
.
getColumnIndexQuiet
(
columnSet
.
get
(
i
));
if
(
index
>
-
1
)
{
final
ExpressionNode
node
=
model
.
getColumnValues
().
getQuick
(
i
);
Function
function
=
functionParser
.
parseFunction
(
node
,
GenericRecordMetadata
.
EMPTY
,
executionContext
);
if
(
columnSetSize
>
0
)
{
valueFunctions
=
new
ObjList
<>(
columnSetSize
);
for
(
int
i
=
0
;
i
<
columnSetSize
;
i
++)
{
int
index
=
metadata
.
getColumnIndexQuiet
(
columnSet
.
get
(
i
));
if
(
index
>
-
1
)
{
final
ExpressionNode
node
=
model
.
getRowTupleValues
(
t
).
getQuick
(
i
);
Function
function
=
functionParser
.
parseFunction
(
node
,
GenericRecordMetadata
.
EMPTY
,
executionContext
);
function
=
validateAndConsume
(
model
,
t
,
valueFunctions
,
metadata
,
writerTimestampIndex
,
i
,
index
,
function
,
node
.
position
,
executionContext
.
getBindVariableService
()
);
if
(
writerTimestampIndex
==
index
)
{
timestampFunction
=
function
;
}
function
=
validateAndConsume
(
model
,
valueFunctions
,
metadata
,
writerTimestampIndex
,
i
,
index
,
function
,
node
.
position
,
executionContext
.
getBindVariableService
()
);
if
(
writerTimestampIndex
==
index
)
{
timestampFunction
=
function
;
}
else
{
throw
SqlException
.
invalidColumn
(
model
.
getColumnPosition
(
i
),
columnSet
.
get
(
i
));
}
}
}
else
{
throw
SqlException
.
invalidColumn
(
model
.
getColumnPosition
(
i
),
columnSet
.
get
(
i
));
}
}
}
else
{
final
int
columnCount
=
metadata
.
getColumnCount
();
final
ObjList
<
ExpressionNode
>
values
=
model
.
getColumnValues
();
final
int
valueCount
=
values
.
size
();
if
(
columnCount
!=
valueCount
)
{
throw
SqlException
.
$
(
model
.
getEndOfValuesPosition
(),
"not enough values [expected="
).
put
(
columnCount
).
put
(
", actual="
).
put
(
values
.
size
()).
put
(
']'
);
}
valueFunctions
=
new
ObjList
<>(
columnCount
);
for
(
int
i
=
0
;
i
<
columnCount
;
i
++)
{
final
ExpressionNode
node
=
values
.
getQuick
(
i
);
Function
function
=
functionParser
.
parseFunction
(
node
,
EmptyRecordMetadata
.
INSTANCE
,
executionContext
);
validateAndConsume
(
model
,
valueFunctions
,
metadata
,
writerTimestampIndex
,
i
,
i
,
function
,
node
.
position
,
executionContext
.
getBindVariableService
()
);
final
int
columnCount
=
metadata
.
getColumnCount
();
final
ObjList
<
ExpressionNode
>
values
=
model
.
getRowTupleValues
(
t
);
final
int
valueCount
=
values
.
size
();
if
(
columnCount
!=
valueCount
)
{
throw
SqlException
.
$
(
model
.
getEndOfRowTupleValuesPosition
(
t
),
"row value count does not match column count [expected="
).
put
(
columnCount
).
put
(
", actual="
).
put
(
values
.
size
())
.
put
(
", tuple="
).
put
(
t
+
1
).
put
(
']'
);
}
valueFunctions
=
new
ObjList
<>(
columnCount
);
if
(
writerTimestampIndex
==
i
)
{
timestampFunction
=
function
;
for
(
int
i
=
0
;
i
<
columnCount
;
i
++)
{
final
ExpressionNode
node
=
values
.
getQuick
(
i
);
Function
function
=
functionParser
.
parseFunction
(
node
,
EmptyRecordMetadata
.
INSTANCE
,
executionContext
);
validateAndConsume
(
model
,
t
,
valueFunctions
,
metadata
,
writerTimestampIndex
,
i
,
i
,
function
,
node
.
position
,
executionContext
.
getBindVariableService
()
);
if
(
writerTimestampIndex
==
i
)
{
timestampFunction
=
function
;
}
}
}
// validate timestamp
if
(
writerTimestampIndex
>
-
1
&&
(
timestampFunction
==
null
||
ColumnType
.
isNull
(
timestampFunction
.
getType
())))
{
throw
SqlException
.
$
(
0
,
"insert statement must populate timestamp"
);
}
}
// validate timestamp
if
(
writerTimestampIndex
>
-
1
&&
(
timestampFunction
==
null
||
ColumnType
.
isNull
(
timestampFunction
.
getType
())))
{
throw
SqlException
.
$
(
0
,
"insert statement must populate timestamp"
);
VirtualRecord
record
=
new
VirtualRecord
(
valueFunctions
);
RecordToRowCopier
copier
=
assembleRecordToRowCopier
(
asm
,
record
,
metadata
,
listColumnFilter
);
insertStatement
.
addInsertRow
(
new
InsertRowImpl
(
record
,
copier
,
timestampFunction
)
);
}
VirtualRecord
record
=
new
VirtualRecord
(
valueFunctions
);
RecordToRowCopier
copier
=
assembleRecordToRowCopier
(
asm
,
record
,
metadata
,
listColumnFilter
);
return
compiledQuery
.
ofInsert
(
new
InsertStatementImpl
(
engine
,
Chars
.
toString
(
name
.
token
),
record
,
copier
,
timestampFunction
,
structureVersion
));
return
compiledQuery
.
ofInsert
(
insertStatement
);
}
catch
(
SqlException
e
)
{
Misc
.
freeObjList
(
valueFunctions
);
throw
e
;
...
...
@@ -2023,8 +2032,14 @@ public class SqlCompiler implements Closeable {
}
int
columnSetSize
=
model
.
getColumnSet
().
size
();
if
(
columnSetSize
>
0
&&
columnSetSize
!=
model
.
getColumnValues
().
size
())
{
throw
SqlException
.
$
(
model
.
getColumnPosition
(
0
),
"value count does not match column count"
);
for
(
int
i
=
0
,
n
=
model
.
getRowTupleCount
();
i
<
n
;
i
++)
{
if
(
columnSetSize
>
0
&&
columnSetSize
!=
model
.
getRowTupleValues
(
i
).
size
())
{
throw
SqlException
.
$
(
model
.
getEndOfRowTupleValuesPosition
(
i
),
"row value count does not match column count [expected="
).
put
(
columnSetSize
).
put
(
", actual="
).
put
(
model
.
getRowTupleValues
(
i
).
size
())
.
put
(
", tuple="
).
put
(
i
+
1
).
put
(
']'
);
}
}
return
model
;
...
...
@@ -2247,6 +2262,7 @@ public class SqlCompiler implements Closeable {
private
Function
validateAndConsume
(
InsertModel
model
,
int
tupleIndex
,
ObjList
<
Function
>
valueFunctions
,
RecordMetadata
metadata
,
int
writerTimestampIndex
,
...
...
@@ -2289,7 +2305,7 @@ public class SqlCompiler implements Closeable {
throw
SqlException
.
inconvertibleTypes
(
functionPosition
,
function
.
getType
(),
model
.
get
ColumnValues
(
).
getQuick
(
bottomUpColumnIndex
).
token
,
model
.
get
RowTupleValues
(
tupleIndex
).
getQuick
(
bottomUpColumnIndex
).
token
,
metadata
.
getColumnType
(
metadataColumnIndex
),
metadata
.
getColumnName
(
metadataColumnIndex
)
);
...
...
core/src/main/java/io/questdb/griffin/SqlParser.java
浏览文件 @
a17edcbc
...
...
@@ -1098,17 +1098,24 @@ public final class SqlParser {
}
if
(
isValuesKeyword
(
tok
))
{
expectTok
(
lexer
,
'('
);
do
{
model
.
addColumnValue
(
expectExpr
(
lexer
));
}
while
(
Chars
.
equals
((
tok
=
tok
(
lexer
,
"','"
)),
','
));
expectTok
(
tok
,
lexer
.
lastTokenPosition
(),
')'
);
model
.
setEndOfValuesPosition
(
lexer
.
lastTokenPosition
());
return
model
;
expectTok
(
lexer
,
'('
);
ObjList
<
ExpressionNode
>
rowValues
=
new
ObjList
<>();
do
{
rowValues
.
add
(
expectExpr
(
lexer
));
}
while
(
Chars
.
equals
((
tok
=
tok
(
lexer
,
"','"
)),
','
));
expectTok
(
tok
,
lexer
.
lastTokenPosition
(),
')'
);
model
.
addRowTupleValues
(
rowValues
);
model
.
addEndOfRowTupleValuesPosition
(
lexer
.
lastTokenPosition
());
tok
=
optTok
(
lexer
);
// no more tokens or ';' should indicate end of statement
if
(
tok
==
null
||
Chars
.
equals
(
tok
,
';'
))
{
return
model
;
}
expectTok
(
tok
,
lexer
.
lastTokenPosition
(),
','
);
}
while
(
true
);
}
throw
err
(
lexer
,
"'select' or 'values' expected"
);
}
...
...
core/src/main/java/io/questdb/griffin/model/InsertModel.java
浏览文件 @
a17edcbc
...
...
@@ -30,12 +30,12 @@ import io.questdb.std.str.CharSink;
public
class
InsertModel
implements
ExecutionModel
,
Mutable
,
Sinkable
{
public
static
final
ObjectFactory
<
InsertModel
>
FACTORY
=
InsertModel:
:
new
;
private
final
CharSequenceHashSet
columnSet
=
new
CharSequenceHashSet
();
private
final
ObjList
<
ExpressionNode
>
columnValues
=
new
ObjList
<>();
private
final
ObjList
<
ObjList
<
ExpressionNode
>>
rowTupleValues
=
new
ObjList
<>();
private
final
IntList
endOfRowTupleValuesPositions
=
new
IntList
();
private
final
IntList
columnPositions
=
new
IntList
();
private
ExpressionNode
tableName
;
private
QueryModel
queryModel
;
private
int
selectKeywordPosition
;
private
int
endOfValuesPosition
;
private
long
batchSize
=
-
1
;
private
long
commitLag
=
0
;
...
...
@@ -50,8 +50,8 @@ public class InsertModel implements ExecutionModel, Mutable, Sinkable {
return
false
;
}
public
void
add
ColumnValue
(
ExpressionNode
value
)
{
columnValues
.
add
(
value
);
public
void
add
RowTupleValues
(
ObjList
<
ExpressionNode
>
row
)
{
rowTupleValues
.
add
(
row
);
}
@Override
...
...
@@ -60,9 +60,12 @@ public class InsertModel implements ExecutionModel, Mutable, Sinkable {
this
.
queryModel
=
null
;
this
.
columnSet
.
clear
();
this
.
columnPositions
.
clear
();
this
.
columnValues
.
clear
();
for
(
int
i
=
0
,
n
=
this
.
rowTupleValues
.
size
();
i
<
n
;
i
++)
{
this
.
rowTupleValues
.
get
(
i
).
clear
();
}
this
.
rowTupleValues
.
clear
();
this
.
selectKeywordPosition
=
0
;
this
.
endOf
ValuesPosition
=
0
;
this
.
endOf
RowTupleValuesPositions
.
clear
()
;
this
.
batchSize
=
-
1
;
this
.
commitLag
=
0
;
}
...
...
@@ -75,8 +78,8 @@ public class InsertModel implements ExecutionModel, Mutable, Sinkable {
return
columnSet
;
}
public
ObjList
<
ExpressionNode
>
get
ColumnValues
(
)
{
return
columnValues
;
public
ObjList
<
ExpressionNode
>
get
RowTupleValues
(
int
index
)
{
return
rowTupleValues
.
get
(
index
)
;
}
public
int
getSelectKeywordPosition
()
{
...
...
@@ -116,6 +119,8 @@ public class InsertModel implements ExecutionModel, Mutable, Sinkable {
this
.
commitLag
=
lag
;
}
public
int
getRowTupleCount
()
{
return
rowTupleValues
.
size
();
}
public
ExpressionNode
getTableName
()
{
return
tableName
;
}
...
...
@@ -124,12 +129,12 @@ public class InsertModel implements ExecutionModel, Mutable, Sinkable {
this
.
tableName
=
tableName
;
}
public
int
getEndOf
ValuesPosition
(
)
{
return
endOf
ValuesPosition
;
public
int
getEndOf
RowTupleValuesPosition
(
int
index
)
{
return
endOf
RowTupleValuesPositions
.
get
(
index
)
;
}
public
void
setEndOf
ValuesPosition
(
int
endOfValuesPosition
)
{
this
.
endOfValuesPosition
=
endOfValuesPosition
;
public
void
addEndOfRowTuple
ValuesPosition
(
int
endOfValuesPosition
)
{
endOfRowTupleValuesPositions
.
add
(
endOfValuesPosition
)
;
}
@Override
...
...
@@ -158,16 +163,18 @@ public class InsertModel implements ExecutionModel, Mutable, Sinkable {
if
(
queryModel
!=
null
)
{
queryModel
.
toSink
(
sink
);
}
else
{
sink
.
put
(
"values ("
);
for
(
int
i
=
0
,
m
=
columnValues
.
size
();
i
<
m
;
i
++)
{
if
(
i
>
0
)
{
sink
.
put
(
", "
);
sink
.
put
(
"values "
);
for
(
int
t
=
0
,
s
=
rowTupleValues
.
size
();
t
<
s
;
t
++)
{
ObjList
<
ExpressionNode
>
rowValues
=
rowTupleValues
.
get
(
t
);
sink
.
put
(
'('
);
for
(
int
i
=
0
,
m
=
rowValues
.
size
();
i
<
m
;
i
++)
{
if
(
i
>
0
)
{
sink
.
put
(
", "
);
}
sink
.
put
(
rowValues
.
getQuick
(
i
));
}
sink
.
put
(
columnValues
.
getQuick
(
i
)
);
sink
.
put
(
')'
);
}
sink
.
put
(
')'
);
}
}
}
core/src/test/java/io/questdb/griffin/AlterTableAlterSymbolColumnCacheFlagTest.java
浏览文件 @
a17edcbc
...
...
@@ -123,15 +123,15 @@ public class AlterTableAlterSymbolColumnCacheFlagTest extends AbstractGriffinTes
assertMemoryLeak
(()
->
{
compiler
.
compile
(
"create table x (i int, sym symbol nocache) ;"
,
sqlExecutionContext
);
executeInsert
(
"insert into x values (1, 'GBP')
\"
"
);
executeInsert
(
"insert into x values (2, 'CHF')
\"
"
);
executeInsert
(
"insert into x values (3, 'GBP')
\"
"
);
executeInsert
(
"insert into x values (4, 'JPY')
\"
"
);
executeInsert
(
"insert into x values (5, 'USD')
\"
"
);
executeInsert
(
"insert into x values (6, 'GBP')
\"
"
);
executeInsert
(
"insert into x values (7, 'GBP')
\"
"
);
executeInsert
(
"insert into x values (8, 'GBP')
\"
"
);
executeInsert
(
"insert into x values (9, 'GBP')
\"
"
);
executeInsert
(
"insert into x values (1, 'GBP')"
);
executeInsert
(
"insert into x values (2, 'CHF')"
);
executeInsert
(
"insert into x values (3, 'GBP')"
);
executeInsert
(
"insert into x values (4, 'JPY')"
);
executeInsert
(
"insert into x values (5, 'USD')"
);
executeInsert
(
"insert into x values (6, 'GBP')"
);
executeInsert
(
"insert into x values (7, 'GBP')"
);
executeInsert
(
"insert into x values (8, 'GBP')"
);
executeInsert
(
"insert into x values (9, 'GBP')"
);
});
String
expectedOrdered
=
"sym\n"
+
...
...
core/src/test/java/io/questdb/griffin/InsertTest.java
浏览文件 @
a17edcbc
...
...
@@ -505,7 +505,7 @@ public class InsertTest extends AbstractGriffinTest {
compiler
.
compile
(
"insert into balances values (1, 'USD')"
,
sqlExecutionContext
);
}
catch
(
SqlException
e
)
{
Assert
.
assertEquals
(
37
,
e
.
getPosition
());
TestUtils
.
assertContains
(
e
.
getFlyweightMessage
(),
"
not enough values
"
);
TestUtils
.
assertContains
(
e
.
getFlyweightMessage
(),
"
row value count does not match column count [expected=3, actual=2, tuple=1]
"
);
}
});
}
...
...
@@ -734,6 +734,160 @@ public class InsertTest extends AbstractGriffinTest {
});
}
@Test
public
void
testInsertMultipleRows
()
throws
Exception
{
assertMemoryLeak
(()
->
{
compiler
.
compile
(
"create table trades (ts timestamp, sym symbol) timestamp(ts);"
,
sqlExecutionContext
);
executeInsert
(
"insert into trades VALUES (1262599200000000, 'USDJPY'), (3262599300000000, 'USDFJD');"
);
String
expected
=
"ts\tsym\n"
+
"2010-01-04T10:00:00.000000Z\tUSDJPY\n"
+
"2073-05-21T13:35:00.000000Z\tUSDFJD\n"
;
assertReader
(
expected
,
"trades"
);
});
}
@Test
public
void
testInsertMultipleRowsExtraParentheses
()
throws
Exception
{
assertMemoryLeak
(()
->
{
compiler
.
compile
(
"create table trades (i INT, sym symbol)"
,
sqlExecutionContext
);
executeInsert
(
"insert into trades VALUES ((1), 'USD'), ((2), (('FJD')));"
);
String
expected
=
"i\tsym\n"
+
"1\tUSD\n"
+
"2\tFJD\n"
;
assertReader
(
expected
,
"trades"
);
});
}
@Test
public
void
testInsertMultipleRowsOutOfOrder
()
throws
Exception
{
assertMemoryLeak
(()
->
{
compiler
.
compile
(
"create table trades (ts timestamp) timestamp(ts);"
,
sqlExecutionContext
);
try
{
executeInsert
(
"insert into trades VALUES (1), (3), (2);"
);
}
catch
(
CairoException
e
)
{
TestUtils
.
assertContains
(
e
.
getFlyweightMessage
(),
"Cannot insert rows out of order."
);
}
});
}
@Test
public
void
testInsertMultipleRowsFailTypeConversion
()
throws
Exception
{
assertMemoryLeak
(()
->
{
compiler
.
compile
(
"create table trades (sym symbol)"
,
sqlExecutionContext
);
try
{
compiler
.
compile
(
"insert into trades VALUES ('USDJPY'), (1), ('USDFJD');"
,
sqlExecutionContext
);
}
catch
(
SqlException
e
)
{
Assert
.
assertEquals
(
39
,
e
.
getPosition
());
TestUtils
.
assertContains
(
e
.
getFlyweightMessage
(),
"inconvertible types: INT -> SYMBOL [from=1, to=sym]"
);
}
});
}
@Test
public
void
testInsertMultipleRowsFailInvalidSyntax
()
throws
Exception
{
assertMemoryLeak
(()
->
{
compiler
.
compile
(
"create table trades (i int, sym symbol)"
,
sqlExecutionContext
);
// No comma delimiter between rows
try
{
compiler
.
compile
(
"insert into trades VALUES (1, 'USDJPY')(2, 'USDFJD');"
,
sqlExecutionContext
);
}
catch
(
SqlException
e
)
{
Assert
.
assertEquals
(
39
,
e
.
getPosition
());
TestUtils
.
assertContains
(
e
.
getFlyweightMessage
(),
"',' expected"
);
}
// Empty row
try
{
compiler
.
compile
(
"insert into trades VALUES (1, 'USDJPY'), ();"
,
sqlExecutionContext
);
}
catch
(
SqlException
e
)
{
Assert
.
assertEquals
(
42
,
e
.
getPosition
());
TestUtils
.
assertContains
(
e
.
getFlyweightMessage
(),
"Expression expected"
);
}
// Empty row with comma delimiter inside
try
{
compiler
.
compile
(
"insert into trades VALUES (1, 'USDJPY'), (2, 'USDFJD'), (,);"
,
sqlExecutionContext
);
}
catch
(
SqlException
e
)
{
Assert
.
assertEquals
(
57
,
e
.
getPosition
());
TestUtils
.
assertContains
(
e
.
getFlyweightMessage
(),
"Expression expected"
);
}
// Empty row column
try
{
compiler
.
compile
(
"insert into trades VALUES (1, 'USDJPY'), (2, 'USDFJD'), (3,);"
,
sqlExecutionContext
);
}
catch
(
SqlException
e
)
{
Assert
.
assertEquals
(
59
,
e
.
getPosition
());
TestUtils
.
assertContains
(
e
.
getFlyweightMessage
(),
"Expression expected"
);
}
// Multi row insert can't end in comma token
try
{
compiler
.
compile
(
"insert into trades VALUES (1, 'USDJPY'), (2, 'USDFJD'),;"
,
sqlExecutionContext
);
}
catch
(
SqlException
e
)
{
Assert
.
assertEquals
(
55
,
e
.
getPosition
());
TestUtils
.
assertContains
(
e
.
getFlyweightMessage
(),
"'(' expected"
);
}
});
}
@Test
public
void
testInsertMultipleRowsFailRowWrongColumnCount
()
throws
Exception
{
assertMemoryLeak
(()
->
{
compiler
.
compile
(
"create table trades (i int, sym symbol)"
,
sqlExecutionContext
);
try
{
compiler
.
compile
(
"insert into trades VALUES (1, 'USDJPY'), ('USDFJD');"
,
sqlExecutionContext
);
}
catch
(
SqlException
e
)
{
Assert
.
assertEquals
(
50
,
e
.
getPosition
());
TestUtils
.
assertContains
(
e
.
getFlyweightMessage
(),
"row value count does not match column count [expected=2, actual=1, tuple=2]"
);
}
});
}
@Test
public
void
testInsertMultipleRowsBindVariables
()
throws
Exception
{
assertMemoryLeak
(()
->
{
compiler
.
compile
(
"create table trades (ts timestamp, sym symbol) timestamp(ts);"
,
sqlExecutionContext
);
final
String
sql
=
"insert into trades VALUES (1262599200000000, $1), (3262599300000000, $2);"
;
final
CompiledQuery
cq
=
compiler
.
compile
(
sql
,
sqlExecutionContext
);
Assert
.
assertEquals
(
CompiledQuery
.
INSERT
,
cq
.
getType
());
InsertStatement
insert
=
cq
.
getInsertStatement
();
try
(
InsertMethod
method
=
insert
.
createMethod
(
sqlExecutionContext
))
{
bindVariableService
.
setStr
(
0
,
"USDJPY"
);
bindVariableService
.
setStr
(
1
,
"USDFJD"
);
method
.
execute
();
method
.
commit
();
}
String
expected
=
"ts\tsym\n"
+
"2010-01-04T10:00:00.000000Z\tUSDJPY\n"
+
"2073-05-21T13:35:00.000000Z\tUSDFJD\n"
;
assertReader
(
expected
,
"trades"
);
});
}
@Test
public
void
testInsertMultipleRowsMissingBindVariables
()
throws
Exception
{
assertMemoryLeak
(()
->
{
compiler
.
compile
(
"create table t (ts timestamp, i int) timestamp(ts);"
,
sqlExecutionContext
);
final
String
sql
=
"insert into t VALUES (1262599200000000, $1), (3262599300000000, $2);"
;
final
CompiledQuery
cq
=
compiler
.
compile
(
sql
,
sqlExecutionContext
);
Assert
.
assertEquals
(
CompiledQuery
.
INSERT
,
cq
.
getType
());
InsertStatement
insert
=
cq
.
getInsertStatement
();
try
(
InsertMethod
method
=
insert
.
createMethod
(
sqlExecutionContext
))
{
bindVariableService
.
setInt
(
0
,
1
);
method
.
execute
();
method
.
commit
();
}
String
expected
=
"ts\ti\n"
+
"2010-01-04T10:00:00.000000Z\t1\n"
+
"2073-05-21T13:35:00.000000Z\tNaN\n"
;
assertReader
(
expected
,
"t"
);
});
}
private
void
assertInsertTimestamp
(
String
expected
,
String
ddl2
,
String
exceptionType
,
boolean
commitInsert
)
throws
Exception
{
if
(
commitInsert
)
{
compiler
.
compile
(
"create table tab(seq long, ts timestamp) timestamp(ts)"
,
sqlExecutionContext
);
...
...
core/src/test/java/io/questdb/griffin/SimulatedDeleteTest.java
浏览文件 @
a17edcbc
...
...
@@ -38,7 +38,7 @@ public class SimulatedDeleteTest extends AbstractGriffinTest {
execInsert
(
compiler
.
compile
(
"insert into balances (cust_id, balance_ccy, balance, timestamp) values (1, 'EUR', 650.50, 6000000002);"
,
sqlExecutionContext
).
getInsertStatement
());
execInsert
(
compiler
.
compile
(
"insert into balances (cust_id, balance_ccy, balance, timestamp) values (2, 'USD', 900.75, 6000000003);"
,
sqlExecutionContext
).
getInsertStatement
());
execInsert
(
compiler
.
compile
(
"insert into balances (cust_id, balance_ccy, balance, timestamp) values (2, 'EUR', 880.20, 6000000004);"
,
sqlExecutionContext
).
getInsertStatement
());
execInsert
(
compiler
.
compile
(
"insert into balances (cust_id, balance_ccy, inactive, timestamp) values (1, 'USD', true, 6000000006)
)
;"
,
sqlExecutionContext
).
getInsertStatement
());
execInsert
(
compiler
.
compile
(
"insert into balances (cust_id, balance_ccy, inactive, timestamp) values (1, 'USD', true, 6000000006);"
,
sqlExecutionContext
).
getInsertStatement
());
assertSql
(
"(select * from balances latest by balance_ccy where cust_id=1) where not inactive;"
,
...
...
core/src/test/java/io/questdb/griffin/SqlParserTest.java
浏览文件 @
a17edcbc
...
...
@@ -2775,7 +2775,7 @@ public class SqlParserTest extends AbstractSqlParserTest {
@Test
public
void
testInsertColumnValueMismatch
()
throws
Exception
{
assertSyntaxError
(
"insert into x (a,b) values (?)"
,
15
,
29
,
"value count does not match column count"
,
modelOf
(
"x"
)
.
col
(
"a"
,
ColumnType
.
INT
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录