Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
33c191c2
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
33c191c2
编写于
5月 13, 2022
作者:
S
slzhou
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into feature/udf
上级
0f22b96f
00b17f86
变更
31
展开全部
显示空白变更内容
内联
并排
Showing
31 changed file
with
2136 addition
and
1257 deletion
+2136
-1257
example/src/tmq.c
example/src/tmq.c
+1
-1
include/common/ttypes.h
include/common/ttypes.h
+1
-0
include/libs/function/function.h
include/libs/function/function.h
+5
-21
include/libs/parser/parser.h
include/libs/parser/parser.h
+1
-1
source/client/inc/clientStmt.h
source/client/inc/clientStmt.h
+1
-0
source/client/src/clientMain.c
source/client/src/clientMain.c
+2
-1
source/client/src/clientSml.c
source/client/src/clientSml.c
+1323
-800
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+7
-3
source/client/test/smlTest.cpp
source/client/test/smlTest.cpp
+275
-44
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-0
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+0
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+2
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+16
-4
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+57
-56
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+77
-25
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+5
-5
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+4
-0
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+31
-0
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+101
-16
source/libs/function/src/texpr.c
source/libs/function/src/texpr.c
+5
-119
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+14
-15
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+1
-1
source/libs/scalar/CMakeLists.txt
source/libs/scalar/CMakeLists.txt
+1
-1
source/libs/scalar/inc/sclInt.h
source/libs/scalar/inc/sclInt.h
+2
-0
source/libs/scalar/src/filter.c
source/libs/scalar/src/filter.c
+7
-15
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+28
-3
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+19
-3
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+7
-6
source/libs/wal/src/walSeek.c
source/libs/wal/src/walSeek.c
+4
-4
tests/script/api/batchprepare.c
tests/script/api/batchprepare.c
+137
-109
未找到文件。
example/src/tmq.c
浏览文件 @
33c191c2
...
...
@@ -22,7 +22,7 @@
static
int
running
=
1
;
static
void
msg_process
(
TAOS_RES
*
msg
)
{
char
buf
[
1024
];
memset
(
buf
,
0
,
1024
);
/*memset(buf, 0, 1024);*/
printf
(
"topic: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"vg: %d
\n
"
,
tmq_get_vgroup_id
(
msg
));
while
(
1
)
{
...
...
include/common/ttypes.h
浏览文件 @
33c191c2
...
...
@@ -50,6 +50,7 @@ typedef struct {
#define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE))
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len))
#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR) || ((t) == TSDB_DATA_TYPE_JSON))
#define IS_STR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_NCHAR))
#define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0]))
#define varDataNetTLen(v) (sizeof(VarDataLenT) + varDataNetLen(v))
...
...
include/libs/function/function.h
浏览文件 @
33c191c2
...
...
@@ -126,7 +126,7 @@ enum {
enum
{
MAIN_SCAN
=
0x0u
,
REVERSE_SCAN
=
0x1u
,
REVERSE_SCAN
=
0x1u
,
// todo remove it
REPEAT_SCAN
=
0x2u
,
//repeat scan belongs to the master scan
MERGE_STAGE
=
0x20u
,
};
...
...
@@ -222,13 +222,6 @@ enum {
typedef
struct
tExprNode
{
int32_t
nodeType
;
union
{
struct
{
int32_t
optr
;
// binary operator
void
*
info
;
// support filter operation on this expression only available for leaf node
struct
tExprNode
*
pLeft
;
// left child pointer
struct
tExprNode
*
pRight
;
// right child pointer
}
_node
;
SSchema
*
pSchema
;
// column node
struct
SVariant
*
pVal
;
// value node
...
...
@@ -237,12 +230,6 @@ typedef struct tExprNode {
int32_t
functionId
;
int32_t
num
;
struct
SFunctionNode
*
pFunctNode
;
// Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the
// calculation instead.
// E.g., Cov(col1, col2), the column information, w.r.t. the col1 and col2, is kept in pChild nodes.
// The concat function, concat(col1, col2), is a binary scalar
// operator and is kept in the attribute of _node.
struct
tExprNode
**
pChild
;
}
_function
;
struct
{
...
...
@@ -273,6 +260,7 @@ typedef struct SAggFunctionInfo {
struct
SScalarParam
{
SColumnInfoData
*
columnData
;
SHashObj
*
pHashFilter
;
void
*
param
;
// other parameter, such as meta handle from vnode, to extract table name/tag value
int32_t
numOfRows
;
};
...
...
@@ -281,10 +269,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
bool
qIsValidUdf
(
SArray
*
pUdfInfo
,
const
char
*
name
,
int32_t
len
,
int32_t
*
functionId
);
tExprNode
*
exprTreeFromBinary
(
const
void
*
data
,
size_t
size
);
tExprNode
*
exprdup
(
tExprNode
*
pTree
);
void
resetResultRowEntryResult
(
SqlFunctionCtx
*
pCtx
,
int32_t
num
);
void
cleanupResultRowEntry
(
struct
SResultRowEntryInfo
*
pCell
);
int32_t
getNumOfResult
(
SqlFunctionCtx
*
pCtx
,
int32_t
num
,
SSDataBlock
*
pResBlock
);
...
...
include/libs/parser/parser.h
浏览文件 @
33c191c2
...
...
@@ -77,7 +77,7 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void
*
smlInitHandle
(
SQuery
*
pQuery
);
void
smlDestroyHandle
(
void
*
pHandle
);
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
cols
Format
,
SArray
*
cols
Schema
,
SArray
*
cols
,
bool
format
,
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
char
*
tableName
,
char
*
msgBuf
,
int16_t
msgBufLen
);
int32_t
smlBuildOutput
(
void
*
handle
,
SHashObj
*
pVgHash
);
...
...
source/client/inc/clientStmt.h
浏览文件 @
33c191c2
...
...
@@ -71,6 +71,7 @@ typedef struct SStmtBindInfo {
typedef
struct
SStmtExecInfo
{
int32_t
affectedRows
;
bool
emptyRes
;
SRequestObj
*
pRequest
;
SHashObj
*
pVgHash
;
SHashObj
*
pBlockHash
;
...
...
source/client/src/clientMain.c
浏览文件 @
33c191c2
...
...
@@ -303,6 +303,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
break
;
}
}
str
[
len
]
=
0
;
return
len
;
}
...
...
@@ -567,7 +568,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
// todo directly call fp
}
taos_query_l
(
taos
,
sql
,
(
int32_t
)
strlen
(
sql
));
taos_query_l
(
taos
,
sql
,
(
int32_t
)
strlen
(
sql
));
}
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
)
{
...
...
source/client/src/clientSml.c
浏览文件 @
33c191c2
此差异已折叠。
点击以展开。
source/client/src/clientStmt.c
浏览文件 @
33c191c2
...
...
@@ -279,6 +279,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
}
pStmt
->
exec
.
autoCreateTbl
=
false
;
pStmt
->
exec
.
emptyRes
=
false
;
if
(
keepTable
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -628,8 +629,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STMT_ERR_RET
(
stmtRestoreQueryFields
(
pStmt
));
}
bool
emptyResult
=
false
;
STMT_RET
(
qStmtBindParam
(
pStmt
->
sql
.
pQueryPlan
,
bind
,
colIdx
,
pStmt
->
exec
.
pRequest
->
requestId
,
&
emptyResult
));
STMT_RET
(
qStmtBindParam
(
pStmt
->
sql
.
pQueryPlan
,
bind
,
colIdx
,
pStmt
->
exec
.
pRequest
->
requestId
,
&
pStmt
->
exec
.
emptyRes
));
}
STableDataBlocks
**
pDataBlock
=
(
STableDataBlocks
**
)
taosHashGet
(
pStmt
->
exec
.
pBlockHash
,
pStmt
->
bInfo
.
tbFName
,
strlen
(
pStmt
->
bInfo
.
tbFName
));
...
...
@@ -736,7 +736,11 @@ int stmtExec(TAOS_STMT *stmt) {
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_EXECUTE
));
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
if
(
pStmt
->
exec
.
emptyRes
)
{
pStmt
->
exec
.
pRequest
->
type
=
TSDB_SQL_RETRIEVE_EMPTY_RESULT
;
}
else
{
scheduleQuery
(
pStmt
->
exec
.
pRequest
,
pStmt
->
sql
.
pQueryPlan
,
pStmt
->
sql
.
nodeList
,
NULL
);
}
}
else
{
STMT_ERR_RET
(
qBuildStmtOutput
(
pStmt
->
sql
.
pQuery
,
pStmt
->
exec
.
pVgHash
,
pStmt
->
exec
.
pBlockHash
));
launchQueryImpl
(
pStmt
->
exec
.
pRequest
,
pStmt
->
sql
.
pQuery
,
TSDB_CODE_SUCCESS
,
true
,
(
autoCreateTbl
?
(
void
**
)
&
pRsp
:
NULL
));
...
...
source/client/test/smlTest.cpp
浏览文件 @
33c191c2
...
...
@@ -33,7 +33,7 @@ int main(int argc, char **argv) {
return
RUN_ALL_TESTS
();
}
TEST
(
testCase
,
smlParseString_Test
)
{
TEST
(
testCase
,
smlParse
Influx
String_Test
)
{
char
msg
[
256
]
=
{
0
};
SSmlMsgBuf
msgBuf
;
msgBuf
.
buf
=
msg
;
...
...
@@ -42,7 +42,7 @@ TEST(testCase, smlParseString_Test) {
// case 1
char
*
sql
=
"st,t1=3,t2=4,t3=t3 c1=3i64,c3=
\"
passit hello,c1=2
\"
,c2=false,c4=4f64 1626006833639000000 ,32,c=3"
;
int
ret
=
smlParseString
(
sql
,
&
elements
,
&
msgBuf
);
int
ret
=
smlParse
Influx
String
(
sql
,
&
elements
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
0
);
ASSERT_EQ
(
elements
.
measure
,
sql
);
ASSERT_EQ
(
elements
.
measureLen
,
strlen
(
"st"
));
...
...
@@ -60,13 +60,13 @@ TEST(testCase, smlParseString_Test) {
// case 2 false
sql
=
"st,t1=3,t2=4,t3=t3 c1=3i64,c3=
\"
passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"
;
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParse
Influx
String
(
sql
,
&
elements
,
&
msgBuf
);
ASSERT_NE
(
ret
,
0
);
// case 3 false
sql
=
"st, t1=3,t2=4,t3=t3 c1=3i64,c3=
\"
passit hello,c1=2,c2=false,c4=4f64 1626006833639000000"
;
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParse
Influx
String
(
sql
,
&
elements
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
0
);
ASSERT_EQ
(
elements
.
cols
,
sql
+
elements
.
measureTagsLen
+
2
);
ASSERT_EQ
(
elements
.
colsLen
,
strlen
(
"t1=3,t2=4,t3=t3"
));
...
...
@@ -74,7 +74,7 @@ TEST(testCase, smlParseString_Test) {
// case 4 tag is null
sql
=
"st, c1=3i64,c3=
\"
passit hello,c1=2
\"
,c2=false,c4=4f64 1626006833639000000"
;
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParse
Influx
String
(
sql
,
&
elements
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
0
);
ASSERT_EQ
(
elements
.
measure
,
sql
);
ASSERT_EQ
(
elements
.
measureLen
,
strlen
(
"st"
));
...
...
@@ -92,7 +92,7 @@ TEST(testCase, smlParseString_Test) {
// case 5 tag is null
sql
=
" st c1=3i64,c3=
\"
passit hello,c1=2
\"
,c2=false,c4=4f64 1626006833639000000 "
;
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParse
Influx
String
(
sql
,
&
elements
,
&
msgBuf
);
sql
++
;
ASSERT_EQ
(
ret
,
0
);
ASSERT_EQ
(
elements
.
measure
,
sql
);
...
...
@@ -111,13 +111,13 @@ TEST(testCase, smlParseString_Test) {
// case 6
sql
=
" st c1=3i64,c3=
\"
passit hello,c1=2
\"
,c2=false,c4=4f64 "
;
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParse
Influx
String
(
sql
,
&
elements
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
0
);
// case 7
sql
=
" st , "
;
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParse
Influx
String
(
sql
,
&
elements
,
&
msgBuf
);
sql
++
;
ASSERT_EQ
(
ret
,
0
);
ASSERT_EQ
(
elements
.
cols
,
sql
+
elements
.
measureTagsLen
+
3
);
...
...
@@ -126,7 +126,7 @@ TEST(testCase, smlParseString_Test) {
// case 8 false
sql
=
", st , "
;
memset
(
&
elements
,
0
,
sizeof
(
SSmlLineInfo
));
ret
=
smlParseString
(
sql
,
&
elements
,
&
msgBuf
);
ret
=
smlParse
Influx
String
(
sql
,
&
elements
,
&
msgBuf
);
ASSERT_NE
(
ret
,
0
);
}
...
...
@@ -140,15 +140,13 @@ TEST(testCase, smlParseCols_Error_Test) {
"c=f64"
,
// double
"c=8f64f"
,
"c=8ef64"
,
"c=1.7976931348623158e+390f64"
,
"c=f32"
,
// float
"c=8f32f"
,
"c=8wef32"
,
"c=-3.402823466e+39f32"
,
"c="
,
//
float
"c="
,
//
double
"c=8f"
,
"c=8we"
,
"c=3.402823466e+39"
,
"c=i8"
,
// tiny int
"c=-8i8f"
,
"c=8wei8"
,
...
...
@@ -218,7 +216,7 @@ TEST(testCase, smlParseCols_tag_Test) {
SHashObj
*
dumplicateKey
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
const
char
*
data
=
"cbin=
\"
passit hello
,c=2
\"
,cnch=L
\"
iisdfsf
\"
,cbool=false,cf64=4.31f64,cf32
_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l
\"
iuwq
\"
"
;
"cbin=
\"
passit hello
c=2
\"
,cnch=L
\"
iisdfsf
\"
,cbool=false,cf64=4.31f64,cf64
_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l
\"
iuwq
\"
"
;
int32_t
len
=
strlen
(
data
);
int32_t
ret
=
smlParseCols
(
data
,
len
,
cols
,
true
,
dumplicateKey
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
TSDB_CODE_SUCCESS
);
...
...
@@ -230,7 +228,7 @@ TEST(testCase, smlParseCols_tag_Test) {
ASSERT_EQ
(
strncasecmp
(
kv
->
key
,
"cbin"
,
4
),
0
);
ASSERT_EQ
(
kv
->
keyLen
,
4
);
ASSERT_EQ
(
kv
->
type
,
TSDB_DATA_TYPE_NCHAR
);
ASSERT_EQ
(
kv
->
valueLen
,
1
8
);
ASSERT_EQ
(
kv
->
valueLen
,
1
7
);
ASSERT_EQ
(
strncasecmp
(
kv
->
value
,
"
\"
passit"
,
7
),
0
);
taosMemoryFree
(
kv
);
...
...
@@ -280,7 +278,7 @@ TEST(testCase, smlParseCols_Test) {
SHashObj
*
dumplicateKey
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
const
char
*
data
=
"cbin=
\"
passit hello,c=2
\"
,cnch=L
\"
iisdfsf
\"
,cbool=false,cf64=4.31f64,cf
32
_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l
\"
iuwq
\"
"
;
const
char
*
data
=
"cbin=
\"
passit hello,c=2
\"
,cnch=L
\"
iisdfsf
\"
,cbool=false,cf64=4.31f64,cf
64
_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l
\"
iuwq
\"
"
;
int32_t
len
=
strlen
(
data
);
int32_t
ret
=
smlParseCols
(
data
,
len
,
cols
,
false
,
dumplicateKey
,
&
msgBuf
);
ASSERT_EQ
(
ret
,
TSDB_CODE_SUCCESS
);
...
...
@@ -321,17 +319,17 @@ TEST(testCase, smlParseCols_Test) {
ASSERT_EQ
(
kv
->
type
,
TSDB_DATA_TYPE_DOUBLE
);
ASSERT_EQ
(
kv
->
length
,
8
);
//ASSERT_EQ(kv->d, 4.31);
printf
(
"4.31 = kv->
f
:%f
\n
"
,
kv
->
d
);
printf
(
"4.31 = kv->
d
:%f
\n
"
,
kv
->
d
);
taosMemoryFree
(
kv
);
// float
kv
=
(
SSmlKv
*
)
taosArrayGetP
(
cols
,
4
);
ASSERT_EQ
(
strncasecmp
(
kv
->
key
,
"cf
32
_"
,
5
),
0
);
ASSERT_EQ
(
strncasecmp
(
kv
->
key
,
"cf
64
_"
,
5
),
0
);
ASSERT_EQ
(
kv
->
keyLen
,
5
);
ASSERT_EQ
(
kv
->
type
,
TSDB_DATA_TYPE_
FLOAT
);
ASSERT_EQ
(
kv
->
length
,
4
);
ASSERT_EQ
(
kv
->
type
,
TSDB_DATA_TYPE_
DOUBLE
);
ASSERT_EQ
(
kv
->
length
,
8
);
//ASSERT_EQ(kv->f, 8.32);
printf
(
"8.32 = kv->
f:%f
\n
"
,
kv
->
f
);
printf
(
"8.32 = kv->
d:%f
\n
"
,
kv
->
d
);
taosMemoryFree
(
kv
);
// float
...
...
@@ -467,7 +465,7 @@ TEST(testCase, smlParseCols_Test) {
taosHashCleanup
(
dumplicateKey
);
}
TEST
(
testCase
,
smlP
arseLine
_Test
)
{
TEST
(
testCase
,
smlP
rocess_influx
_Test
)
{
TAOS
*
taos
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
taos
,
nullptr
);
...
...
@@ -483,7 +481,7 @@ TEST(testCase, smlParseLine_Test) {
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_LINE_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
,
true
);
ASSERT_NE
(
info
,
nullptr
);
const
char
*
sql
[
9
]
=
{
const
char
*
sql
[
11
]
=
{
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000"
,
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607400000000000"
,
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608400000000000"
,
...
...
@@ -492,14 +490,24 @@ TEST(testCase, smlParseLine_Test) {
"readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000"
,
"readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000"
,
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000"
,
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000"
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000"
,
"stable,t1=t1,t2=t2,t3=t3 c1=1,c2=2,c3=3,c4=4 1451629500000000000"
,
"stable,t2=t2,t1=t1,t3=t3 c1=1,c3=3,c4=4 1451629600000000000"
};
smlInsertLines
(
info
,
(
char
**
)
sql
,
9
);
// for (int i = 0; i < 3; i++) {
// smlParseLine(info, sql[i]);
// }
smlProcess
(
info
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]));
TAOS_RES
*
res
=
taos_query
(
taos
,
"select * from t_6885c584b98481584ee13dac399e173d"
);
ASSERT_NE
(
res
,
nullptr
);
int
fieldNum
=
taos_field_count
(
res
);
ASSERT_EQ
(
fieldNum
,
11
);
int
rowNum
=
taos_affected_rows
(
res
);
ASSERT_EQ
(
rowNum
,
2
);
for
(
int
i
=
0
;
i
<
rowNum
;
++
i
)
{
TAOS_ROW
rows
=
taos_fetch_row
(
res
);
}
}
// different types
TEST
(
testCase
,
smlParseLine_error_Test
)
{
TAOS
*
taos
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
taos
,
nullptr
);
...
...
@@ -520,24 +528,247 @@ TEST(testCase, smlParseLine_error_Test) {
"measure,t1=3 c1=8"
,
"measure,t2=3 c1=8u8"
};
int
ret
=
sml
InsertLines
(
info
,
(
char
**
)
sql
,
2
);
int
ret
=
sml
Process
(
info
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
])
);
ASSERT_NE
(
ret
,
0
);
}
// TEST(testCase, smlParseTS_Test) {
// char msg[256] = {0};
// SSmlMsgBuf msgBuf;
// msgBuf.buf = msg;
// msgBuf.len = 256;
// SSmlLineInfo elements = {0};
//
// SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, dataFormat);
// if(!info){
// return (TAOS_RES*)request;
// }
// ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
// if(ret != TSDB_CODE_SUCCESS){
// uError("SML:0x%"PRIx64" smlParseTS failed", info->id);
// return ret;
// }
TEST
(
testCase
,
smlGetTimestampLen_Test
)
{
uint8_t
len
=
smlGetTimestampLen
(
0
);
ASSERT_EQ
(
len
,
1
);
len
=
smlGetTimestampLen
(
1
);
ASSERT_EQ
(
len
,
1
);
len
=
smlGetTimestampLen
(
10
);
ASSERT_EQ
(
len
,
2
);
len
=
smlGetTimestampLen
(
390
);
ASSERT_EQ
(
len
,
3
);
len
=
smlGetTimestampLen
(
-
1
);
ASSERT_EQ
(
len
,
1
);
len
=
smlGetTimestampLen
(
-
10
);
ASSERT_EQ
(
len
,
2
);
len
=
smlGetTimestampLen
(
-
390
);
ASSERT_EQ
(
len
,
3
);
}
TEST
(
testCase
,
smlProcess_telnet_Test
)
{
TAOS
*
taos
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
taos
,
nullptr
);
TAOS_RES
*
pRes
=
taos_query
(
taos
,
"create database if not exists sml_db"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_TELNET_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
,
true
);
ASSERT_NE
(
info
,
nullptr
);
const
char
*
sql
[
4
]
=
{
"sys.if.bytes.out 1479496100 1.3E0 host=web01 interface=eth0"
,
"sys.if.bytes.out 1479496101 1.3E1 interface=eth0 host=web01 "
,
"sys.if.bytes.out 1479496102 1.3E3 network=tcp"
,
"sys.procs.running 1479496100 42 host=web01"
};
int
ret
=
smlProcess
(
info
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]));
ASSERT_EQ
(
ret
,
0
);
TAOS_RES
*
res
=
taos_query
(
taos
,
"select * from t_8c30283b3c4131a071d1e16cf6d7094a"
);
ASSERT_NE
(
res
,
nullptr
);
int
fieldNum
=
taos_field_count
(
res
);
ASSERT_EQ
(
fieldNum
,
2
);
int
rowNum
=
taos_affected_rows
(
res
);
ASSERT_EQ
(
rowNum
,
1
);
for
(
int
i
=
0
;
i
<
rowNum
;
++
i
)
{
TAOS_ROW
rows
=
taos_fetch_row
(
res
);
}
res
=
taos_query
(
taos
,
"select * from t_6931529054e5637ca92c78a1ad441961"
);
ASSERT_NE
(
res
,
nullptr
);
fieldNum
=
taos_field_count
(
res
);
ASSERT_EQ
(
fieldNum
,
2
);
rowNum
=
taos_affected_rows
(
res
);
ASSERT_EQ
(
rowNum
,
2
);
for
(
int
i
=
0
;
i
<
rowNum
;
++
i
)
{
TAOS_ROW
rows
=
taos_fetch_row
(
res
);
}
}
TEST
(
testCase
,
smlProcess_json_Test
)
{
TAOS
*
taos
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
taos
,
nullptr
);
TAOS_RES
*
pRes
=
taos_query
(
taos
,
"create database if not exists sml_db"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
taos
,
"use sml_db"
);
taos_free_result
(
pRes
);
SRequestObj
*
request
=
(
SRequestObj
*
)
createRequest
((
STscObj
*
)
taos
,
NULL
,
NULL
,
TSDB_SQL_INSERT
);
ASSERT_NE
(
request
,
nullptr
);
SSmlHandle
*
info
=
smlBuildSmlInfo
(
taos
,
request
,
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
,
true
);
ASSERT_NE
(
info
,
nullptr
);
const
char
*
sql
=
"[
\n
"
" {
\n
"
"
\"
metric
\"
:
\"
sys.cpu.nice
\"
,
\n
"
"
\"
timestamp
\"
: 1346846400,
\n
"
"
\"
value
\"
: 18,
\n
"
"
\"
tags
\"
: {
\n
"
"
\"
host
\"
:
\"
web01
\"
,
\n
"
"
\"
dc
\"
:
\"
lga
\"\n
"
" }
\n
"
" },
\n
"
" {
\n
"
"
\"
metric
\"
:
\"
sys.cpu.nice
\"
,
\n
"
"
\"
timestamp
\"
: 1346846400,
\n
"
"
\"
value
\"
: 9,
\n
"
"
\"
tags
\"
: {
\n
"
"
\"
host
\"
:
\"
web02
\"
,
\n
"
"
\"
dc
\"
:
\"
lga
\"\n
"
" }
\n
"
" }
\n
"
"]"
;
int
ret
=
smlProcess
(
info
,
(
char
**
)(
&
sql
),
-
1
);
ASSERT_EQ
(
ret
,
0
);
TAOS_RES
*
res
=
taos_query
(
taos
,
"select * from t_cb27a7198d637b4f1c6464bd73f756a7"
);
ASSERT_NE
(
res
,
nullptr
);
int
fieldNum
=
taos_field_count
(
res
);
ASSERT_EQ
(
fieldNum
,
2
);
// int rowNum = taos_affected_rows(res);
// ASSERT_EQ(rowNum, 1);
// for (int i = 0; i < rowNum; ++i) {
// TAOS_ROW rows = taos_fetch_row(res);
// }
sql
=
"{
\n
"
"
\"
metric
\"
:
\"
meter_current
\"
,
\n
"
"
\"
timestamp
\"
: {
\n
"
"
\"
value
\"
: 1346846400,
\n
"
"
\"
type
\"
:
\"
s
\"\n
"
" },
\n
"
"
\"
value
\"
: {
\n
"
"
\"
value
\"
: 10.3,
\n
"
"
\"
type
\"
:
\"
i64
\"\n
"
" },
\n
"
"
\"
tags
\"
: {
\n
"
"
\"
groupid
\"
: {
\n
"
"
\"
value
\"
: 2,
\n
"
"
\"
type
\"
:
\"
bigint
\"\n
"
" },
\n
"
"
\"
location
\"
: {
\n
"
"
\"
value
\"
:
\"
北京
\"
,
\n
"
"
\"
type
\"
:
\"
binary
\"\n
"
" },
\n
"
"
\"
id
\"
:
\"
d1001
\"\n
"
" }
\n
"
"}"
;
ret
=
smlProcess
(
info
,
(
char
**
)(
&
sql
),
-
1
);
ASSERT_EQ
(
ret
,
0
);
sql
=
"{
\n
"
"
\"
metric
\"
:
\"
meter_current
\"
,
\n
"
"
\"
timestamp
\"
: {
\n
"
"
\"
value
\"
: 1346846400,
\n
"
"
\"
type
\"
:
\"
s
\"\n
"
" },
\n
"
"
\"
value
\"
: {
\n
"
"
\"
value
\"
: 10.3,
\n
"
"
\"
type
\"
:
\"
i64
\"\n
"
" },
\n
"
"
\"
tags
\"
: {
\n
"
"
\"
t1
\"
: {
\n
"
"
\"
value
\"
: 2,
\n
"
"
\"
type
\"
:
\"
bigint
\"\n
"
" },
\n
"
"
\"
t2
\"
: {
\n
"
"
\"
value
\"
: 2,
\n
"
"
\"
type
\"
:
\"
int
\"\n
"
" },
\n
"
"
\"
t3
\"
: {
\n
"
"
\"
value
\"
: 2,
\n
"
"
\"
type
\"
:
\"
i16
\"\n
"
" },
\n
"
"
\"
t4
\"
: {
\n
"
"
\"
value
\"
: 2,
\n
"
"
\"
type
\"
:
\"
i8
\"\n
"
" },
\n
"
"
\"
t5
\"
: {
\n
"
"
\"
value
\"
: 2,
\n
"
"
\"
type
\"
:
\"
f32
\"\n
"
" },
\n
"
"
\"
t6
\"
: {
\n
"
"
\"
value
\"
: 2,
\n
"
"
\"
type
\"
:
\"
double
\"\n
"
" },
\n
"
"
\"
t7
\"
: {
\n
"
"
\"
value
\"
:
\"
8323
\"
,
\n
"
"
\"
type
\"
:
\"
binary
\"\n
"
" },
\n
"
"
\"
t8
\"
: {
\n
"
"
\"
value
\"
:
\"
北京
\"
,
\n
"
"
\"
type
\"
:
\"
binary
\"\n
"
" },
\n
"
"
\"
t9
\"
: {
\n
"
"
\"
value
\"
: true,
\n
"
"
\"
type
\"
:
\"
bool
\"\n
"
" },
\n
"
"
\"
id
\"
:
\"
d1001
\"\n
"
" }
\n
"
"}"
;
ret
=
smlProcess
(
info
,
(
char
**
)(
&
sql
),
-
1
);
ASSERT_EQ
(
ret
,
0
);
sql
=
"{
\n
"
"
\"
metric
\"
:
\"
meter_current
\"
,
\n
"
"
\"
timestamp
\"
: {
\n
"
"
\"
value
\"
: 1346846400000,
\n
"
"
\"
type
\"
:
\"
ms
\"\n
"
" },
\n
"
"
\"
value
\"
:
\"
ni
\"
,
\n
"
"
\"
tags
\"
: {
\n
"
"
\"
t1
\"
: {
\n
"
"
\"
value
\"
: 20,
\n
"
"
\"
type
\"
:
\"
i64
\"\n
"
" },
\n
"
"
\"
t2
\"
: {
\n
"
"
\"
value
\"
: 25,
\n
"
"
\"
type
\"
:
\"
i32
\"\n
"
" },
\n
"
"
\"
t3
\"
: {
\n
"
"
\"
value
\"
: 2,
\n
"
"
\"
type
\"
:
\"
smallint
\"\n
"
" },
\n
"
"
\"
t4
\"
: {
\n
"
"
\"
value
\"
: 2,
\n
"
"
\"
type
\"
:
\"
tinyint
\"\n
"
" },
\n
"
"
\"
t5
\"
: {
\n
"
"
\"
value
\"
: 2,
\n
"
"
\"
type
\"
:
\"
float
\"\n
"
" },
\n
"
"
\"
t6
\"
: {
\n
"
"
\"
value
\"
: 0.2,
\n
"
"
\"
type
\"
:
\"
f64
\"\n
"
" },
\n
"
"
\"
t7
\"
:
\"
nsj
\"
,
\n
"
"
\"
t8
\"
: {
\n
"
"
\"
value
\"
:
\"
北京
\"
,
\n
"
"
\"
type
\"
:
\"
binary
\"\n
"
" },
\n
"
"
\"
t9
\"
: false,
\n
"
"
\"
id
\"
:
\"
d1001
\"\n
"
" }
\n
"
"}"
;
ret
=
smlProcess
(
info
,
(
char
**
)(
&
sql
),
-
1
);
ASSERT_EQ
(
ret
,
0
);
}
source/dnode/vnode/inc/vnode.h
浏览文件 @
33c191c2
...
...
@@ -115,6 +115,7 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond)
void
tsdbDestroyTableGroup
(
STableGroupInfo
*
pGroupList
);
int32_t
tsdbGetOneTableGroup
(
void
*
pMeta
,
uint64_t
uid
,
TSKEY
startKey
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbGetTableGroupFromIdList
(
SVnode
*
pVnode
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
void
tsdbCleanupReadHandle
(
tsdbReaderT
queryHandle
);
// tq
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
33c191c2
...
...
@@ -99,7 +99,6 @@ int32_t tsdbInitSma(STsdb *pTsdb);
int32_t
tsdbDropTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
int32_t
tsdbDropTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
void
tsdbCleanupReadHandle
(
tsdbReaderT
queryHandle
);
typedef
enum
{
TSDB_FILE_HEAD
=
0
,
// .head
TSDB_FILE_DATA
,
// .data
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
33c191c2
...
...
@@ -457,9 +457,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
if
(
pHeadWithCkSum
->
head
.
msgType
!=
TDMT_VND_SUBMIT
)
{
walSkipFetchBody
(
pExec
->
pWalReader
,
pHeadWithCkSum
);
ASSERT
(
walSkipFetchBody
(
pExec
->
pWalReader
,
pHeadWithCkSum
)
==
0
);
}
else
{
walFetchBody
(
pExec
->
pWalReader
,
&
pHeadWithCkSum
);
ASSERT
(
walFetchBody
(
pExec
->
pWalReader
,
&
pHeadWithCkSum
)
==
0
);
}
SWalReadHead
*
pHead
=
&
pHeadWithCkSum
->
head
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
33c191c2
...
...
@@ -333,6 +333,8 @@ typedef struct SScanInfo {
typedef
struct
STableScanInfo
{
void
*
dataReader
;
SReadHandle
readHandle
;
SFileBlockLoadRecorder
readRecorder
;
int64_t
numOfRows
;
int64_t
elapsedTime
;
...
...
@@ -348,6 +350,11 @@ typedef struct STableScanInfo {
SArray
*
pColMatchInfo
;
int32_t
numOfOutput
;
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
SqlFunctionCtx
*
pPseudoCtx
;
// int32_t* rowCellInfoOffset;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
...
...
@@ -628,7 +635,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
int32_t
compLen
,
int32_t
numOfOutput
,
int64_t
startTs
,
uint64_t
*
total
,
SArray
*
pColList
);
void
getAlignQueryTimeWindow
(
SInterval
*
pInterval
,
int32_t
precision
,
int64_t
key
,
STimeWindow
*
win
);
int32_t
getTableScan
Order
(
SOperatorInfo
*
pOperator
);
int32_t
getTableScan
Info
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
...
...
@@ -644,12 +651,17 @@ SSDataBlock* loadNextDataBlock(void* param);
void
setResultRowInitCtx
(
SResultRow
*
pResult
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowCellInfoOffset
);
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
,
int32_t
type
);
SExprInfo
*
createExprInfo
(
SNodeList
*
pNodeList
,
SNodeList
*
pGroupKeys
,
int32_t
*
numOfExprs
);
SSDataBlock
*
createResDataBlock
(
SDataBlockDescNode
*
pNode
);
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
);
SResultRow
*
doSetResultOutBufByKey
(
SDiskbasedBuf
*
pResultBuf
,
SResultRowInfo
*
pResultRowInfo
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
groupId
,
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAggSupporter
*
pSup
);
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
pDataReader
,
SQueryTableDataCond
*
pCond
,
int32_t
numOfOutput
,
int32_t
dataLoadFlag
,
const
uint8_t
*
scanInfo
,
SArray
*
pColMatchInfo
,
SSDataBlock
*
pResBlock
,
SNode
*
pCondition
,
SInterval
*
pInterval
,
double
sampleRatio
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
tsdbReaderT
pDataReader
,
SReadHandle
*
pHandle
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
...
...
@@ -704,7 +716,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
int32_t
projectApplyFunctions
(
SExprInfo
*
pExpr
,
SSDataBlock
*
pResult
,
SSDataBlock
*
pSrcBlock
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SArray
*
pPseudoList
);
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
bool
createDummyCol
);
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
);
void
copyTsColoum
(
SSDataBlock
*
pRes
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
33c191c2
...
...
@@ -654,7 +654,7 @@ static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* p
}
static
int32_t
doSetInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
bool
createDummyCol
);
int32_t
scanFlag
,
bool
createDummyCol
);
static
void
doSetInputDataBlockInfo
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
)
{
...
...
@@ -665,12 +665,12 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
}
}
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
)
{
if
(
pBlock
->
pBlockAgg
!=
NULL
)
{
doSetInputDataBlockInfo
(
pOperator
,
pCtx
,
pBlock
,
order
);
}
else
{
doSetInputDataBlock
(
pOperator
,
pCtx
,
pBlock
,
order
,
createDummyCol
);
doSetInputDataBlock
(
pOperator
,
pCtx
,
pBlock
,
order
,
scanFlag
,
createDummyCol
);
}
}
...
...
@@ -717,14 +717,14 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
}
static
int32_t
doSetInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
bool
createDummyCol
)
{
int32_t
scanFlag
,
bool
createDummyCol
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
for
(
int32_t
i
=
0
;
i
<
pOperator
->
numOfExprs
;
++
i
)
{
pCtx
[
i
].
order
=
order
;
pCtx
[
i
].
size
=
pBlock
->
info
.
rows
;
pCtx
[
i
].
pSrcBlock
=
pBlock
;
pCtx
[
i
].
currentStage
=
MAIN_SCAN
;
pCtx
[
i
].
currentStage
=
scanFlag
;
SInputColumnInfoData
*
pInput
=
&
pCtx
[
i
].
input
;
pInput
->
uid
=
pBlock
->
info
.
uid
;
...
...
@@ -740,7 +740,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pInput
->
numOfRows
=
pBlock
->
info
.
rows
;
pInput
->
startRowIndex
=
0
;
//
the last parameter is the
timestamp column
//
NOTE: the last parameter is the primary
timestamp column
if
(
fmIsTimelineFunc
(
pCtx
[
i
].
functionId
)
&&
(
j
==
pOneExpr
->
base
.
numOfParams
-
1
))
{
pInput
->
pPTS
=
pInput
->
pData
[
j
];
}
...
...
@@ -884,7 +884,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
}
else
if
(
pExpr
[
k
].
pExpr
->
nodeType
==
QUERY_NODE_FUNCTION
)
{
ASSERT
(
!
fmIsAggFunc
(
pfCtx
->
functionId
));
if
(
fmIsPseudoColumnFunc
(
pfCtx
->
functionId
))
{
// _rowts/_c0, not tbname column
if
(
fmIsPseudoColumnFunc
(
pfCtx
->
functionId
)
&&
(
!
fmIsScanPseudoColumnFunc
(
pfCtx
->
functionId
)))
{
// do nothing
}
else
if
(
fmIsNonstandardSQLFunc
(
pfCtx
->
functionId
))
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
k
]);
...
...
@@ -3506,7 +3507,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
break
;
}
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pDataBlock
,
TSDB_ORDER_ASC
,
true
);
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pDataBlock
,
TSDB_ORDER_ASC
,
MAIN_SCAN
,
true
);
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
// pOperator->pRuntimeEnv, true);
doMergeImpl
(
pOperator
,
pOperator
->
numOfExprs
,
pDataBlock
);
...
...
@@ -3671,17 +3672,24 @@ _error:
return
NULL
;
}
int32_t
getTableScanOrder
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
)
{
// todo add more information about exchange operation
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
)
{
*
order
=
TSDB_ORDER_ASC
;
*
scanFlag
=
MAIN_SCAN
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
*
order
=
pTableScanInfo
->
cond
.
order
;
*
scanFlag
=
pTableScanInfo
->
scanFlag
;
return
TSDB_CODE_SUCCESS
;
}
else
{
if
(
pOperator
->
pDownstream
==
NULL
||
pOperator
->
pDownstream
[
0
]
==
NULL
)
{
return
TSDB_
ORDER_ASC
;
return
TSDB_
CODE_INVALID_PARA
;
}
else
{
return
getTableScan
Order
(
pOperator
->
pDownstream
[
0
]
);
return
getTableScan
Info
(
pOperator
->
pDownstream
[
0
],
order
,
scanFlag
);
}
}
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
return
pTableScanInfo
->
cond
.
order
;
}
// this is a blocking operator
...
...
@@ -3697,6 +3705,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
while
(
1
)
{
publishOperatorProfEvent
(
downstream
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
...
...
@@ -3709,11 +3720,14 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// }
int32_t
order
=
getTableScanOrder
(
pOperator
);
int32_t
code
=
getTableScanInfo
(
pOperator
,
&
order
,
&
scanFlag
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if
(
pAggInfo
->
pScalarExprInfo
!=
NULL
)
{
int32_t
code
=
projectApplyFunctions
(
pAggInfo
->
pScalarExprInfo
,
pBlock
,
pBlock
,
pAggInfo
->
pScalarCtx
,
code
=
projectApplyFunctions
(
pAggInfo
->
pScalarExprInfo
,
pBlock
,
pBlock
,
pAggInfo
->
pScalarCtx
,
pAggInfo
->
numOfScalarExpr
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
...
...
@@ -3723,7 +3737,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setExecutionContext
(
pOperator
->
numOfExprs
,
pBlock
->
info
.
groupId
,
pTaskInfo
,
pAggInfo
);
setInputDataBlock
(
pOperator
,
pInfo
->
pCtx
,
pBlock
,
order
,
true
);
setInputDataBlock
(
pOperator
,
pInfo
->
pCtx
,
pBlock
,
order
,
scanFlag
,
true
);
doAggregateImpl
(
pOperator
,
0
,
pInfo
->
pCtx
);
#if 0 // test for encode/decode result info
...
...
@@ -4004,6 +4018,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
#endif
int32_t
order
=
0
;
int32_t
scanFlag
=
0
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
...
...
@@ -4035,15 +4052,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// }
// the pDataBlock are always the same one, no need to call this again
int32_t
order
=
getTableScanOrder
(
pOperator
->
pDownstream
[
0
]
);
int32_t
code
=
getTableScanInfo
(
pOperator
->
pDownstream
[
0
],
&
order
,
&
scanFlag
);
setInputDataBlock
(
pOperator
,
pInfo
->
pCtx
,
pBlock
,
order
,
false
);
setInputDataBlock
(
pOperator
,
pInfo
->
pCtx
,
pBlock
,
order
,
scanFlag
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
pTaskInfo
->
code
=
projectApplyFunctions
(
pOperator
->
pExpr
,
pInfo
->
pRes
,
pBlock
,
pInfo
->
pCtx
,
pOperator
->
numOfExprs
,
pProjectInfo
->
pPseudoColInfo
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
code
=
projectApplyFunctions
(
pOperator
->
pExpr
,
pInfo
->
pRes
,
pBlock
,
pInfo
->
pCtx
,
pOperator
->
numOfExprs
,
pProjectInfo
->
pPseudoColInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
int32_t
status
=
handleLimitOffset
(
pOperator
,
pBlock
);
...
...
@@ -4642,8 +4658,22 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp
->
pExpr
->
_function
.
functionId
=
pFuncNode
->
funcId
;
pExp
->
pExpr
->
_function
.
pFunctNode
=
pFuncNode
;
strncpy
(
pExp
->
pExpr
->
_function
.
functionName
,
pFuncNode
->
functionName
,
tListLen
(
pExp
->
pExpr
->
_function
.
functionName
));
#if 1
// todo refactor: add the parameter for tbname function
if
(
strcmp
(
pExp
->
pExpr
->
_function
.
functionName
,
"tbname"
)
==
0
)
{
pFuncNode
->
pParameterList
=
nodesMakeList
();
ASSERT
(
LIST_LENGTH
(
pFuncNode
->
pParameterList
)
==
0
);
SValueNode
*
res
=
(
SValueNode
*
)
nodesMakeNode
(
QUERY_NODE_VALUE
);
if
(
NULL
==
res
)
{
// todo handle error
}
else
{
res
->
node
.
resType
=
(
SDataType
)
{.
bytes
=
sizeof
(
int64_t
),
.
type
=
TSDB_DATA_TYPE_BIGINT
};
nodesListAppend
(
pFuncNode
->
pParameterList
,
res
);
}
}
#endif
int32_t
numOfParam
=
LIST_LENGTH
(
pFuncNode
->
pParameterList
);
...
...
@@ -4704,58 +4734,29 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
uint64_t
queryId
,
uint64_t
taskId
);
static
SArray
*
extractTableIdList
(
const
STableGroupInfo
*
pTableGroupInfo
);
static
SArray
*
extractColumnInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
,
int32_t
type
);
static
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
static
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
);
static
void
setJoinColumnInfo
(
SColumnInfo
*
pColumn
,
const
SColumnNode
*
pColumnNode
);
static
SInterval
extractIntervalInfo
(
const
STableScanPhysiNode
*
pTableScanNode
)
{
SInterval
interval
=
{
.
interval
=
pTableScanNode
->
interval
,
.
sliding
=
pTableScanNode
->
sliding
,
.
intervalUnit
=
pTableScanNode
->
intervalUnit
,
.
slidingUnit
=
pTableScanNode
->
slidingUnit
,
.
offset
=
pTableScanNode
->
offset
,
};
return
interval
;
}
SOperatorInfo
*
createOperatorTree
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
queryId
,
uint64_t
taskId
,
STableGroupInfo
*
pTableGroupInfo
)
{
int32_t
type
=
nodeType
(
pPhyNode
);
if
(
pPhyNode
->
pChildren
==
NULL
||
LIST_LENGTH
(
pPhyNode
->
pChildren
)
==
0
)
{
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
==
type
)
{
SScanPhysiNode
*
pScanPhyNode
=
(
SScanPhysiNode
*
)
pPhyNode
;
STableScanPhysiNode
*
pTableScanNode
=
(
STableScanPhysiNode
*
)
pPhyNode
;
int32_t
numOfCols
=
0
;
tsdbReaderT
pDataReader
=
doCreateDataReader
(
pTableScanNode
,
pHandle
,
pTableGroupInfo
,
(
uint64_t
)
queryId
,
taskId
);
if
(
pDataReader
==
NULL
&&
terrno
!=
0
)
{
return
NULL
;
}
S
DataBlockDescNode
*
pDescNode
=
pScanPhyNode
->
node
.
pOutputDataBlockDesc
;
S
OperatorInfo
*
pOperator
=
createTableScanOperatorInfo
(
pTableScanNode
,
pDataReader
,
pHandle
,
pTaskInfo
)
;
SArray
*
pColList
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pDescNode
);
SQueryTableDataCond
cond
=
{
0
};
int32_t
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
SInterval
interval
=
extractIntervalInfo
(
pTableScanNode
);
SOperatorInfo
*
pOperator
=
createTableScanOperatorInfo
(
pDataReader
,
&
cond
,
numOfCols
,
pTableScanNode
->
dataRequired
,
pTableScanNode
->
scanSeq
,
pColList
,
pResBlock
,
pScanPhyNode
->
node
.
pConditions
,
&
interval
,
pTableScanNode
->
ratio
,
pTaskInfo
);
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
readRecorder
;
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
==
type
)
{
SExchangePhysiNode
*
pExchange
=
(
SExchangePhysiNode
*
)
pPhyNode
;
...
...
@@ -4945,7 +4946,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return
pOptr
;
}
static
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
)
{
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
)
{
pCond
->
loadExternalRows
=
false
;
pCond
->
order
=
pTableScanNode
->
scanSeq
[
0
]
>
0
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
33c191c2
...
...
@@ -287,7 +287,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
true
);
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
pScalarExprInfo
!=
NULL
)
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
33c191c2
...
...
@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <libs/function/function.h>
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
...
...
@@ -284,6 +283,27 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue
;
}
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
numOfPseudoExpr
>
0
)
{
int32_t
dstSlotId
=
pTableScanInfo
->
pPseudoExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
dstSlotId
);
colInfoDataEnsureCapacity
(
pColInfoData
,
0
,
pBlock
->
info
.
rows
);
struct
SScalarFuncExecFuncs
fpSet
;
fmGetScalarFuncExecFuncs
(
pTableScanInfo
->
pPseudoExpr
->
pExpr
->
_function
.
functionId
,
&
fpSet
);
SColumnInfoData
infoData
=
{
0
};
infoData
.
info
.
type
=
TSDB_DATA_TYPE_BIGINT
;
infoData
.
info
.
bytes
=
sizeof
(
uint64_t
);
colInfoDataEnsureCapacity
(
&
infoData
,
0
,
1
);
colDataAppendInt64
(
&
infoData
,
0
,
&
pBlock
->
info
.
uid
);
SScalarParam
srcParam
=
{.
numOfRows
=
pBlock
->
info
.
rows
,
.
param
=
pTableScanInfo
->
readHandle
.
meta
,
.
columnData
=
&
infoData
};
SScalarParam
param
=
{.
columnData
=
pColInfoData
};
fpSet
.
process
(
&
srcParam
,
1
,
&
param
);
}
return
pBlock
;
}
...
...
@@ -314,8 +334,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STimeWindow
*
pWin
=
&
pTableScanInfo
->
cond
.
twindow
;
qDebug
(
"%s start to repeat ascending order scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
// do prepare for the next round table scan operation
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
);
...
...
@@ -359,10 +378,29 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return
NULL
;
}
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
pDataReader
,
SQueryTableDataCond
*
pCond
,
int32_t
numOfOutput
,
int32_t
dataLoadFlag
,
const
uint8_t
*
scanInfo
,
SArray
*
pColMatchInfo
,
SSDataBlock
*
pResBlock
,
SNode
*
pCondition
,
SInterval
*
pInterval
,
double
sampleRatio
,
SExecTaskInfo
*
pTaskInfo
)
{
SInterval
extractIntervalInfo
(
const
STableScanPhysiNode
*
pTableScanNode
)
{
SInterval
interval
=
{
.
interval
=
pTableScanNode
->
interval
,
.
sliding
=
pTableScanNode
->
sliding
,
.
intervalUnit
=
pTableScanNode
->
intervalUnit
,
.
slidingUnit
=
pTableScanNode
->
slidingUnit
,
.
offset
=
pTableScanNode
->
offset
,
};
return
interval
;
}
static
void
destroyTableScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
STableScanInfo
*
pTableScanInfo
=
(
STableScanInfo
*
)
param
;
taosMemoryFree
(
pTableScanInfo
->
pResBlock
);
tsdbCleanupReadHandle
(
pTableScanInfo
->
dataReader
);
if
(
pTableScanInfo
->
pColMatchInfo
!=
NULL
)
{
taosArrayDestroy
(
pTableScanInfo
->
pColMatchInfo
);
}
}
SOperatorInfo
*
createTableScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
tsdbReaderT
pDataReader
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
STableScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -373,27 +411,42 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
return
NULL
;
}
pInfo
->
cond
=
*
pCond
;
pInfo
->
scanInfo
=
(
SScanInfo
){.
numOfAsc
=
scanInfo
[
0
],
.
numOfDesc
=
scanInfo
[
1
]};
SDataBlockDescNode
*
pDescNode
=
pTableScanNode
->
scan
.
node
.
pOutputDataBlockDesc
;
int32_t
numOfCols
=
0
;
SArray
*
pColList
=
extractColMatchInfo
(
pTableScanNode
->
scan
.
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
int32_t
code
=
initQueryTableDataCond
(
&
pInfo
->
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
if
(
pTableScanNode
->
scan
.
pScanPseudoCols
!=
NULL
)
{
pInfo
->
pPseudoExpr
=
createExprInfo
(
pTableScanNode
->
scan
.
pScanPseudoCols
,
NULL
,
&
pInfo
->
numOfPseudoExpr
);
pInfo
->
pPseudoCtx
=
createSqlFunctionCtx
(
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
&
pInfo
->
rowCellInfoOffset
);
}
pInfo
->
scanInfo
=
(
SScanInfo
){.
numOfAsc
=
pTableScanNode
->
scanSeq
[
0
],
.
numOfDesc
=
pTableScanNode
->
scanSeq
[
1
]};
pInfo
->
interval
=
*
pInterval
;
pInfo
->
sampleRatio
=
sampleRatio
;
pInfo
->
dataBlockLoadFlag
=
dataLoadFlag
;
pInfo
->
pResBlock
=
pResBlock
;
pInfo
->
pFilterNode
=
pCondition
;
pInfo
->
readHandle
=
*
readHandle
;
pInfo
->
interval
=
extractIntervalInfo
(
pTableScanNode
);
pInfo
->
sampleRatio
=
pTableScanNode
->
ratio
;
pInfo
->
dataBlockLoadFlag
=
pTableScanNode
->
dataRequired
;
pInfo
->
pResBlock
=
createResDataBlock
(
pDescNode
);
pInfo
->
pFilterNode
=
pTableScanNode
->
scan
.
node
.
pConditions
;
pInfo
->
dataReader
=
pDataReader
;
pInfo
->
scanFlag
=
MAIN_SCAN
;
pInfo
->
pColMatchInfo
=
pColMatchInfo
;
pInfo
->
pColMatchInfo
=
pColList
;
pOperator
->
name
=
"TableScanOperator"
;
// for du
bug purpose
pOperator
->
name
=
"TableScanOperator"
;
// for de
bug purpose
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfExprs
=
numOfOutput
;
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableScan
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableScan
,
NULL
,
NULL
,
destroyTableScanOperatorInfo
,
NULL
,
NULL
,
NULL
);
static
int32_t
cost
=
0
;
...
...
@@ -1311,7 +1364,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
metaReaderClear
(
&
mr
);
colDataAppend
(
pDst
,
count
,
str
,
false
);
// data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
// dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
// doSetTagValueToResultBuf(dst, data, type, bytes);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
33c191c2
...
...
@@ -775,7 +775,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
true
);
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
STableQueryInfo
*
pTableQueryInfo
=
pInfo
->
pCurrent
;
setIntervalQueryRange
(
pTableQueryInfo
,
pBlock
->
info
.
window
.
skey
,
&
pTaskInfo
->
window
);
...
...
@@ -910,7 +910,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
break
;
}
setInputDataBlock
(
pOperator
,
pBInfo
->
pCtx
,
pBlock
,
order
,
true
);
setInputDataBlock
(
pOperator
,
pBInfo
->
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
doStateWindowAggImpl
(
pOperator
,
pInfo
,
pBlock
);
}
...
...
@@ -1024,7 +1024,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
true
);
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
if
(
pInfo
->
invertible
)
{
setInverFunction
(
pInfo
->
binfo
.
pCtx
,
pOperator
->
numOfExprs
,
pBlock
->
info
.
type
);
}
...
...
@@ -1286,7 +1286,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pBInfo
->
pCtx
,
pBlock
,
order
,
true
);
setInputDataBlock
(
pOperator
,
pBInfo
->
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
doSessionWindowAggImpl
(
pOperator
,
pInfo
,
pBlock
);
}
...
...
@@ -1334,7 +1334,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pSliceInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
true
);
setInputDataBlock
(
pOperator
,
pSliceInfo
->
binfo
.
pCtx
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
}
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
33c191c2
...
...
@@ -98,6 +98,10 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx);
bool
getCsumFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
int32_t
csumFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getMavgFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
mavgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
mavgFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getSelectivityFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
#ifdef __cplusplus
...
...
source/libs/function/src/builtins.c
浏览文件 @
33c191c2
...
...
@@ -339,6 +339,27 @@ static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateMavg
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
if
(
2
!=
LIST_LENGTH
(
pFunc
->
pParameterList
))
{
return
invaildFuncParaNumErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
SNode
*
pPara
=
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
if
(
QUERY_NODE_COLUMN
!=
nodeType
(
pPara
))
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"The input parameter of MAVG function can only be column"
);
}
uint8_t
colType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
.
type
;
uint8_t
paraType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
1
))
->
resType
.
type
;
if
(
!
IS_NUMERIC_TYPE
(
colType
)
||
!
IS_INTEGER_TYPE
(
paraType
))
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_DOUBLE
].
bytes
,
.
type
=
TSDB_DATA_TYPE_DOUBLE
};
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateLastRow
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
// todo
return
TSDB_CODE_SUCCESS
;
...
...
@@ -783,6 +804,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
processFunc
=
csumFunction
,
.
finalizeFunc
=
NULL
},
{
.
name
=
"mavg"
,
.
type
=
FUNCTION_TYPE_MAVG
,
.
classification
=
FUNC_MGT_NONSTANDARD_SQL_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateMavg
,
.
getEnvFunc
=
getMavgFuncEnv
,
.
initFunc
=
mavgFunctionSetup
,
.
processFunc
=
mavgFunction
,
.
finalizeFunc
=
NULL
},
{
.
name
=
"abs"
,
.
type
=
FUNCTION_TYPE_ABS
,
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
33c191c2
...
...
@@ -21,7 +21,8 @@
#include "tdatablock.h"
#include "tpercentile.h"
#define HISTOGRAM_MAX_BINS_NUM 100
#define HISTOGRAM_MAX_BINS_NUM 1000
#define MAVG_MAX_POINTS_NUM 1000
typedef
struct
SSumRes
{
union
{
...
...
@@ -141,6 +142,14 @@ typedef enum {
STATE_OPER_EQ
,
}
EStateOperType
;
typedef
struct
SMavgInfo
{
int32_t
pos
;
double
sum
;
int32_t
numOfPoints
;
bool
pointsMeet
;
double
points
[];
}
SMavgInfo
;
#define SET_VAL(_info, numOfElem, res) \
do { \
if ((numOfElem) <= 0) { \
...
...
@@ -1644,7 +1653,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
pResInfo
->
complete
=
true
;
return
0
;
}
else
{
pInfo
->
pMemBucket
=
tMemBucketCreate
(
pC
tx
->
inputBytes
,
pCtx
->
inputT
ype
,
pInfo
->
minval
,
pInfo
->
maxval
);
pInfo
->
pMemBucket
=
tMemBucketCreate
(
pC
ol
->
info
.
bytes
,
t
ype
,
pInfo
->
minval
,
pInfo
->
maxval
);
}
}
...
...
@@ -1695,10 +1704,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
pInfo
->
numOfElems
+=
1
;
}
}
return
0
;
}
}
else
{
// the second stage, calculate the true percentile value
int32_t
start
=
pInput
->
startRowIndex
;
for
(
int32_t
i
=
start
;
i
<
pInput
->
numOfRows
+
start
;
++
i
)
{
...
...
@@ -1707,18 +1713,19 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
}
char
*
data
=
colDataGetData
(
pCol
,
i
);
notNullElems
+=
1
;
tMemBucketPut
(
pInfo
->
pMemBucket
,
data
,
1
);
}
SET_VAL
(
pResInfo
,
notNullElems
,
1
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
percentileFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
SVariant
*
pVal
=
&
pCtx
->
param
[
1
].
param
;
double
v
=
pVal
->
nType
==
TSDB_DATA_TYPE_INT
?
pVal
->
i
:
pVal
->
d
;
double
v
=
(
pVal
->
nType
==
TSDB_DATA_TYPE_BIGINT
)
?
pVal
->
i
:
pVal
->
d
;
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SPercentileInfo
*
ppInfo
=
(
SPercentileInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
...
...
@@ -2949,3 +2956,81 @@ int32_t csumFunction(SqlFunctionCtx* pCtx) {
return
numOfElems
;
}
bool
getMavgFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
SMavgInfo
)
+
MAVG_MAX_POINTS_NUM
*
sizeof
(
double
);
return
true
;
}
bool
mavgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
SMavgInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
pInfo
->
pos
=
0
;
pInfo
->
sum
=
0
;
pInfo
->
numOfPoints
=
pCtx
->
param
[
1
].
param
.
i
;
if
(
pInfo
->
numOfPoints
<
1
||
pInfo
->
numOfPoints
>
MAVG_MAX_POINTS_NUM
)
{
return
false
;
}
pInfo
->
pointsMeet
=
false
;
return
true
;
}
int32_t
mavgFunction
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
SMavgInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResInfo
);
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
TSKEY
*
tsList
=
(
int64_t
*
)
pInput
->
pPTS
->
pData
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
SColumnInfoData
*
pTsOutput
=
pCtx
->
pTsOutput
;
SColumnInfoData
*
pOutput
=
(
SColumnInfoData
*
)
pCtx
->
pOutput
;
int32_t
numOfElems
=
0
;
int32_t
type
=
pInputCol
->
info
.
type
;
int32_t
startOffset
=
pCtx
->
offset
;
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
i
+=
1
)
{
int32_t
pos
=
startOffset
+
numOfElems
;
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
//colDataAppendNULL(pOutput, i);
continue
;
}
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
double
v
;
GET_TYPED_DATA
(
v
,
double
,
type
,
data
);
if
(
!
pInfo
->
pointsMeet
&&
(
pInfo
->
pos
<
pInfo
->
numOfPoints
-
1
))
{
pInfo
->
points
[
pInfo
->
pos
]
=
v
;
pInfo
->
sum
+=
v
;
}
else
{
if
(
!
pInfo
->
pointsMeet
&&
(
pInfo
->
pos
==
pInfo
->
numOfPoints
-
1
))
{
pInfo
->
sum
+=
v
;
pInfo
->
pointsMeet
=
true
;
}
else
{
pInfo
->
sum
=
pInfo
->
sum
+
v
-
pInfo
->
points
[
pInfo
->
pos
];
}
pInfo
->
points
[
pInfo
->
pos
]
=
v
;
double
result
=
pInfo
->
sum
/
pInfo
->
numOfPoints
;
colDataAppend
(
pOutput
,
pos
,
(
char
*
)
&
result
,
false
);
//TODO: remove this after pTsOutput is handled
if
(
pTsOutput
!=
NULL
)
{
colDataAppendInt64
(
pTsOutput
,
pos
,
&
tsList
[
i
]);
}
numOfElems
++
;
}
pInfo
->
pos
++
;
if
(
pInfo
->
pos
==
pInfo
->
numOfPoints
)
{
pInfo
->
pos
=
0
;
}
}
return
numOfElems
;
}
source/libs/function/src/texpr.c
浏览文件 @
33c191c2
...
...
@@ -27,19 +27,6 @@
#include "tvariant.h"
#include "tdef.h"
//static uint8_t UNUSED_FUNC isQueryOnPrimaryKey(const char *primaryColumnName, const tExprNode *pLeft, const tExprNode *pRight) {
// if (pLeft->nodeType == TEXPR_COL_NODE) {
// // if left node is the primary column,return true
// return (strcmp(primaryColumnName, pLeft->pSchema->name) == 0) ? 1 : 0;
// } else {
// // if any children have query on primary key, their parents are also keep this value
// return ((pLeft->nodeType == TEXPR_BINARYEXPR_NODE && pLeft->_node.hasPK == 1) ||
// (pRight->nodeType == TEXPR_BINARYEXPR_NODE && pRight->_node.hasPK == 1)) == true
// ? 1
// : 0;
// }
//}
static
void
doExprTreeDestroy
(
tExprNode
**
pExpr
,
void
(
*
fp
)(
void
*
));
void
tExprTreeDestroy
(
tExprNode
*
pNode
,
void
(
*
fp
)(
void
*
))
{
...
...
@@ -64,21 +51,7 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
}
int32_t
type
=
(
*
pExpr
)
->
nodeType
;
if
(
type
==
TEXPR_BINARYEXPR_NODE
)
{
doExprTreeDestroy
(
&
(
*
pExpr
)
->
_node
.
pLeft
,
fp
);
doExprTreeDestroy
(
&
(
*
pExpr
)
->
_node
.
pRight
,
fp
);
if
(
fp
!=
NULL
)
{
fp
((
*
pExpr
)
->
_node
.
info
);
}
}
else
if
(
type
==
TEXPR_UNARYEXPR_NODE
)
{
doExprTreeDestroy
(
&
(
*
pExpr
)
->
_node
.
pLeft
,
fp
);
if
(
fp
!=
NULL
)
{
fp
((
*
pExpr
)
->
_node
.
info
);
}
assert
((
*
pExpr
)
->
_node
.
pRight
==
NULL
);
}
else
if
(
type
==
TEXPR_VALUE_NODE
)
{
if
(
type
==
TEXPR_VALUE_NODE
)
{
taosVariantDestroy
((
*
pExpr
)
->
pVal
);
taosMemoryFree
((
*
pExpr
)
->
pVal
);
}
else
if
(
type
==
TEXPR_COL_NODE
)
{
...
...
@@ -90,9 +63,7 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
}
bool
exprTreeApplyFilter
(
tExprNode
*
pExpr
,
const
void
*
pItem
,
SExprTraverseSupp
*
param
)
{
tExprNode
*
pLeft
=
pExpr
->
_node
.
pLeft
;
tExprNode
*
pRight
=
pExpr
->
_node
.
pRight
;
#if 0
//non-leaf nodes, recursively traverse the expression tree in the post-root order
if (pLeft->nodeType == TEXPR_BINARYEXPR_NODE && pRight->nodeType == TEXPR_BINARYEXPR_NODE) {
if (pExpr->_node.optr == LOGIC_COND_TYPE_OR) { // or
...
...
@@ -114,6 +85,9 @@ bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp
// handle the leaf node
param->setupInfoFn(pExpr, param->pExtInfo);
return param->nodeFilterFn(pItem, pExpr->_node.info);
#endif
return
0
;
}
// TODO: these three functions should be made global
...
...
@@ -141,59 +115,6 @@ static UNUSED_FUNC char* exception_strdup(const char* str) {
return
p
;
}
static
tExprNode
*
exprTreeFromBinaryImpl
(
SBufferReader
*
br
)
{
int32_t
anchor
=
CLEANUP_GET_ANCHOR
();
if
(
CLEANUP_EXCEED_LIMIT
())
{
THROW
(
TSDB_CODE_QRY_EXCEED_TAGS_LIMIT
);
return
NULL
;
}
tExprNode
*
pExpr
=
exception_calloc
(
1
,
sizeof
(
tExprNode
));
CLEANUP_PUSH_VOID_PTR_PTR
(
true
,
tExprTreeDestroy
,
pExpr
,
NULL
);
pExpr
->
nodeType
=
tbufReadUint8
(
br
);
if
(
pExpr
->
nodeType
==
TEXPR_VALUE_NODE
)
{
SVariant
*
pVal
=
exception_calloc
(
1
,
sizeof
(
SVariant
));
pExpr
->
pVal
=
pVal
;
pVal
->
nType
=
tbufReadUint32
(
br
);
if
(
pVal
->
nType
==
TSDB_DATA_TYPE_BINARY
)
{
tbufReadToBuffer
(
br
,
&
pVal
->
nLen
,
sizeof
(
pVal
->
nLen
));
pVal
->
pz
=
taosMemoryCalloc
(
1
,
pVal
->
nLen
+
1
);
tbufReadToBuffer
(
br
,
pVal
->
pz
,
pVal
->
nLen
);
}
else
{
pVal
->
i
=
tbufReadInt64
(
br
);
}
}
else
if
(
pExpr
->
nodeType
==
TEXPR_COL_NODE
)
{
SSchema
*
pSchema
=
exception_calloc
(
1
,
sizeof
(
SSchema
));
pExpr
->
pSchema
=
pSchema
;
pSchema
->
colId
=
tbufReadInt16
(
br
);
pSchema
->
bytes
=
tbufReadInt16
(
br
);
pSchema
->
type
=
tbufReadUint8
(
br
);
tbufReadToString
(
br
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
);
}
else
if
(
pExpr
->
nodeType
==
TEXPR_BINARYEXPR_NODE
)
{
pExpr
->
_node
.
optr
=
tbufReadUint8
(
br
);
pExpr
->
_node
.
pLeft
=
exprTreeFromBinaryImpl
(
br
);
pExpr
->
_node
.
pRight
=
exprTreeFromBinaryImpl
(
br
);
assert
(
pExpr
->
_node
.
pLeft
!=
NULL
&&
pExpr
->
_node
.
pRight
!=
NULL
);
}
CLEANUP_EXECUTE_TO
(
anchor
,
false
);
return
pExpr
;
}
tExprNode
*
exprTreeFromBinary
(
const
void
*
data
,
size_t
size
)
{
if
(
size
==
0
)
{
return
NULL
;
}
SBufferReader
br
=
tbufInitReader
(
data
,
size
,
false
);
return
exprTreeFromBinaryImpl
(
&
br
);
}
void
buildFilterSetFromBinary
(
void
**
q
,
const
char
*
buf
,
int32_t
len
)
{
SBufferReader
br
=
tbufInitReader
(
buf
,
len
,
false
);
uint32_t
type
=
tbufReadUint32
(
&
br
);
...
...
@@ -405,38 +326,3 @@ err_ret:
taosHashCleanup
(
pObj
);
taosMemoryFreeClear
(
tmp
);
}
tExprNode
*
exprdup
(
tExprNode
*
pNode
)
{
if
(
pNode
==
NULL
)
{
return
NULL
;
}
tExprNode
*
pCloned
=
taosMemoryCalloc
(
1
,
sizeof
(
tExprNode
));
if
(
pNode
->
nodeType
==
TEXPR_BINARYEXPR_NODE
)
{
tExprNode
*
pLeft
=
exprdup
(
pNode
->
_node
.
pLeft
);
tExprNode
*
pRight
=
exprdup
(
pNode
->
_node
.
pRight
);
pCloned
->
_node
.
pLeft
=
pLeft
;
pCloned
->
_node
.
pRight
=
pRight
;
pCloned
->
_node
.
optr
=
pNode
->
_node
.
optr
;
}
else
if
(
pNode
->
nodeType
==
TEXPR_VALUE_NODE
)
{
pCloned
->
pVal
=
taosMemoryCalloc
(
1
,
sizeof
(
SVariant
));
taosVariantAssign
(
pCloned
->
pVal
,
pNode
->
pVal
);
}
else
if
(
pNode
->
nodeType
==
TEXPR_COL_NODE
)
{
pCloned
->
pSchema
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchema
));
*
pCloned
->
pSchema
=
*
pNode
->
pSchema
;
}
else
if
(
pNode
->
nodeType
==
TEXPR_FUNCTION_NODE
)
{
strcpy
(
pCloned
->
_function
.
functionName
,
pNode
->
_function
.
functionName
);
int32_t
num
=
pNode
->
_function
.
num
;
pCloned
->
_function
.
num
=
num
;
pCloned
->
_function
.
pChild
=
taosMemoryCalloc
(
num
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
pCloned
->
_function
.
pChild
[
i
]
=
exprdup
(
pNode
->
_function
.
pChild
[
i
]);
}
}
pCloned
->
nodeType
=
pNode
->
nodeType
;
return
pCloned
;
}
source/libs/parser/src/parInsert.c
浏览文件 @
33c191c2
...
...
@@ -1071,6 +1071,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t
tbNum
=
0
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
bool
autoCreateTbl
=
false
;
STableMeta
*
pMeta
=
NULL
;
// for each table
while
(
1
)
{
...
...
@@ -1127,10 +1128,12 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
CHECK_CODE
(
getDataBlockFromList
(
pCxt
->
pTableBlockHashObj
,
tbFName
,
strlen
(
tbFName
),
TSDB_DEFAULT_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
getTableInfo
(
pCxt
->
pTableMeta
).
rowSize
,
pCxt
->
pTableMeta
,
&
dataBuf
,
NULL
,
&
pCxt
->
createTblReq
));
pMeta
=
pCxt
->
pTableMeta
;
pCxt
->
pTableMeta
=
NULL
;
if
(
TK_NK_LP
==
sToken
.
type
)
{
// pSql -> field1_name, ...)
CHECK_CODE
(
parseBoundColumns
(
pCxt
,
&
dataBuf
->
boundColumnInfo
,
getTableColumnSchema
(
p
Cxt
->
pTable
Meta
)));
CHECK_CODE
(
parseBoundColumns
(
pCxt
,
&
dataBuf
->
boundColumnInfo
,
getTableColumnSchema
(
pMeta
)));
NEXT_TOKEN
(
pCxt
->
pSql
,
sToken
);
}
...
...
@@ -1166,8 +1169,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
memcpy
(
tags
,
&
pCxt
->
tags
,
sizeof
(
pCxt
->
tags
));
(
*
pCxt
->
pStmtCb
->
setInfoFn
)(
pCxt
->
pStmtCb
->
pStmt
,
pCxt
->
pTableMeta
,
tags
,
tbFName
,
autoCreateTbl
,
pCxt
->
pVgroupsHashObj
,
pCxt
->
pTableBlockHashObj
);
(
*
pCxt
->
pStmtCb
->
setInfoFn
)(
pCxt
->
pStmtCb
->
pStmt
,
pMeta
,
tags
,
tbFName
,
autoCreateTbl
,
pCxt
->
pVgroupsHashObj
,
pCxt
->
pTableBlockHashObj
);
memset
(
&
pCxt
->
tags
,
0
,
sizeof
(
pCxt
->
tags
));
pCxt
->
pVgroupsHashObj
=
NULL
;
...
...
@@ -1677,8 +1679,8 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
return
TSDB_CODE_SUCCESS
;
}
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsFormat
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
char
*
tableName
,
char
*
msgBuf
,
int16_t
msgBufLen
)
{
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
char
*
tableName
,
char
*
msgBuf
,
int16_t
msgBufLen
)
{
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
SSmlExecHandle
*
smlHandle
=
(
SSmlExecHandle
*
)
handle
;
...
...
@@ -1720,8 +1722,8 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* cols
initRowBuilder
(
&
pDataBlock
->
rowBuilder
,
pDataBlock
->
pTableMeta
->
sversion
,
&
pDataBlock
->
boundColumnInfo
);
int32_t
rowNum
=
format
?
taosArrayGetSize
(
colsFormat
)
:
taosArrayGetSize
(
cols
);
if
(
rowNum
<=
0
)
{
int32_t
rowNum
=
taosArrayGetSize
(
cols
);
if
(
rowNum
<=
0
)
{
return
buildInvalidOperationMsg
(
&
pBuf
,
"cols size <= 0"
);
}
ret
=
allocateMemForSize
(
pDataBlock
,
extendedRowSize
*
rowNum
);
...
...
@@ -1732,13 +1734,10 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* cols
for
(
int32_t
r
=
0
;
r
<
rowNum
;
++
r
)
{
STSRow
*
row
=
(
STSRow
*
)(
pDataBlock
->
pData
+
pDataBlock
->
size
);
// skip the SSubmitBlk header
tdSRowResetBuf
(
pBuilder
,
row
);
void
*
rowData
=
NULL
;
void
*
rowData
=
taosArrayGetP
(
cols
,
r
)
;
size_t
rowDataSize
=
0
;
if
(
format
)
{
rowData
=
taosArrayGetP
(
colsFormat
,
r
);
if
(
format
){
rowDataSize
=
taosArrayGetSize
(
rowData
);
}
else
{
rowData
=
taosArrayGetP
(
cols
,
r
);
}
// 1. set the parsed value from sql string
...
...
source/libs/planner/src/planner.c
浏览文件 @
33c191c2
...
...
@@ -224,7 +224,7 @@ static int32_t calcConstList(SNodeList* pList) {
}
static
bool
isEmptyResultCond
(
SNode
**
pCond
)
{
if
(
QUERY_NODE_VALUE
!=
nodeType
(
*
pCond
))
{
if
(
NULL
==
*
pCond
||
QUERY_NODE_VALUE
!=
nodeType
(
*
pCond
))
{
return
false
;
}
if
(((
SValueNode
*
)
*
pCond
)
->
datum
.
b
)
{
...
...
source/libs/scalar/CMakeLists.txt
浏览文件 @
33c191c2
...
...
@@ -8,7 +8,7 @@ target_include_directories(
)
target_link_libraries
(
scalar
PRIVATE os util common nodes function qcom
PRIVATE os util common nodes function qcom
vnode
)
if
(
${
BUILD_TEST
}
)
...
...
source/libs/scalar/inc/sclInt.h
浏览文件 @
33c191c2
...
...
@@ -26,6 +26,7 @@ typedef struct SScalarCtx {
int32_t
code
;
SArray
*
pBlockList
;
/* element is SSDataBlock* */
SHashObj
*
pRes
;
/* element is SScalarParam */
void
*
param
;
// additional parameter (meta actually) for acquire value such as tbname/tags values
}
SScalarCtx
;
...
...
@@ -49,6 +50,7 @@ typedef struct SScalarCtx {
int32_t
doConvertDataType
(
SValueNode
*
pValueNode
,
SScalarParam
*
out
);
SColumnInfoData
*
createColumnInfoData
(
SDataType
*
pType
,
int32_t
numOfRows
);
void
sclConvertToTsValueNode
(
int8_t
precision
,
SValueNode
*
valueNode
);
#define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type)
#define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes)
...
...
source/libs/scalar/src/filter.c
浏览文件 @
33c191c2
...
...
@@ -3505,19 +3505,6 @@ int32_t fltAddValueNodeToConverList(SFltTreeStat *stat, SValueNode* pNode) {
return
TSDB_CODE_SUCCESS
;
}
void
fltConvertToTsValueNode
(
SFltTreeStat
*
stat
,
SValueNode
*
valueNode
)
{
char
*
timeStr
=
valueNode
->
datum
.
p
;
if
(
convertStringToTimestamp
(
valueNode
->
node
.
resType
.
type
,
valueNode
->
datum
.
p
,
stat
->
precision
,
&
valueNode
->
datum
.
i
)
!=
TSDB_CODE_SUCCESS
)
{
valueNode
->
datum
.
i
=
0
;
}
taosMemoryFree
(
timeStr
);
valueNode
->
typeData
=
valueNode
->
datum
.
i
;
valueNode
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
valueNode
->
node
.
resType
.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
;
}
EDealRes
fltReviseRewriter
(
SNode
**
pNode
,
void
*
pContext
)
{
SFltTreeStat
*
stat
=
(
SFltTreeStat
*
)
pContext
;
...
...
@@ -3566,7 +3553,7 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
fltConvertToTsValueNode
(
stat
,
valueNode
);
sclConvertToTsValueNode
(
stat
->
precision
,
valueNode
);
return
DEAL_RES_CONTINUE
;
}
...
...
@@ -3614,6 +3601,11 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
if
(
FILTER_GET_FLAG
(
stat
->
info
->
options
,
FLT_OPTION_TIMESTAMP
)
&&
node
->
opType
>=
OP_TYPE_NOT_EQUAL
)
{
stat
->
scalarMode
=
true
;
return
DEAL_RES_CONTINUE
;
}
if
(
NULL
==
node
->
pRight
)
{
if
(
scalarGetOperatorParamNum
(
node
->
opType
)
>
1
)
{
fltError
(
"invalid operator, pRight:%p, nodeType:%d, opType:%d"
,
node
->
pRight
,
nodeType
(
node
),
node
->
opType
);
...
...
@@ -3695,7 +3687,7 @@ int32_t fltReviseNodes(SFilterInfo *pInfo, SNode** pNode, SFltTreeStat *pStat) {
for
(
int32_t
i
=
0
;
i
<
nodeNum
;
++
i
)
{
SValueNode
*
valueNode
=
*
(
SValueNode
**
)
taosArrayGet
(
pStat
->
nodeList
,
i
);
fltConvertToTsValueNode
(
pStat
,
valueNode
);
sclConvertToTsValueNode
(
pStat
->
precision
,
valueNode
);
}
_return:
...
...
source/libs/scalar/src/scalar.c
浏览文件 @
33c191c2
...
...
@@ -8,6 +8,7 @@
#include "tdatablock.h"
#include "scalar.h"
#include "tudf.h"
#include "ttime.h"
int32_t
scalarGetOperatorParamNum
(
EOperatorType
type
)
{
if
(
OP_TYPE_IS_NULL
==
type
||
OP_TYPE_IS_NOT_NULL
==
type
||
OP_TYPE_IS_TRUE
==
type
||
OP_TYPE_IS_NOT_TRUE
==
type
...
...
@@ -19,6 +20,19 @@ int32_t scalarGetOperatorParamNum(EOperatorType type) {
return
2
;
}
void
sclConvertToTsValueNode
(
int8_t
precision
,
SValueNode
*
valueNode
)
{
char
*
timeStr
=
valueNode
->
datum
.
p
;
if
(
convertStringToTimestamp
(
valueNode
->
node
.
resType
.
type
,
valueNode
->
datum
.
p
,
precision
,
&
valueNode
->
datum
.
i
)
!=
TSDB_CODE_SUCCESS
)
{
valueNode
->
datum
.
i
=
0
;
}
taosMemoryFree
(
timeStr
);
valueNode
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
valueNode
->
node
.
resType
.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
;
}
SColumnInfoData
*
createColumnInfoData
(
SDataType
*
pType
,
int32_t
numOfRows
)
{
SColumnInfoData
*
pColumnData
=
taosMemoryCalloc
(
1
,
sizeof
(
SColumnInfoData
));
if
(
pColumnData
==
NULL
)
{
...
...
@@ -251,6 +265,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
*
rowNum
=
param
->
numOfRows
;
}
param
->
param
=
ctx
->
param
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -535,7 +550,7 @@ EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opT
}
EDealRes
sclRewrite
OperatorForNullValue
(
SNode
**
pNode
,
SScalarCtx
*
ctx
)
{
EDealRes
sclRewrite
NonConstOperator
(
SNode
**
pNode
,
SScalarCtx
*
ctx
)
{
SOperatorNode
*
node
=
(
SOperatorNode
*
)
*
pNode
;
if
(
node
->
pLeft
&&
(
QUERY_NODE_VALUE
==
nodeType
(
node
->
pLeft
)))
{
...
...
@@ -543,6 +558,11 @@ EDealRes sclRewriteOperatorForNullValue(SNode** pNode, SScalarCtx *ctx) {
if
(
SCL_IS_NULL_VALUE_NODE
(
valueNode
)
&&
(
node
->
opType
!=
OP_TYPE_IS_NULL
&&
node
->
opType
!=
OP_TYPE_IS_NOT_NULL
))
{
return
sclRewriteBasedOnOptr
(
pNode
,
ctx
,
node
->
opType
);
}
if
(
IS_STR_DATA_TYPE
(
valueNode
->
node
.
resType
.
type
)
&&
node
->
pRight
&&
nodesIsExprNode
(
node
->
pRight
)
&&
((
SExprNode
*
)
node
->
pRight
)
->
resType
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
sclConvertToTsValueNode
(((
SExprNode
*
)
node
->
pRight
)
->
resType
.
precision
,
valueNode
);
}
}
if
(
node
->
pRight
&&
(
QUERY_NODE_VALUE
==
nodeType
(
node
->
pRight
)))
{
...
...
@@ -550,6 +570,11 @@ EDealRes sclRewriteOperatorForNullValue(SNode** pNode, SScalarCtx *ctx) {
if
(
SCL_IS_NULL_VALUE_NODE
(
valueNode
)
&&
(
node
->
opType
!=
OP_TYPE_IS_NULL
&&
node
->
opType
!=
OP_TYPE_IS_NOT_NULL
))
{
return
sclRewriteBasedOnOptr
(
pNode
,
ctx
,
node
->
opType
);
}
if
(
IS_STR_DATA_TYPE
(
valueNode
->
node
.
resType
.
type
)
&&
node
->
pLeft
&&
nodesIsExprNode
(
node
->
pLeft
)
&&
((
SExprNode
*
)
node
->
pLeft
)
->
resType
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
sclConvertToTsValueNode
(((
SExprNode
*
)
node
->
pLeft
)
->
resType
.
precision
,
valueNode
);
}
}
if
(
node
->
pRight
&&
(
QUERY_NODE_NODE_LIST
==
nodeType
(
node
->
pRight
)))
{
...
...
@@ -672,7 +697,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
SOperatorNode
*
node
=
(
SOperatorNode
*
)
*
pNode
;
if
((
!
SCL_IS_CONST_NODE
(
node
->
pLeft
))
||
(
!
SCL_IS_CONST_NODE
(
node
->
pRight
)))
{
return
sclRewrite
OperatorForNullValue
(
pNode
,
ctx
);
return
sclRewrite
NonConstOperator
(
pNode
,
ctx
);
}
SScalarParam
output
=
{.
columnData
=
taosMemoryCalloc
(
1
,
sizeof
(
SColumnInfoData
))};
...
...
@@ -885,7 +910,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
}
int32_t
code
=
0
;
SScalarCtx
ctx
=
{.
code
=
0
,
.
pBlockList
=
pBlockList
};
SScalarCtx
ctx
=
{.
code
=
0
,
.
pBlockList
=
pBlockList
,
.
param
=
pDst
->
param
};
// TODO: OPT performance
ctx
.
pRes
=
taosHashInit
(
SCL_DEFAULT_OP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
33c191c2
#include "function.h"
#include "scalar.h"
#include "tdatablock.h"
#include "ttime.h"
#include "sclInt.h"
#include "sclvector.h"
#include "tdatablock.h"
#include "tjson.h"
#include "ttime.h"
#include "vnode.h"
typedef
float
(
*
_float_fn
)(
float
);
typedef
double
(
*
_double_fn
)(
double
);
...
...
@@ -1512,6 +1513,21 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
int32_t
qTbnameFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
ASSERT
(
inputNum
==
1
);
colDataAppend
(
pOutput
->
columnData
,
pOutput
->
numOfRows
,
colDataGetData
(
pInput
->
columnData
,
0
),
false
);
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pInput
->
param
,
0
);
uint64_t
uid
=
*
(
uint64_t
*
)
colDataGetData
(
pInput
->
columnData
,
0
);
metaGetTableEntryByUid
(
&
mr
,
uid
);
char
str
[
TSDB_TABLE_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_TO_VARSTR
(
str
,
mr
.
me
.
name
);
metaReaderClear
(
&
mr
);
for
(
int32_t
i
=
0
;
i
<
pInput
->
numOfRows
;
++
i
)
{
colDataAppend
(
pOutput
->
columnData
,
pOutput
->
numOfRows
+
i
,
str
,
false
);
}
pOutput
->
numOfRows
+=
pInput
->
numOfRows
;
return
TSDB_CODE_SUCCESS
;
}
source/libs/wal/src/walRead.c
浏览文件 @
33c191c2
...
...
@@ -54,7 +54,7 @@ int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
return
0
;
}
static
int
32
_t
walReadSeekFilePos
(
SWalReadHandle
*
pRead
,
int64_t
fileFirstVer
,
int64_t
ver
)
{
static
int
64
_t
walReadSeekFilePos
(
SWalReadHandle
*
pRead
,
int64_t
fileFirstVer
,
int64_t
ver
)
{
int64_t
ret
=
0
;
TdFilePtr
pIdxTFile
=
pRead
->
pReadIdxTFile
;
...
...
@@ -156,7 +156,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
void
walSetReaderCapacity
(
SWalReadHandle
*
pRead
,
int32_t
capacity
)
{
pRead
->
capacity
=
capacity
;
}
int32_t
walFetchHead
(
SWalReadHandle
*
pRead
,
int64_t
ver
,
SWalHead
*
pHead
)
{
int
32
_t
code
;
int
64
_t
code
;
// TODO: valid ver
if
(
ver
>
pRead
->
pWal
->
vers
.
commitVer
)
{
...
...
@@ -214,23 +214,24 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead) {
return
-
1
;
}
*
ppHead
=
ptr
;
pReadHead
=
&
((
*
ppHead
)
->
head
);
pRead
->
capacity
=
pReadHead
->
bodyLen
;
}
if
(
pReadHead
->
bodyLen
!=
taosReadFile
(
pRead
->
pReadLogTFile
,
pReadHead
->
body
,
pReadHead
->
bodyLen
))
{
ASSERT
(
0
);
return
-
1
;
}
if
(
pReadHead
->
version
!=
ver
)
{
wError
(
"unexpected wal log version: %"
PRId64
", read request version:%"
PRId64
""
,
pRead
->
pHead
->
head
.
version
,
ver
);
wError
(
"wal fetch body error: %"
PRId64
", read request version:%"
PRId64
""
,
pRead
->
pHead
->
head
.
version
,
ver
);
pRead
->
curVersion
=
-
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
if
(
walValidBodyCksum
(
*
ppHead
)
!=
0
)
{
wError
(
"
unexpected wal log version
: % "
PRId64
", since body checksum not passed"
,
ver
);
wError
(
"
wal fetch body error
: % "
PRId64
", since body checksum not passed"
,
ver
);
pRead
->
curVersion
=
-
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
...
...
@@ -257,7 +258,7 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p
}
int32_t
walReadWithHandle
(
SWalReadHandle
*
pRead
,
int64_t
ver
)
{
int
code
;
int
64_t
code
;
// TODO: check wal life
if
(
pRead
->
curVersion
!=
ver
)
{
if
(
walReadSeekVer
(
pRead
,
ver
)
<
0
)
{
...
...
source/libs/wal/src/walSeek.c
浏览文件 @
33c191c2
...
...
@@ -19,8 +19,8 @@
#include "tref.h"
#include "walInt.h"
static
int
walSeekWritePos
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
=
0
;
static
int
64_t
walSeekWritePos
(
SWal
*
pWal
,
int64_t
ver
)
{
int
64_t
code
=
0
;
TdFilePtr
pIdxTFile
=
pWal
->
pWriteIdxTFile
;
TdFilePtr
pLogTFile
=
pWal
->
pWriteLogTFile
;
...
...
@@ -45,7 +45,7 @@ static int walSeekWritePos(SWal* pWal, int64_t ver) {
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
return
code
;
return
0
;
}
int
walSetWrite
(
SWal
*
pWal
)
{
...
...
@@ -124,7 +124,7 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
}
int
walSeekWriteVer
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
;
int
64_t
code
;
if
(
ver
==
pWal
->
vers
.
lastVer
)
{
return
0
;
}
...
...
tests/script/api/batchprepare.c
浏览文件 @
33c191c2
...
...
@@ -20,6 +20,11 @@ typedef struct {
bool
enclose
;
}
OperInfo
;
typedef
struct
{
char
*
funcName
;
int32_t
paramNum
;
}
FuncInfo
;
typedef
enum
{
BP_BIND_TAG
=
1
,
BP_BIND_COL
,
...
...
@@ -44,6 +49,13 @@ OperInfo operInfo[] = {
int32_t
operatorList
[]
=
{
0
,
1
,
2
,
3
,
4
,
5
,
6
,
7
};
int32_t
varoperatorList
[]
=
{
0
,
1
,
2
,
3
,
4
,
5
,
6
,
7
,
8
,
9
,
10
,
11
};
FuncInfo
funcInfo
[]
=
{
{
"count"
,
1
},
{
"sum"
,
1
},
{
"min"
,
1
},
{
"sin"
,
1
},
};
char
*
bpStbPrefix
=
"st"
;
char
*
bpTbPrefix
=
"t"
;
int32_t
bpDefaultStbId
=
1
;
...
...
@@ -154,7 +166,7 @@ CaseCfg gCase[] = {
{
"insert:AUTO1-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
true
,
true
,
insertAUTOTest1
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"query:SUBT-COLUMN"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_QUERY
,
false
,
false
,
queryColumnTest
,
10
,
10
,
1
,
3
,
0
,
0
,
1
,
2
},
{
"query:SUBT-MISC"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_QUERY
,
false
,
false
,
queryMiscTest
,
10
,
10
,
1
,
3
,
0
,
0
,
1
,
2
},
{
"query:SUBT-MISC"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_QUERY
,
false
,
false
,
queryMiscTest
,
2
,
10
,
1
,
3
,
0
,
0
,
1
,
2
},
};
...
...
@@ -179,6 +191,8 @@ typedef struct {
int32_t
*
bindTagTypeList
;
int32_t
optrIdxListNum
;
int32_t
*
optrIdxList
;
int32_t
funcIdxListNum
;
int32_t
*
funcIdxList
;
int32_t
runTimes
;
int32_t
caseIdx
;
// static case idx
int32_t
caseNum
;
// num in static case list
...
...
@@ -186,7 +200,7 @@ typedef struct {
int32_t
caseRunNum
;
// total run case num
}
CaseCtrl
;
#if
1
#if
0
CaseCtrl gCaseCtrl = { // default
.bindNullNum = 0,
.printCreateTblSql = false,
...
...
@@ -203,6 +217,8 @@ CaseCtrl gCaseCtrl = { // default
.bindTagTypeList = NULL,
.optrIdxListNum = 0,
.optrIdxList = NULL,
.funcIdxListNum = 0,
.funcIdxList = NULL,
.checkParamNum = false,
.printRes = false,
.runTimes = 0,
...
...
@@ -241,7 +257,7 @@ CaseCtrl gCaseCtrl = {
};
#endif
#if
0
#if
1
CaseCtrl
gCaseCtrl
=
{
// query case with specified col&oper
.
bindNullNum
=
0
,
.
printCreateTblSql
=
false
,
...
...
@@ -255,14 +271,14 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper
.
optrIdxListNum
=
0
,
.
optrIdxList
=
NULL
,
.
checkParamNum
=
false
,
.printRes =
fals
e,
.
printRes
=
tru
e
,
.
runTimes
=
0
,
.
caseRunIdx
=
-
1
,
.
optrIdxListNum
=
0
,
.
optrIdxList
=
NULL
,
.
bindColTypeNum
=
0
,
.
bindColTypeList
=
NULL
,
.caseIdx = 2
3
,
.
caseIdx
=
2
4
,
.
caseNum
=
1
,
.
caseRunNum
=
1
,
};
...
...
@@ -513,66 +529,92 @@ void bpAppendOperatorParam(BindData *data, int32_t *len, int32_t dataType, int32
}
break
;
default:
printf
(
"!!!invalid paramNum:%d
\n
"
,
pInfo
->
paramNum
);
printf
(
"!!!invalid
operator
paramNum:%d
\n
"
,
pInfo
->
paramNum
);
exit
(
1
);
}
}
void
generateQueryCondSQL
(
BindData
*
data
,
int32_t
tblIdx
)
{
int32_t
len
=
sprintf
(
data
->
sql
,
"select * from %s%d where "
,
bpTbPrefix
,
tblIdx
);
if
(
!
gCurCase
->
fullCol
)
{
for
(
int
c
=
0
;
c
<
gCurCase
->
bindColNum
;
++
c
)
{
if
(
c
)
{
len
+=
sprintf
(
data
->
sql
+
len
,
" and "
);
void
bpAppendFunctionParam
(
BindData
*
data
,
int32_t
*
len
,
int32_t
dataType
,
int32_t
idx
)
{
FuncInfo
*
pInfo
=
NULL
;
if
(
gCaseCtrl
.
funcIdxListNum
>
0
)
{
pInfo
=
&
funcInfo
[
gCaseCtrl
.
funcIdxList
[
idx
]];
}
else
{
pInfo
=
&
funcInfo
[
rand
()
%
tListLen
(
funcInfo
)];
}
switch
(
data
->
pBind
[
c
].
buffer_type
)
{
switch
(
pInfo
->
paramNum
)
{
case
1
:
*
len
+=
sprintf
(
data
->
sql
+
*
len
,
" %s(?)"
,
pInfo
->
funcName
);
break
;
default:
printf
(
"!!!invalid function paramNum:%d
\n
"
,
pInfo
->
paramNum
);
exit
(
1
);
}
}
int32_t
bpAppendColumnName
(
BindData
*
data
,
int32_t
type
,
int32_t
len
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
len
+=
sprintf
(
data
->
sql
+
len
,
"booldata"
);
return
sprintf
(
data
->
sql
+
len
,
"booldata"
);
break
;
case
TSDB_DATA_TYPE_TINYINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"tinydata"
);
return
sprintf
(
data
->
sql
+
len
,
"tinydata"
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"smalldata"
);
return
sprintf
(
data
->
sql
+
len
,
"smalldata"
);
break
;
case
TSDB_DATA_TYPE_INT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"intdata"
);
return
sprintf
(
data
->
sql
+
len
,
"intdata"
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"bigdata"
);
return
sprintf
(
data
->
sql
+
len
,
"bigdata"
);
break
;
case
TSDB_DATA_TYPE_FLOAT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"floatdata"
);
return
sprintf
(
data
->
sql
+
len
,
"floatdata"
);
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
len
+=
sprintf
(
data
->
sql
+
len
,
"doubledata"
);
return
sprintf
(
data
->
sql
+
len
,
"doubledata"
);
break
;
case
TSDB_DATA_TYPE_VARCHAR
:
len
+=
sprintf
(
data
->
sql
+
len
,
"binarydata"
);
return
sprintf
(
data
->
sql
+
len
,
"binarydata"
);
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
len
+=
sprintf
(
data
->
sql
+
len
,
"ts"
);
return
sprintf
(
data
->
sql
+
len
,
"ts"
);
break
;
case
TSDB_DATA_TYPE_NCHAR
:
len
+=
sprintf
(
data
->
sql
+
len
,
"nchardata"
);
return
sprintf
(
data
->
sql
+
len
,
"nchardata"
);
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"utinydata"
);
return
sprintf
(
data
->
sql
+
len
,
"utinydata"
);
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"usmalldata"
);
return
sprintf
(
data
->
sql
+
len
,
"usmalldata"
);
break
;
case
TSDB_DATA_TYPE_UINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"uintdata"
);
return
sprintf
(
data
->
sql
+
len
,
"uintdata"
);
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"ubigdata"
);
return
sprintf
(
data
->
sql
+
len
,
"ubigdata"
);
break
;
default:
printf
(
"!!!invalid col type:%d"
,
data
->
pBind
[
c
].
buffer_
type
);
printf
(
"!!!invalid col type:%d"
,
type
);
exit
(
1
);
}
return
0
;
}
void
generateQueryCondSQL
(
BindData
*
data
,
int32_t
tblIdx
)
{
int32_t
len
=
sprintf
(
data
->
sql
,
"select * from %s%d where "
,
bpTbPrefix
,
tblIdx
);
if
(
!
gCurCase
->
fullCol
)
{
for
(
int
c
=
0
;
c
<
gCurCase
->
bindColNum
;
++
c
)
{
if
(
c
)
{
len
+=
sprintf
(
data
->
sql
+
len
,
" and "
);
}
len
+=
bpAppendColumnName
(
data
,
data
->
pBind
[
c
].
buffer_type
,
len
);
bpAppendOperatorParam
(
data
,
&
len
,
data
->
pBind
[
c
].
buffer_type
,
c
);
}
}
...
...
@@ -582,64 +624,50 @@ void generateQueryCondSQL(BindData *data, int32_t tblIdx) {
}
}
void
bpGenerateConstInOpSQL
(
BindData
*
data
,
int32_t
tblIdx
)
{
int32_t
len
=
0
;
len
=
sprintf
(
data
->
sql
,
"select "
);
void
generateQueryMiscSQL
(
BindData
*
data
,
int32_t
tblIdx
)
{
int32_t
len
=
sprintf
(
data
->
sql
,
"select * from %s%d where "
,
bpTbPrefix
,
tblIdx
);
if
(
!
gCurCase
->
fullCol
)
{
for
(
int
c
=
0
;
c
<
gCurCase
->
bindColNum
;
++
c
)
{
if
(
c
)
{
len
+=
sprintf
(
data
->
sql
+
len
,
" and "
);
}
switch
(
data
->
pBind
[
c
].
buffer_type
)
{
case
TSDB_DATA_TYPE_BOOL
:
len
+=
sprintf
(
data
->
sql
+
len
,
"booldata"
);
break
;
case
TSDB_DATA_TYPE_TINYINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"tinydata"
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"smalldata"
);
break
;
case
TSDB_DATA_TYPE_INT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"intdata"
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"bigdata"
);
break
;
case
TSDB_DATA_TYPE_FLOAT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"floatdata"
);
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
len
+=
sprintf
(
data
->
sql
+
len
,
"doubledata"
);
break
;
case
TSDB_DATA_TYPE_VARCHAR
:
len
+=
sprintf
(
data
->
sql
+
len
,
"binarydata"
);
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
len
+=
sprintf
(
data
->
sql
+
len
,
"ts"
);
break
;
case
TSDB_DATA_TYPE_NCHAR
:
len
+=
sprintf
(
data
->
sql
+
len
,
"nchardata"
);
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"utinydata"
);
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"usmalldata"
);
break
;
case
TSDB_DATA_TYPE_UINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"uintdata"
);
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
len
+=
sprintf
(
data
->
sql
+
len
,
"ubigdata"
);
break
;
default:
printf
(
"!!!invalid col type:%d"
,
data
->
pBind
[
c
].
buffer_type
);
exit
(
1
);
len
+=
sprintf
(
data
->
sql
+
len
,
", "
);
}
len
+=
bpAppendColumnName
(
data
,
data
->
pBind
[
c
].
buffer_type
,
len
);
bpAppendOperatorParam
(
data
,
&
len
,
data
->
pBind
[
c
].
buffer_type
,
c
);
}
len
+=
sprintf
(
data
->
sql
+
len
,
" from %s%d"
,
bpTbPrefix
,
tblIdx
);
}
void
bpGenerateConstInFuncSQL
(
BindData
*
data
,
int32_t
tblIdx
)
{
int32_t
len
=
0
;
len
=
sprintf
(
data
->
sql
,
"select "
);
for
(
int
c
=
0
;
c
<
gCurCase
->
bindColNum
;
++
c
)
{
if
(
c
)
{
len
+=
sprintf
(
data
->
sql
+
len
,
", "
);
}
bpAppendFunctionParam
(
data
,
&
len
,
data
->
pBind
[
c
].
buffer_type
,
c
);
}
len
+=
sprintf
(
data
->
sql
+
len
,
" from %s%d"
,
bpTbPrefix
,
tblIdx
);
}
void
generateQueryMiscSQL
(
BindData
*
data
,
int32_t
tblIdx
)
{
switch
(
tblIdx
)
{
case
0
:
bpGenerateConstInOpSQL
(
data
,
tblIdx
);
break
;
case
1
:
//TODO FILL TEST
default:
bpGenerateConstInFuncSQL
(
data
,
tblIdx
);
break
;
}
if
(
gCaseCtrl
.
printStmtSql
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录