Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
18da95ad
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
18da95ad
编写于
4月 02, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' of
https://github.com/taosdata/TDengine
into develop
上级
c06fb4d9
d099fa7a
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
629 addition
and
304 deletion
+629
-304
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+0
-1
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+3
-4
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+9
-16
src/client/src/tscServer.c
src/client/src/tscServer.c
+1
-1
src/client/src/tscSql.c
src/client/src/tscSql.c
+3
-11
src/inc/taoserror.h
src/inc/taoserror.h
+2
-2
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+20
-1
src/query/inc/qast.h
src/query/inc/qast.h
+17
-23
src/query/inc/qsqlparser.h
src/query/inc/qsqlparser.h
+16
-32
src/query/src/qast.c
src/query/src/qast.c
+90
-102
src/query/src/qparserImpl.c
src/query/src/qparserImpl.c
+173
-2
src/query/src/queryExecutor.c
src/query/src/queryExecutor.c
+4
-4
src/query/src/queryUtil.c
src/query/src/queryUtil.c
+0
-1
src/vnode/tsdb/src/tsdbRead.c
src/vnode/tsdb/src/tsdbRead.c
+291
-104
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
18da95ad
...
...
@@ -132,7 +132,6 @@ void tscFieldInfoSetExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlExpr* pExpr)
void
tscFieldInfoSetBinExpr
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
,
SSqlFunctionExpr
*
pExpr
);
void
tscFieldInfoCalOffset
(
SQueryInfo
*
pQueryInfo
);
void
tscFieldInfoUpdateOffsetForInterResult
(
SQueryInfo
*
pQueryInfo
);
void
tscFieldInfoCopy
(
SFieldInfo
*
src
,
SFieldInfo
*
dst
,
const
int32_t
*
indexList
,
int32_t
size
);
void
tscFieldInfoCopyAll
(
SFieldInfo
*
dst
,
SFieldInfo
*
src
);
...
...
src/client/src/tscAsync.c
浏览文件 @
18da95ad
...
...
@@ -369,7 +369,7 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
tscTrace
(
"%p SqlObj is freed, not add into queue async res"
,
pSql
);
return
;
}
else
{
tscError
(
"%p add into queued async res, code:%
d"
,
pSql
,
pSql
->
res
.
code
);
tscError
(
"%p add into queued async res, code:%
s"
,
pSql
,
tstrerror
(
pSql
->
res
.
code
)
);
}
SSchedMsg
schedMsg
;
...
...
@@ -410,7 +410,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
pSql
->
fp
=
NULL
;
if
(
code
!=
0
)
{
code
=
abs
(
code
);
pRes
->
code
=
code
;
tscTrace
(
"%p failed to renew tableMeta"
,
pSql
);
tsem_post
(
&
pSql
->
rspSem
);
...
...
@@ -432,8 +431,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
return
;
}
if
(
code
!=
0
)
{
pRes
->
code
=
(
uint8_t
)
abs
(
code
)
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRes
->
code
=
code
;
tscQueueAsyncRes
(
pSql
);
return
;
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
18da95ad
...
...
@@ -1213,11 +1213,11 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
int32_t
ret
=
tSQLBinaryExprCreateFromSqlExpr
(
&
pNode
,
pItem
->
pNode
,
&
pBinExprInfo
->
numOfCols
,
&
pColIndex
,
&
pQueryInfo
->
exprsInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tSQLBinaryExprDestroy
(
&
pNode
->
pExpr
,
NULL
);
tSQLBinaryExprDestroy
(
&
pNode
,
NULL
);
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
"invalid expression in select clause"
);
}
pBinExprInfo
->
pBinExpr
=
pNode
->
pExpr
;
pBinExprInfo
->
pBinExpr
=
pNode
;
pBinExprInfo
->
pReqColumns
=
pColIndex
;
for
(
int32_t
k
=
0
;
k
<
pBinExprInfo
->
numOfCols
;
++
k
)
{
...
...
@@ -5850,28 +5850,21 @@ static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr*
return
TSDB_CODE_SUCCESS
;
}
(
*
pExpr
)
->
colId
=
-
1
;
*
pColIndex
=
realloc
(
*
pColIndex
,
(
++
(
*
num
))
*
sizeof
(
SColIndexEx
));
memset
(
&
(
*
pColIndex
)[(
*
num
)
-
1
],
0
,
sizeof
(
SColIndexEx
));
strncpy
((
*
pColIndex
)[(
*
num
)
-
1
].
name
,
pAst
->
operand
.
z
,
pAst
->
operand
.
n
);
}
else
{
tSQLBinaryExpr
*
pBinExpr
=
(
tSQLBinaryExpr
*
)
calloc
(
1
,
sizeof
(
tSQLBinaryExpr
));
pBinExpr
->
filterOnPrimaryKey
=
false
;
pBinExpr
->
pLeft
=
pLeft
;
pBinExpr
->
pRight
=
pRight
;
*
pExpr
=
(
tSQLSyntaxNode
*
)
calloc
(
1
,
sizeof
(
tSQLSyntaxNode
));
(
*
pExpr
)
->
_node
.
hasPK
=
false
;
(
*
pExpr
)
->
_node
.
pLeft
=
pLeft
;
(
*
pExpr
)
->
_node
.
pRight
=
pRight
;
SSQLToken
t
=
{.
type
=
pAst
->
nSQLOptr
};
pBinExpr
->
nSQLBinaryO
ptr
=
getBinaryExprOptr
(
&
t
);
(
*
pExpr
)
->
_node
.
o
ptr
=
getBinaryExprOptr
(
&
t
);
assert
(
pBinExpr
->
nSQLBinaryO
ptr
!=
0
);
assert
(
(
*
pExpr
)
->
_node
.
o
ptr
!=
0
);
(
*
pExpr
)
=
malloc
(
sizeof
(
tSQLSyntaxNode
));
(
*
pExpr
)
->
nodeType
=
TSQL_NODE_EXPR
;
(
*
pExpr
)
->
pExpr
=
pBinExpr
;
(
*
pExpr
)
->
colId
=
-
1
;
if
(
pBinExpr
->
nSQLBinaryOptr
==
TSDB_BINARY_OP_DIVIDE
)
{
if
((
*
pExpr
)
->
_node
.
optr
==
TSDB_BINARY_OP_DIVIDE
)
{
if
(
pRight
->
nodeType
==
TSQL_NODE_VALUE
)
{
if
(
pRight
->
pVal
->
nType
==
TSDB_DATA_TYPE_INT
&&
pRight
->
pVal
->
i64Key
==
0
)
{
return
TSDB_CODE_INVALID_SQL
;
...
...
src/client/src/tscServer.c
浏览文件 @
18da95ad
...
...
@@ -325,7 +325,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
if
(
rpcMsg
->
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
void
*
taosres
=
tscKeepConn
[
pCmd
->
command
]
?
pSql
:
NULL
;
rpcMsg
->
code
=
pRes
->
code
?
-
pRes
->
code
:
pRes
->
numOfRows
;
rpcMsg
->
code
=
pRes
->
code
?
pRes
->
code
:
pRes
->
numOfRows
;
tscTrace
(
"%p Async SQL result:%s res:%p"
,
pSql
,
tstrerror
(
pRes
->
code
),
taosres
);
...
...
src/client/src/tscSql.c
浏览文件 @
18da95ad
...
...
@@ -867,7 +867,7 @@ int taos_errno(TAOS *taos) {
return
code
;
}
static
bool
validErrorCode
(
int32_t
code
)
{
return
code
>=
TSDB_CODE_SUCCESS
&&
code
<
TSDB_CODE_MAX_ERROR_CODE
;
}
//
static bool validErrorCode(int32_t code) { return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE; }
/*
* In case of invalid sql error, additional information is attached to explain
...
...
@@ -890,23 +890,15 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
char
*
taos_errstr
(
TAOS
*
taos
)
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
uint8_t
code
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
return
(
char
*
)
tstrerror
(
globalCode
);
SSqlObj
*
pSql
=
pObj
->
pSql
;
if
(
validErrorCode
(
pSql
->
res
.
code
))
{
code
=
pSql
->
res
.
code
;
}
else
{
code
=
TSDB_CODE_OTHERS
;
// unknown error
}
if
(
hasAdditionalErrorInfo
(
code
,
&
pSql
->
cmd
))
{
if
(
hasAdditionalErrorInfo
(
pSql
->
res
.
code
,
&
pSql
->
cmd
))
{
return
pSql
->
cmd
.
payload
;
}
else
{
return
(
char
*
)
tstrerror
(
code
);
return
(
char
*
)
tstrerror
(
pSql
->
res
.
code
);
}
}
...
...
src/inc/taoserror.h
浏览文件 @
18da95ad
...
...
@@ -80,7 +80,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ALREADY_EXIST, 0, 34, "table already ex
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_USER
,
0
,
35
,
"invalid user"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_ACCT
,
0
,
36
,
"invalid account"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_PASS
,
0
,
37
,
"invalid password"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DB_NOT_SELECTED
,
0
,
38
,
"d
o
not selected"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DB_NOT_SELECTED
,
0
,
38
,
"d
b
not selected"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MEMORY_CORRUPTED
,
0
,
39
,
"memory corrupted"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_USER_ALREADY_EXIST
,
0
,
40
,
"user already exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_NO_RIGHTS
,
0
,
41
,
"no rights"
)
...
...
@@ -118,7 +118,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MONITOR_DB_FORBIDDEN, 0, 72, "monitor db forbi
TAOS_DEFINE_ERROR
(
TSDB_CODE_NO_DISK_PERMISSIONS
,
0
,
73
,
"no disk permissions"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VG_INIT_FAILED
,
0
,
74
,
"vg init failed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DATA_ALREADY_IMPORTED
,
0
,
75
,
"data already imported"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_OPS_NOT_SUPPORT
,
0
,
76
,
"ops not support"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_OPS_NOT_SUPPORT
,
0
,
76
,
"op
eration
s not support"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_QUERY_ID
,
0
,
77
,
"invalid query id"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_STREAM_ID
,
0
,
78
,
"invalid stream id"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_CONNECTION
,
0
,
79
,
"invalid connection"
)
...
...
src/mnode/src/mgmtShell.c
浏览文件 @
18da95ad
...
...
@@ -50,6 +50,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg);
static
void
mgmtProcessRetrieveMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
mgmtProcessHeartBeatMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
mgmtProcessConnectMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
mgmtProcessUseMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
*
tsMgmtShellRpc
=
NULL
;
static
void
*
tsMgmtTranQhandle
=
NULL
;
...
...
@@ -62,7 +63,8 @@ int32_t mgmtInitShell() {
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_RETRIEVE
,
mgmtProcessRetrieveMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_HEARTBEAT
,
mgmtProcessHeartBeatMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONNECT
,
mgmtProcessConnectMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_USE_DB
,
mgmtProcessUseMsg
);
tsMgmtTranQhandle
=
taosInitScheduler
(
tsMaxShellConns
,
1
,
"mnodeT"
);
int32_t
numOfThreads
=
tsNumOfCores
*
tsNumOfThreadsPerCore
/
4
.
0
;
...
...
@@ -435,6 +437,23 @@ connect_over:
rpcSendResponse
(
&
rpcRsp
);
}
static
void
mgmtProcessUseMsg
(
SQueuedMsg
*
pMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SCMUseDbMsg
*
pUseDbMsg
=
pMsg
->
pCont
;
// todo check for priority of current user
SDbObj
*
pDbObj
=
mgmtGetDb
(
pUseDbMsg
->
db
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
pDbObj
==
NULL
)
{
code
=
TSDB_CODE_INVALID_DB
;
}
rpcRsp
.
code
=
code
;
rpcSendResponse
(
&
rpcRsp
);
}
/**
* check if we need to add mgmtProcessTableMetaMsg into tranQueue, which will be executed one-by-one.
*/
...
...
src/query/inc/qast.h
浏览文件 @
18da95ad
...
...
@@ -27,7 +27,7 @@ extern "C" {
#include "taosdef.h"
#include "tvariant.h"
struct
tSQL
BinaryExpr
;
struct
tSQL
SyntaxNode
;
struct
SSchema
;
struct
tSkipList
;
struct
tSkipListNode
;
...
...
@@ -62,44 +62,38 @@ typedef struct SBinaryFilterSupp {
typedef
struct
tSQLSyntaxNode
{
uint8_t
nodeType
;
int16_t
colId
;
// for schema, the id of column
union
{
struct
tSQLBinaryExpr
*
pExpr
;
struct
SSchema
*
pSchema
;
tVariant
*
pVal
;
struct
{
uint8_t
optr
;
// filter operator
uint8_t
hasPK
;
// 0: do not contain primary filter, 1: contain
void
*
info
;
// support filter operation on this expression only available for leaf node
struct
tSQLSyntaxNode
*
pLeft
;
// left child pointer
struct
tSQLSyntaxNode
*
pRight
;
// right child pointer
}
_node
;
struct
SSchema
*
pSchema
;
tVariant
*
pVal
;
};
}
tSQLSyntaxNode
;
typedef
struct
tSQLBinaryExpr
{
uint8_t
nSQLBinaryOptr
;
// filter operator
uint8_t
filterOnPrimaryKey
;
// 0: do not contain primary filter, 1: contain
/*
* provide the information to support filter operation on this expression
* only available for leaf node
*/
void
*
info
;
tSQLSyntaxNode
*
pLeft
;
// left child pointer
tSQLSyntaxNode
*
pRight
;
// right child pointer
}
tSQLBinaryExpr
;
typedef
struct
tQueryResultset
{
void
**
pRes
;
int64_t
num
;
}
tQueryResultset
;
void
tSQLBinaryExprFromString
(
tSQL
BinaryExpr
**
pExpr
,
SSchema
*
pSchema
,
int32_t
numOfCols
,
char
*
src
,
int32_t
len
);
void
tSQLBinaryExprFromString
(
tSQL
SyntaxNode
**
pExpr
,
SSchema
*
pSchema
,
int32_t
numOfCols
,
char
*
src
,
int32_t
len
);
void
tSQLBinaryExprToString
(
tSQL
BinaryExpr
*
pExpr
,
char
*
dst
,
int32_t
*
len
);
void
tSQLBinaryExprToString
(
tSQL
SyntaxNode
*
pExpr
,
char
*
dst
,
int32_t
*
len
);
void
tSQLBinaryExprDestroy
(
tSQL
BinaryExpr
**
pExprs
,
void
(
*
fp
)(
void
*
));
void
tSQLBinaryExprDestroy
(
tSQL
SyntaxNode
**
pExprs
,
void
(
*
fp
)(
void
*
));
void
tSQLBinaryExprTraverse
(
tSQL
BinaryExpr
*
pExpr
,
SSkipList
*
pSkipList
,
SArray
*
result
,
SBinaryFilterSupp
*
param
);
void
tSQLBinaryExprTraverse
(
tSQL
SyntaxNode
*
pExpr
,
SSkipList
*
pSkipList
,
SArray
*
result
,
SBinaryFilterSupp
*
param
);
void
tSQLBinaryExprCalcTraverse
(
tSQL
BinaryExpr
*
pExprs
,
int32_t
numOfRows
,
char
*
pOutput
,
void
*
param
,
int32_t
order
,
void
tSQLBinaryExprCalcTraverse
(
tSQL
SyntaxNode
*
pExprs
,
int32_t
numOfRows
,
char
*
pOutput
,
void
*
param
,
int32_t
order
,
char
*
(
*
cb
)(
void
*
,
char
*
,
int32_t
));
void
tSQLBinaryExprTrv
(
tSQL
BinaryExpr
*
pExprs
,
int32_t
*
val
,
int16_t
*
ids
);
void
tSQLBinaryExprTrv
(
tSQL
SyntaxNode
*
pExprs
,
int32_t
*
val
,
int16_t
*
ids
);
void
tQueryResultClean
(
tQueryResultset
*
pRes
);
uint8_t
getBinaryExprOptr
(
SSQLToken
*
pToken
);
...
...
src/query/inc/qsqlparser.h
浏览文件 @
18da95ad
...
...
@@ -20,6 +20,7 @@
extern
"C"
{
#endif
#include <tstrbuild.h>
#include "taos.h"
#include "taosmsg.h"
#include "tstoken.h"
...
...
@@ -187,38 +188,19 @@ typedef struct SSqlInfo {
}
SSqlInfo
;
typedef
struct
tSQLExpr
{
/*
* for single operand:
* TK_ALL
* TK_ID
* TK_SUM
* TK_AVG
* TK_MIN
* TK_MAX
* TK_FIRST
* TK_LAST
* TK_BOTTOM
* TK_TOP
* TK_STDDEV
* TK_PERCENTILE
*
* for binary operand:
* TK_LESS
* TK_LARGE
* TK_EQUAL etc...
*/
uint32_t
nSQLOptr
;
// TK_FUNCTION: sql function, TK_LE: less than(binary expr)
// TK_FUNCTION: sql function, TK_LE: less than(binary expr)
uint32_t
nSQLOptr
;
// the full sql string of function(col, param), which is actually the raw
// field name, since the function name is kept in nSQLOptr already
SSQLToken
operand
;
struct
tSQLExprList
*
pParam
;
// function parameters
SSQLToken
operand
;
SSQLToken
colInfo
;
// field id
tVariant
val
;
// value only for string, float, int
SSQLToken
colInfo
;
// field i
d
tVariant
val
;
// value only for string, float, int
struct
tSQLExpr
*
pLeft
;
// left chil
d
struct
tSQLExpr
*
pRight
;
// right child
struct
tSQLExpr
*
pLeft
;
// left child
struct
tSQLExpr
*
pRight
;
// right child
struct
tSQLExprList
*
pParam
;
// function parameters
}
tSQLExpr
;
// used in select clause. select <tSQLExprList> from xxx
...
...
@@ -326,18 +308,20 @@ void tSQLSetColumnInfo(TAOS_FIELD *pField, SSQLToken *pName, TAOS_FIELD *pType);
void
tSQLSetColumnType
(
TAOS_FIELD
*
pField
,
SSQLToken
*
pToken
);
void
*
ParseAlloc
(
void
*
(
*
mallocProc
)(
size_t
));
// convert the sql filter expression into binary data
int32_t
tSQLExprToBinary
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
sb
);
enum
{
TSQL_NODE_TYPE_EXPR
=
0x1
,
TSQL_NODE_TYPE_ID
=
0x2
,
TSQL_NODE_TYPE_EXPR
=
0x1
,
TSQL_NODE_TYPE_ID
=
0x2
,
TSQL_NODE_TYPE_VALUE
=
0x4
,
};
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
#define NORMAL_ARITHMETIC
1
#define AGG_ARIGHTMEIC
2
int32_t
tSQLParse
(
SSqlInfo
*
pSQLInfo
,
const
char
*
pSql
);
...
...
src/query/src/qast.c
浏览文件 @
18da95ad
...
...
@@ -134,10 +134,8 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols,
pNode
->
nodeType
=
TSQL_NODE_COL
;
if
(
pToken
->
type
==
TK_ID
)
{
pNode
->
colId
=
(
int16_t
)
pSchema
[
i
].
colId
;
memcpy
(
pNode
->
pSchema
,
&
pSchema
[
i
],
sizeof
(
SSchema
));
}
else
{
pNode
->
colId
=
-
1
;
pNode
->
pSchema
->
type
=
TSDB_DATA_TYPE_BINARY
;
pNode
->
pSchema
->
bytes
=
TSDB_TABLE_NAME_LEN
;
strcpy
(
pNode
->
pSchema
->
name
,
TSQL_TBNAME_L
);
...
...
@@ -152,7 +150,6 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols,
toTSDBType
(
pToken
->
type
);
tVariantCreate
(
pNode
->
pVal
,
pToken
);
pNode
->
nodeType
=
TSQL_NODE_VALUE
;
pNode
->
colId
=
-
1
;
}
return
pNode
;
...
...
@@ -194,36 +191,31 @@ uint8_t getBinaryExprOptr(SSQLToken *pToken) {
}
// previous generated expr is reduced as the left child
static
tSQLSyntaxNode
*
parseRemainStr
(
char
*
pstr
,
tSQL
BinaryExpr
*
pExpr
,
SSchema
*
pSchema
,
int32_t
optr
,
static
tSQLSyntaxNode
*
parseRemainStr
(
char
*
pstr
,
tSQL
SyntaxNode
*
pExpr
,
SSchema
*
pSchema
,
int32_t
optr
,
int32_t
numOfCols
,
int32_t
*
i
)
{
// set the previous generated node as the left child of new root
tSQLSyntaxNode
*
pLeft
=
malloc
(
sizeof
(
tSQLSyntaxNode
));
pLeft
->
nodeType
=
TSQL_NODE_EXPR
;
pLeft
->
pExpr
=
pExpr
;
pExpr
->
nodeType
=
TSQL_NODE_EXPR
;
// remain is the right child
tSQLSyntaxNode
*
pRight
=
createSyntaxTree
(
pSchema
,
numOfCols
,
pstr
,
i
);
if
(
pRight
==
NULL
||
(
pRight
->
nodeType
==
TSQL_NODE_COL
&&
p
Left
->
nodeType
!=
TSQL_NODE_VALUE
)
||
(
p
Left
->
nodeType
==
TSQL_NODE_VALUE
&&
pRight
->
nodeType
!=
TSQL_NODE_COL
))
{
tSQLSyntaxNodeDestroy
(
p
Left
,
NULL
);
if
(
pRight
==
NULL
||
(
pRight
->
nodeType
==
TSQL_NODE_COL
&&
p
Expr
->
nodeType
!=
TSQL_NODE_VALUE
)
||
(
p
Expr
->
nodeType
==
TSQL_NODE_VALUE
&&
pRight
->
nodeType
!=
TSQL_NODE_COL
))
{
tSQLSyntaxNodeDestroy
(
p
Expr
,
NULL
);
tSQLSyntaxNodeDestroy
(
pRight
,
NULL
);
return
NULL
;
}
tSQL
BinaryExpr
*
pNewExpr
=
(
tSQLBinaryExpr
*
)
calloc
(
1
,
sizeof
(
tSQLBinaryExpr
));
tSQL
SyntaxNode
*
pNewExpr
=
(
tSQLSyntaxNode
*
)
calloc
(
1
,
sizeof
(
tSQLSyntaxNode
));
uint8_t
k
=
optr
;
reviseBinaryExprIfNecessary
(
&
pLeft
,
&
pRight
,
&
k
);
pNewExpr
->
pLeft
=
pLeft
;
pNewExpr
->
pRight
=
pRight
;
pNewExpr
->
nSQLBinaryOptr
=
k
;
pNewExpr
->
filterOnPrimaryKey
=
isQueryOnPrimaryKey
(
pSchema
[
0
].
name
,
pLeft
,
pRight
);
reviseBinaryExprIfNecessary
(
&
pExpr
,
&
pRight
,
&
k
);
pNewExpr
->
_node
.
pLeft
=
pExpr
;
pNewExpr
->
_node
.
pRight
=
pRight
;
pNewExpr
->
_node
.
optr
=
k
;
tSQLSyntaxNode
*
pn
=
malloc
(
sizeof
(
tSQLSyntaxNode
));
pn
->
nodeType
=
TSQL_NODE_EXPR
;
pn
->
pExpr
=
pNewExpr
;
pNewExpr
->
_node
.
hasPK
=
isQueryOnPrimaryKey
(
pSchema
[
0
].
name
,
pExpr
,
pRight
);
pNewExpr
->
nodeType
=
TSQL_NODE_EXPR
;
return
p
n
;
return
p
NewExpr
;
}
uint8_t
isQueryOnPrimaryKey
(
const
char
*
primaryColumnName
,
const
tSQLSyntaxNode
*
pLeft
,
const
tSQLSyntaxNode
*
pRight
)
{
...
...
@@ -232,8 +224,8 @@ uint8_t isQueryOnPrimaryKey(const char *primaryColumnName, const tSQLSyntaxNode
return
(
strcmp
(
primaryColumnName
,
pLeft
->
pSchema
->
name
)
==
0
)
?
1
:
0
;
}
else
{
// if any children have query on primary key, their parents are also keep this value
return
((
pLeft
->
nodeType
==
TSQL_NODE_EXPR
&&
pLeft
->
pExpr
->
filterOnPrimaryKey
==
1
)
||
(
pRight
->
nodeType
==
TSQL_NODE_EXPR
&&
pRight
->
pExpr
->
filterOnPrimaryKey
==
1
))
==
true
return
((
pLeft
->
nodeType
==
TSQL_NODE_EXPR
&&
pLeft
->
_node
.
hasPK
==
1
)
||
(
pRight
->
nodeType
==
TSQL_NODE_EXPR
&&
pRight
->
_node
.
hasPK
==
1
))
==
true
?
1
:
0
;
}
...
...
@@ -308,22 +300,20 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha
}
/* create binary expr as the child of new parent node */
tSQL
BinaryExpr
*
pBinExpr
=
(
tSQLBinaryExpr
*
)
calloc
(
1
,
sizeof
(
tSQLBinaryExpr
));
tSQL
SyntaxNode
*
pBinExpr
=
(
tSQLSyntaxNode
*
)
calloc
(
1
,
sizeof
(
tSQLSyntaxNode
));
reviseBinaryExprIfNecessary
(
&
pLeft
,
&
pRight
,
&
optr
);
pBinExpr
->
filterOnPrimaryKey
=
isQueryOnPrimaryKey
(
pSchema
[
0
].
name
,
pLeft
,
pRight
);
pBinExpr
->
pLeft
=
pLeft
;
pBinExpr
->
pRight
=
pRight
;
pBinExpr
->
nSQLBinaryO
ptr
=
optr
;
pBinExpr
->
_node
.
hasPK
=
isQueryOnPrimaryKey
(
pSchema
[
0
].
name
,
pLeft
,
pRight
);
pBinExpr
->
_node
.
pLeft
=
pLeft
;
pBinExpr
->
_node
.
pRight
=
pRight
;
pBinExpr
->
_node
.
o
ptr
=
optr
;
t0
=
tStrGetToken
(
str
,
i
,
true
,
0
,
NULL
);
if
(
t0
.
n
==
0
||
t0
.
type
==
TK_RP
)
{
tSQLSyntaxNode
*
pn
=
malloc
(
sizeof
(
tSQLSyntaxNode
));
pn
->
nodeType
=
TSQL_NODE_EXPR
;
pn
->
pExpr
=
pBinExpr
;
pn
->
colId
=
-
1
;
return
pn
;
pBinExpr
->
nodeType
=
TSQL_NODE_EXPR
;
return
pBinExpr
;
}
else
{
uint8_t
localOptr
=
getBinaryExprOptr
(
&
t0
);
if
(
localOptr
==
0
)
{
...
...
@@ -336,7 +326,7 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha
}
}
void
tSQLBinaryExprFromString
(
tSQL
BinaryExpr
**
pExpr
,
SSchema
*
pSchema
,
int32_t
numOfCols
,
char
*
src
,
int32_t
len
)
{
void
tSQLBinaryExprFromString
(
tSQL
SyntaxNode
**
pExpr
,
SSchema
*
pSchema
,
int32_t
numOfCols
,
char
*
src
,
int32_t
len
)
{
*
pExpr
=
NULL
;
if
(
len
<=
0
||
src
==
NULL
||
pSchema
==
NULL
||
numOfCols
<=
0
)
{
return
;
...
...
@@ -344,11 +334,9 @@ void tSQLBinaryExprFromString(tSQLBinaryExpr **pExpr, SSchema *pSchema, int32_t
int32_t
pos
=
0
;
tSQLSyntaxNode
*
pStxNode
=
createSyntaxTree
(
pSchema
,
numOfCols
,
src
,
&
pos
);
if
(
pStxNode
!=
NULL
)
{
assert
(
pStxNode
->
nodeType
==
TSQL_NODE_EXPR
);
*
pExpr
=
pStxNode
->
pExpr
;
free
(
pStxNode
);
*
pExpr
=
createSyntaxTree
(
pSchema
,
numOfCols
,
src
,
&
pos
);
if
(
*
pExpr
!=
NULL
)
{
assert
((
*
pExpr
)
->
nodeType
==
TSQL_NODE_EXPR
);
}
}
...
...
@@ -356,7 +344,7 @@ int32_t tSQLBinaryExprToStringImpl(tSQLSyntaxNode *pNode, char *dst, uint8_t typ
int32_t
len
=
0
;
if
(
type
==
TSQL_NODE_EXPR
)
{
*
dst
=
'('
;
tSQLBinaryExprToString
(
pNode
->
pExpr
,
dst
+
1
,
&
len
);
tSQLBinaryExprToString
(
pNode
,
dst
+
1
,
&
len
);
len
+=
2
;
*
(
dst
+
len
-
1
)
=
')'
;
}
else
if
(
type
==
TSQL_NODE_COL
)
{
...
...
@@ -418,21 +406,21 @@ static char *tSQLOptrToString(uint8_t optr, char *dst) {
return
dst
;
}
void
tSQLBinaryExprToString
(
tSQL
BinaryExpr
*
pExpr
,
char
*
dst
,
int32_t
*
len
)
{
void
tSQLBinaryExprToString
(
tSQL
SyntaxNode
*
pExpr
,
char
*
dst
,
int32_t
*
len
)
{
if
(
pExpr
==
NULL
)
{
*
dst
=
0
;
*
len
=
0
;
return
;
}
int32_t
lhs
=
tSQLBinaryExprToStringImpl
(
pExpr
->
pLeft
,
dst
,
pExpr
->
pLeft
->
nodeType
);
int32_t
lhs
=
tSQLBinaryExprToStringImpl
(
pExpr
->
_node
.
pLeft
,
dst
,
pExpr
->
_node
.
pLeft
->
nodeType
);
dst
+=
lhs
;
*
len
=
lhs
;
char
*
start
=
tSQLOptrToString
(
pExpr
->
nSQLBinaryO
ptr
,
dst
);
char
*
start
=
tSQLOptrToString
(
pExpr
->
_node
.
o
ptr
,
dst
);
*
len
+=
(
start
-
dst
);
*
len
+=
tSQLBinaryExprToStringImpl
(
pExpr
->
pRight
,
start
,
pExpr
->
pRight
->
nodeType
);
*
len
+=
tSQLBinaryExprToStringImpl
(
pExpr
->
_node
.
pRight
,
start
,
pExpr
->
_node
.
pRight
->
nodeType
);
}
static
void
UNUSED_FUNC
destroySyntaxTree
(
tSQLSyntaxNode
*
pNode
)
{
tSQLSyntaxNodeDestroy
(
pNode
,
NULL
);
}
...
...
@@ -443,7 +431,7 @@ static void tSQLSyntaxNodeDestroy(tSQLSyntaxNode *pNode, void (*fp)(void *)) {
}
if
(
pNode
->
nodeType
==
TSQL_NODE_EXPR
)
{
tSQLBinaryExprDestroy
(
&
pNode
->
pExpr
,
fp
);
tSQLBinaryExprDestroy
(
&
pNode
,
fp
);
}
else
if
(
pNode
->
nodeType
==
TSQL_NODE_VALUE
)
{
tVariantDestroy
(
pNode
->
pVal
);
}
...
...
@@ -451,16 +439,16 @@ static void tSQLSyntaxNodeDestroy(tSQLSyntaxNode *pNode, void (*fp)(void *)) {
free
(
pNode
);
}
void
tSQLBinaryExprDestroy
(
tSQL
BinaryExpr
**
pExpr
,
void
(
*
fp
)(
void
*
))
{
void
tSQLBinaryExprDestroy
(
tSQL
SyntaxNode
**
pExpr
,
void
(
*
fp
)(
void
*
))
{
if
(
*
pExpr
==
NULL
)
{
return
;
}
tSQLSyntaxNodeDestroy
((
*
pExpr
)
->
pLeft
,
fp
);
tSQLSyntaxNodeDestroy
((
*
pExpr
)
->
pRight
,
fp
);
tSQLSyntaxNodeDestroy
((
*
pExpr
)
->
_node
.
pLeft
,
fp
);
tSQLSyntaxNodeDestroy
((
*
pExpr
)
->
_node
.
pRight
,
fp
);
if
(
fp
!=
NULL
)
{
fp
((
*
pExpr
)
->
info
);
fp
((
*
pExpr
)
->
_node
.
info
);
}
free
(
*
pExpr
);
...
...
@@ -650,13 +638,13 @@ int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResults
/*
* traverse the result and apply the function to each item to check if the item is qualified or not
*/
static
UNUSED_FUNC
void
tSQLListTraverseOnResult
(
struct
tSQL
BinaryExpr
*
pExpr
,
__result_filter_fn_t
fp
,
tQueryResultset
*
pResult
)
{
assert
(
pExpr
->
pLeft
->
nodeType
==
TSQL_NODE_COL
&&
pExpr
->
pRight
->
nodeType
==
TSQL_NODE_VALUE
);
static
UNUSED_FUNC
void
tSQLListTraverseOnResult
(
struct
tSQL
SyntaxNode
*
pExpr
,
__result_filter_fn_t
fp
,
tQueryResultset
*
pResult
)
{
assert
(
pExpr
->
_node
.
pLeft
->
nodeType
==
TSQL_NODE_COL
&&
pExpr
->
_node
.
pRight
->
nodeType
==
TSQL_NODE_VALUE
);
// brutal force scan the result list and check for each item in the list
int64_t
num
=
pResult
->
num
;
for
(
int32_t
i
=
0
,
j
=
0
;
i
<
pResult
->
num
;
++
i
)
{
if
(
fp
==
NULL
||
(
fp
(
pResult
->
pRes
[
i
],
pExpr
->
info
)
==
true
))
{
if
(
fp
==
NULL
||
(
fp
(
pResult
->
pRes
[
i
],
pExpr
->
_node
.
info
)
==
true
))
{
pResult
->
pRes
[
j
++
]
=
pResult
->
pRes
[
i
];
}
else
{
num
--
;
...
...
@@ -666,27 +654,27 @@ static UNUSED_FUNC void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, _
pResult
->
num
=
num
;
}
static
bool
filterItem
(
tSQL
BinaryExpr
*
pExpr
,
const
void
*
pItem
,
SBinaryFilterSupp
*
param
)
{
tSQLSyntaxNode
*
pLeft
=
pExpr
->
pLeft
;
tSQLSyntaxNode
*
pRight
=
pExpr
->
pRight
;
static
bool
filterItem
(
tSQL
SyntaxNode
*
pExpr
,
const
void
*
pItem
,
SBinaryFilterSupp
*
param
)
{
tSQLSyntaxNode
*
pLeft
=
pExpr
->
_node
.
pLeft
;
tSQLSyntaxNode
*
pRight
=
pExpr
->
_node
.
pRight
;
/*
* non-leaf nodes, recursively traverse the syntax tree in the post-root order
*/
if
(
pLeft
->
nodeType
==
TSQL_NODE_EXPR
&&
pRight
->
nodeType
==
TSQL_NODE_EXPR
)
{
if
(
pExpr
->
nSQLBinaryO
ptr
==
TSDB_RELATION_OR
)
{
// or
if
(
filterItem
(
pLeft
->
pExpr
,
pItem
,
param
))
{
if
(
pExpr
->
_node
.
o
ptr
==
TSDB_RELATION_OR
)
{
// or
if
(
filterItem
(
pLeft
,
pItem
,
param
))
{
return
true
;
}
// left child does not satisfy the query condition, try right child
return
filterItem
(
pRight
->
pExpr
,
pItem
,
param
);
return
filterItem
(
pRight
,
pItem
,
param
);
}
else
{
// and
if
(
!
filterItem
(
pLeft
->
pExpr
,
pItem
,
param
))
{
if
(
!
filterItem
(
pLeft
,
pItem
,
param
))
{
return
false
;
}
return
filterItem
(
pRight
->
pExpr
,
pItem
,
param
);
return
filterItem
(
pRight
,
pItem
,
param
);
}
}
...
...
@@ -694,7 +682,7 @@ static bool filterItem(tSQLBinaryExpr *pExpr, const void *pItem, SBinaryFilterSu
assert
(
pLeft
->
nodeType
==
TSQL_NODE_COL
&&
pRight
->
nodeType
==
TSQL_NODE_VALUE
);
param
->
setupInfoFn
(
pExpr
,
param
->
pExtInfo
);
return
param
->
fp
(
pItem
,
pExpr
->
info
);
return
param
->
fp
(
pItem
,
pExpr
->
_node
.
info
);
}
/**
...
...
@@ -707,7 +695,7 @@ static bool filterItem(tSQLBinaryExpr *pExpr, const void *pItem, SBinaryFilterSu
* @param pSchema tag schemas
* @param fp filter callback function
*/
static
void
tSQLBinaryTraverseOnResult
(
tSQL
BinaryExpr
*
pExpr
,
SArray
*
pResult
,
SBinaryFilterSupp
*
param
)
{
static
void
tSQLBinaryTraverseOnResult
(
tSQL
SyntaxNode
*
pExpr
,
SArray
*
pResult
,
SBinaryFilterSupp
*
param
)
{
size_t
size
=
taosArrayGetSize
(
pResult
);
SArray
*
array
=
taosArrayInit
(
size
,
POINTER_BYTES
);
...
...
@@ -722,7 +710,7 @@ static void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, SArray *pResult, S
taosArrayCopy
(
pResult
,
array
);
}
static
void
tSQLBinaryTraverseOnSkipList
(
tSQL
BinaryExpr
*
pExpr
,
SArray
*
pResult
,
SSkipList
*
pSkipList
,
static
void
tSQLBinaryTraverseOnSkipList
(
tSQL
SyntaxNode
*
pExpr
,
SArray
*
pResult
,
SSkipList
*
pSkipList
,
SBinaryFilterSupp
*
param
)
{
SSkipListIterator
*
iter
=
tSkipListCreateIter
(
pSkipList
);
...
...
@@ -736,17 +724,17 @@ static void tSQLBinaryTraverseOnSkipList(tSQLBinaryExpr *pExpr, SArray *pResult,
}
// post-root order traverse syntax tree
void
tSQLBinaryExprTraverse
(
tSQL
BinaryExpr
*
pExpr
,
SSkipList
*
pSkipList
,
SArray
*
result
,
SBinaryFilterSupp
*
param
)
{
void
tSQLBinaryExprTraverse
(
tSQL
SyntaxNode
*
pExpr
,
SSkipList
*
pSkipList
,
SArray
*
result
,
SBinaryFilterSupp
*
param
)
{
if
(
pExpr
==
NULL
)
{
return
;
}
tSQLSyntaxNode
*
pLeft
=
pExpr
->
pLeft
;
tSQLSyntaxNode
*
pRight
=
pExpr
->
pRight
;
tSQLSyntaxNode
*
pLeft
=
pExpr
->
_node
.
pLeft
;
tSQLSyntaxNode
*
pRight
=
pExpr
->
_node
.
pRight
;
// recursive traverse left child branch
if
(
pLeft
->
nodeType
==
TSQL_NODE_EXPR
||
pRight
->
nodeType
==
TSQL_NODE_EXPR
)
{
uint8_t
weight
=
pLeft
->
pExpr
->
filterOnPrimaryKey
+
pRight
->
pExpr
->
filterOnPrimaryKey
;
uint8_t
weight
=
pLeft
->
_node
.
hasPK
+
pRight
->
_node
.
hasPK
;
if
(
weight
==
0
&&
taosArrayGetSize
(
result
)
>
0
&&
pSkipList
==
NULL
)
{
/**
...
...
@@ -762,16 +750,16 @@ void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, SSkipList *pSkipList, SArray
*/
assert
(
taosArrayGetSize
(
result
)
==
0
);
tSQLBinaryTraverseOnSkipList
(
pExpr
,
result
,
pSkipList
,
param
);
}
else
if
(
weight
==
2
||
(
weight
==
1
&&
pExpr
->
nSQLBinaryO
ptr
==
TSDB_RELATION_OR
))
{
}
else
if
(
weight
==
2
||
(
weight
==
1
&&
pExpr
->
_node
.
o
ptr
==
TSDB_RELATION_OR
))
{
tQueryResultset
rLeft
=
{
0
};
tQueryResultset
rRight
=
{
0
};
tSQLBinaryExprTraverse
(
pLeft
->
pExpr
,
pSkipList
,
&
rLeft
,
param
);
tSQLBinaryExprTraverse
(
pRight
->
pExpr
,
pSkipList
,
&
rRight
,
param
);
tSQLBinaryExprTraverse
(
pLeft
,
pSkipList
,
&
rLeft
,
param
);
tSQLBinaryExprTraverse
(
pRight
,
pSkipList
,
&
rRight
,
param
);
if
(
pExpr
->
nSQLBinaryO
ptr
==
TSDB_RELATION_AND
)
{
// CROSS
if
(
pExpr
->
_node
.
o
ptr
==
TSDB_RELATION_AND
)
{
// CROSS
intersect
(
&
rLeft
,
&
rRight
,
result
);
}
else
if
(
pExpr
->
nSQLBinaryO
ptr
==
TSDB_RELATION_OR
)
{
// or
}
else
if
(
pExpr
->
_node
.
o
ptr
==
TSDB_RELATION_OR
)
{
// or
merge
(
&
rLeft
,
&
rRight
,
result
);
}
else
{
assert
(
false
);
...
...
@@ -786,16 +774,16 @@ void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, SSkipList *pSkipList, SArray
* first, we filter results based on the skiplist index, which is the initial filter stage,
* then, we conduct the secondary filter operation based on the result from the initial filter stage.
*/
assert
(
pExpr
->
nSQLBinaryO
ptr
==
TSDB_RELATION_AND
);
assert
(
pExpr
->
_node
.
o
ptr
==
TSDB_RELATION_AND
);
tSQL
BinaryExpr
*
pFirst
=
NULL
;
tSQL
BinaryExpr
*
pSecond
=
NULL
;
if
(
pLeft
->
pExpr
->
filterOnPrimaryKey
==
1
)
{
pFirst
=
pLeft
->
pExpr
;
pSecond
=
pRight
->
pExpr
;
tSQL
SyntaxNode
*
pFirst
=
NULL
;
tSQL
SyntaxNode
*
pSecond
=
NULL
;
if
(
pLeft
->
_node
.
hasPK
==
1
)
{
pFirst
=
pLeft
;
pSecond
=
pRight
;
}
else
{
pFirst
=
pRight
->
pExpr
;
pSecond
=
pLeft
->
pExpr
;
pFirst
=
pRight
;
pSecond
=
pLeft
;
}
assert
(
pFirst
!=
pSecond
&&
pFirst
!=
NULL
&&
pSecond
!=
NULL
);
...
...
@@ -822,25 +810,25 @@ void tSQLBinaryExprTraverse(tSQLBinaryExpr *pExpr, SSkipList *pSkipList, SArray
}
}
void
tSQLBinaryExprCalcTraverse
(
tSQL
BinaryExpr
*
pExprs
,
int32_t
numOfRows
,
char
*
pOutput
,
void
*
param
,
int32_t
order
,
void
tSQLBinaryExprCalcTraverse
(
tSQL
SyntaxNode
*
pExprs
,
int32_t
numOfRows
,
char
*
pOutput
,
void
*
param
,
int32_t
order
,
char
*
(
*
getSourceDataBlock
)(
void
*
,
char
*
,
int32_t
))
{
if
(
pExprs
==
NULL
)
{
return
;
}
tSQLSyntaxNode
*
pLeft
=
pExprs
->
pLeft
;
tSQLSyntaxNode
*
pRight
=
pExprs
->
pRight
;
tSQLSyntaxNode
*
pLeft
=
pExprs
->
_node
.
pLeft
;
tSQLSyntaxNode
*
pRight
=
pExprs
->
_node
.
pRight
;
/* the left output has result from the left child syntax tree */
char
*
pLeftOutput
=
(
char
*
)
malloc
(
sizeof
(
int64_t
)
*
numOfRows
);
if
(
pLeft
->
nodeType
==
TSQL_NODE_EXPR
)
{
tSQLBinaryExprCalcTraverse
(
pLeft
->
pExpr
,
numOfRows
,
pLeftOutput
,
param
,
order
,
getSourceDataBlock
);
tSQLBinaryExprCalcTraverse
(
pLeft
,
numOfRows
,
pLeftOutput
,
param
,
order
,
getSourceDataBlock
);
}
/* the right output has result from the right child syntax tree */
char
*
pRightOutput
=
malloc
(
sizeof
(
int64_t
)
*
numOfRows
);
if
(
pRight
->
nodeType
==
TSQL_NODE_EXPR
)
{
tSQLBinaryExprCalcTraverse
(
pRight
->
pExpr
,
numOfRows
,
pRightOutput
,
param
,
order
,
getSourceDataBlock
);
tSQLBinaryExprCalcTraverse
(
pRight
,
numOfRows
,
pRightOutput
,
param
,
order
,
getSourceDataBlock
);
}
if
(
pLeft
->
nodeType
==
TSQL_NODE_EXPR
)
{
...
...
@@ -849,51 +837,51 @@ void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char
* exprLeft + exprRight
* the type of returned value of one expression is always double float precious
*/
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
TSDB_DATA_TYPE_DOUBLE
,
TSDB_DATA_TYPE_DOUBLE
,
pExprs
->
nSQLBinaryO
ptr
);
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
TSDB_DATA_TYPE_DOUBLE
,
TSDB_DATA_TYPE_DOUBLE
,
pExprs
->
_node
.
o
ptr
);
fp
(
pLeftOutput
,
pRightOutput
,
numOfRows
,
numOfRows
,
pOutput
,
order
);
}
else
if
(
pRight
->
nodeType
==
TSQL_NODE_COL
)
{
// exprLeft + columnRight
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
TSDB_DATA_TYPE_DOUBLE
,
pRight
->
pSchema
->
type
,
pExprs
->
nSQLBinaryO
ptr
);
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
TSDB_DATA_TYPE_DOUBLE
,
pRight
->
pSchema
->
type
,
pExprs
->
_node
.
o
ptr
);
// set input buffer
char
*
pInputData
=
getSourceDataBlock
(
param
,
pRight
->
pSchema
->
name
,
pRight
->
colId
);
char
*
pInputData
=
getSourceDataBlock
(
param
,
pRight
->
pSchema
->
name
,
pRight
->
pSchema
->
colId
);
fp
(
pLeftOutput
,
pInputData
,
numOfRows
,
numOfRows
,
pOutput
,
order
);
}
else
if
(
pRight
->
nodeType
==
TSQL_NODE_VALUE
)
{
// exprLeft + 12
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
TSDB_DATA_TYPE_DOUBLE
,
pRight
->
pVal
->
nType
,
pExprs
->
nSQLBinaryO
ptr
);
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
TSDB_DATA_TYPE_DOUBLE
,
pRight
->
pVal
->
nType
,
pExprs
->
_node
.
o
ptr
);
fp
(
pLeftOutput
,
&
pRight
->
pVal
->
i64Key
,
numOfRows
,
1
,
pOutput
,
order
);
}
}
else
if
(
pLeft
->
nodeType
==
TSQL_NODE_COL
)
{
// column data specified on left-hand-side
char
*
pLeftInputData
=
getSourceDataBlock
(
param
,
pLeft
->
pSchema
->
name
,
pLeft
->
colId
);
char
*
pLeftInputData
=
getSourceDataBlock
(
param
,
pLeft
->
pSchema
->
name
,
pLeft
->
pSchema
->
colId
);
if
(
pRight
->
nodeType
==
TSQL_NODE_EXPR
)
{
// columnLeft + expr2
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pSchema
->
type
,
TSDB_DATA_TYPE_DOUBLE
,
pExprs
->
nSQLBinaryO
ptr
);
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pSchema
->
type
,
TSDB_DATA_TYPE_DOUBLE
,
pExprs
->
_node
.
o
ptr
);
fp
(
pLeftInputData
,
pRightOutput
,
numOfRows
,
numOfRows
,
pOutput
,
order
);
}
else
if
(
pRight
->
nodeType
==
TSQL_NODE_COL
)
{
// columnLeft + columnRight
// column data specified on right-hand-side
char
*
pRightInputData
=
getSourceDataBlock
(
param
,
pRight
->
pSchema
->
name
,
pRight
->
colId
);
char
*
pRightInputData
=
getSourceDataBlock
(
param
,
pRight
->
pSchema
->
name
,
pRight
->
pSchema
->
colId
);
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pSchema
->
type
,
pRight
->
pSchema
->
type
,
pExprs
->
nSQLBinaryO
ptr
);
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pSchema
->
type
,
pRight
->
pSchema
->
type
,
pExprs
->
_node
.
o
ptr
);
fp
(
pLeftInputData
,
pRightInputData
,
numOfRows
,
numOfRows
,
pOutput
,
order
);
}
else
if
(
pRight
->
nodeType
==
TSQL_NODE_VALUE
)
{
// columnLeft + 12
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pSchema
->
type
,
pRight
->
pVal
->
nType
,
pExprs
->
nSQLBinaryO
ptr
);
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pSchema
->
type
,
pRight
->
pVal
->
nType
,
pExprs
->
_node
.
o
ptr
);
fp
(
pLeftInputData
,
&
pRight
->
pVal
->
i64Key
,
numOfRows
,
1
,
pOutput
,
order
);
}
}
else
{
// column data specified on left-hand-side
if
(
pRight
->
nodeType
==
TSQL_NODE_EXPR
)
{
// 12 + expr2
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pVal
->
nType
,
TSDB_DATA_TYPE_DOUBLE
,
pExprs
->
nSQLBinaryO
ptr
);
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pVal
->
nType
,
TSDB_DATA_TYPE_DOUBLE
,
pExprs
->
_node
.
o
ptr
);
fp
(
&
pLeft
->
pVal
->
i64Key
,
pRightOutput
,
1
,
numOfRows
,
pOutput
,
order
);
}
else
if
(
pRight
->
nodeType
==
TSQL_NODE_COL
)
{
// 12 + columnRight
// column data specified on right-hand-side
char
*
pRightInputData
=
getSourceDataBlock
(
param
,
pRight
->
pSchema
->
name
,
pRight
->
colId
);
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pVal
->
nType
,
pRight
->
pSchema
->
type
,
pExprs
->
nSQLBinaryO
ptr
);
char
*
pRightInputData
=
getSourceDataBlock
(
param
,
pRight
->
pSchema
->
name
,
pRight
->
pSchema
->
colId
);
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pVal
->
nType
,
pRight
->
pSchema
->
type
,
pExprs
->
_node
.
o
ptr
);
fp
(
&
pLeft
->
pVal
->
i64Key
,
pRightInputData
,
1
,
numOfRows
,
pOutput
,
order
);
}
else
if
(
pRight
->
nodeType
==
TSQL_NODE_VALUE
)
{
// 12 + 12
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pVal
->
nType
,
pRight
->
pVal
->
nType
,
pExprs
->
nSQLBinaryO
ptr
);
_bi_consumer_fn_t
fp
=
tGetBiConsumerFn
(
pLeft
->
pVal
->
nType
,
pRight
->
pVal
->
nType
,
pExprs
->
_node
.
o
ptr
);
fp
(
&
pLeft
->
pVal
->
i64Key
,
&
pRight
->
pVal
->
i64Key
,
1
,
1
,
pOutput
,
order
);
}
}
...
...
@@ -902,24 +890,24 @@ void tSQLBinaryExprCalcTraverse(tSQLBinaryExpr *pExprs, int32_t numOfRows, char
free
(
pRightOutput
);
}
void
tSQLBinaryExprTrv
(
tSQL
BinaryExpr
*
pExprs
,
int32_t
*
val
,
int16_t
*
ids
)
{
void
tSQLBinaryExprTrv
(
tSQL
SyntaxNode
*
pExprs
,
int32_t
*
val
,
int16_t
*
ids
)
{
if
(
pExprs
==
NULL
)
{
return
;
}
tSQLSyntaxNode
*
pLeft
=
pExprs
->
pLeft
;
tSQLSyntaxNode
*
pRight
=
pExprs
->
pRight
;
tSQLSyntaxNode
*
pLeft
=
pExprs
->
_node
.
pLeft
;
tSQLSyntaxNode
*
pRight
=
pExprs
->
_node
.
pRight
;
// recursive traverse left child branch
if
(
pLeft
->
nodeType
==
TSQL_NODE_EXPR
)
{
tSQLBinaryExprTrv
(
pLeft
->
pExpr
,
val
,
ids
);
tSQLBinaryExprTrv
(
pLeft
,
val
,
ids
);
}
else
if
(
pLeft
->
nodeType
==
TSQL_NODE_COL
)
{
ids
[
*
val
]
=
pLeft
->
pSchema
->
colId
;
(
*
val
)
+=
1
;
}
if
(
pRight
->
nodeType
==
TSQL_NODE_EXPR
)
{
tSQLBinaryExprTrv
(
pRight
->
pExpr
,
val
,
ids
);
tSQLBinaryExprTrv
(
pRight
,
val
,
ids
);
}
else
if
(
pRight
->
nodeType
==
TSQL_NODE_COL
)
{
ids
[
*
val
]
=
pRight
->
pSchema
->
colId
;
(
*
val
)
+=
1
;
...
...
src/query/src/qparserImpl.c
浏览文件 @
18da95ad
...
...
@@ -13,17 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <qsqltype.h>
#include "os.h"
#include "qsqlparser.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tstoken.h"
#include "ttime.h"
#include "ttokendef.h"
#include "taosdef.h"
#include "tutil.h"
#include "qsqltype.h"
#include "tstrbuild.h"
int32_t
tSQLParse
(
SSqlInfo
*
pSQLInfo
,
const
char
*
pStr
)
{
void
*
pParser
=
ParseAlloc
(
malloc
);
...
...
@@ -900,3 +901,173 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
memset
(
&
pDBInfo
->
precision
,
0
,
sizeof
(
SSQLToken
));
}
static
bool
isExprLeafNode
(
tSQLExpr
*
pExpr
)
{
return
(
pExpr
->
pRight
==
NULL
&&
pExpr
->
pLeft
==
NULL
)
&&
(
pExpr
->
nSQLOptr
==
TK_ID
||
(
pExpr
->
nSQLOptr
>=
TK_BOOL
&&
pExpr
->
nSQLOptr
<=
TK_NCHAR
)
||
pExpr
->
nSQLOptr
==
TK_SET
);
}
static
bool
isExprParentOfLeafNode
(
tSQLExpr
*
pExpr
)
{
return
(
pExpr
->
pLeft
!=
NULL
&&
pExpr
->
pRight
!=
NULL
)
&&
(
isExprLeafNode
(
pExpr
->
pLeft
)
&&
isExprLeafNode
(
pExpr
->
pRight
));
}
static
int32_t
tSQLExprNodeToString
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
pBuilder
)
{
if
(
pExpr
->
nSQLOptr
==
TK_ID
)
{
// column name
// strncpy(*str, pExpr->colInfo.z, pExpr->colInfo.n);
// *str += pExpr->colInfo.n;
}
else
if
(
pExpr
->
nSQLOptr
>=
TK_BOOL
&&
pExpr
->
nSQLOptr
<=
TK_STRING
)
{
// value
// *str += tVariantToString(&pExpr->val, *str);
// taosStringBuilderAppendStringLen()
}
else
if
(
pExpr
->
nSQLOptr
>=
TK_COUNT
&&
pExpr
->
nSQLOptr
<=
TK_AVG_IRATE
)
{
taosStringBuilderAppendStringLen
(
pBuilder
,
pExpr
->
operand
.
z
,
pExpr
->
operand
.
n
);
}
else
{
// not supported operation
assert
(
false
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
optrToString
(
tSQLExpr
*
pExpr
,
char
**
exprString
)
{
const
char
*
le
=
"<="
;
const
char
*
ge
=
">="
;
const
char
*
ne
=
"<>"
;
const
char
*
likeOptr
=
"LIKE"
;
switch
(
pExpr
->
nSQLOptr
)
{
case
TK_LE
:
{
*
(
int16_t
*
)(
*
exprString
)
=
*
(
int16_t
*
)
le
;
*
exprString
+=
1
;
break
;
}
case
TK_GE
:
{
*
(
int16_t
*
)(
*
exprString
)
=
*
(
int16_t
*
)
ge
;
*
exprString
+=
1
;
break
;
}
case
TK_NE
:
{
*
(
int16_t
*
)(
*
exprString
)
=
*
(
int16_t
*
)
ne
;
*
exprString
+=
1
;
break
;
}
case
TK_LT
:
*
(
*
exprString
)
=
'<'
;
break
;
case
TK_GT
:
*
(
*
exprString
)
=
'>'
;
break
;
case
TK_EQ
:
*
(
*
exprString
)
=
'='
;
break
;
case
TK_PLUS
:
*
(
*
exprString
)
=
'+'
;
break
;
case
TK_MINUS
:
*
(
*
exprString
)
=
'-'
;
break
;
case
TK_STAR
:
*
(
*
exprString
)
=
'*'
;
break
;
case
TK_DIVIDE
:
*
(
*
exprString
)
=
'/'
;
break
;
case
TK_REM
:
*
(
*
exprString
)
=
'%'
;
break
;
case
TK_LIKE
:
{
int32_t
len
=
sprintf
(
*
exprString
,
" %s "
,
likeOptr
);
*
exprString
+=
(
len
-
1
);
break
;
}
default:
return
TSDB_CODE_INVALID_SQL
;
}
*
exprString
+=
1
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tSQLExprLeafToBinary
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
pBuilder
)
{
if
(
!
isExprParentOfLeafNode
(
pExpr
))
{
return
TSDB_CODE_INVALID_SQL
;
}
tSQLExpr
*
pLeft
=
pExpr
->
pLeft
;
tSQLExpr
*
pRight
=
pExpr
->
pRight
;
// if (addParentheses) {
// *(*output) = '(';
// *output += 1;
// }
tSQLExprNodeToString
(
pLeft
,
pBuilder
);
tSQLExprNodeToString
(
pRight
,
pBuilder
);
if
(
optrToString
(
pExpr
,
pBuilder
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_INVALID_SQL
;
}
// if (addParentheses) {
// *(*output) = ')';
// *output += 1;
// }
return
TSDB_CODE_SUCCESS
;
}
static
void
relToString
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
pBuilder
)
{
assert
(
pExpr
->
nSQLOptr
==
TK_AND
||
pExpr
->
nSQLOptr
==
TK_OR
);
const
char
*
or
=
"OR"
;
const
char
*
and
=
"AND"
;
// if (pQueryInfo->tagCond.relType == TSQL_STABLE_QTYPE_COND) {
// if (pExpr->nSQLOptr == TK_AND) {
// strcpy(*str, and);
// *str += strlen(and);
// } else {
// strcpy(*str, or);
// *str += strlen(or);
// }
}
static
int32_t
doSQLExprToBinary
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
pBuilder
)
{
if
(
pExpr
==
NULL
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
!
isExprParentOfLeafNode
(
pExpr
))
{
// *(*str) = '(';
// *str += 1;
int32_t
ret
=
doSQLExprToBinary
(
pExpr
->
pLeft
,
pBuilder
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
ret
=
doSQLExprToBinary
(
pExpr
->
pRight
,
pBuilder
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
relToString
(
pExpr
,
pBuilder
);
// *(*str) = ')';
// *str += 1;
return
ret
;
}
return
tSQLExprLeafToBinary
(
pExpr
,
pBuilder
);
}
// post order seralize to binary data
int32_t
tSQLExprToBinary
(
tSQLExpr
*
pExpr
,
SStringBuilder
*
pBuilder
)
{
assert
(
pExpr
!=
NULL
&&
pBuilder
!=
NULL
);
}
\ No newline at end of file
src/query/src/queryExecutor.c
浏览文件 @
18da95ad
...
...
@@ -2603,8 +2603,8 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
static
int64_t
doScanAllDataBlocks
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int64_t
cnt
=
0
;
dTrace
(
"QInfo:%p query start, qrange:%"
PRId64
"-%"
PRId64
", lastkey:%"
PRId64
", order:%d"
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
lastKey
,
pQuery
->
order
.
order
);
...
...
@@ -3595,8 +3595,8 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery
->
window
.
ekey
=
ekey
;
STimeWindow
win
=
{.
skey
=
pQuery
->
window
.
skey
,
.
ekey
=
pQuery
->
window
.
ekey
};
tsdbResetQuery
(
pRuntimeEnv
->
pQueryHandle
,
&
win
,
current
,
pQuery
->
order
.
order
);
tsdbNextDataBlock
(
pRuntimeEnv
->
pQueryHandle
);
//
tsdbResetQuery(pRuntimeEnv->pQueryHandle, &win, current, pQuery->order.order);
//
tsdbNextDataBlock(pRuntimeEnv->pQueryHandle);
}
void
doFinalizeResult
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
...
...
@@ -5461,7 +5461,7 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs
SSqlBinaryExprInfo
*
pBinaryExprInfo
=
&
pExpr
->
binExprInfo
;
SColumnInfo
*
pColMsg
=
pQueryMsg
->
colList
;
#if 0
tSQL
BinaryExpr
* pBinExpr = NULL;
tSQL
SyntaxNode
* pBinExpr = NULL;
SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols);
dTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
...
...
src/query/src/queryUtil.c
浏览文件 @
18da95ad
...
...
@@ -21,7 +21,6 @@
#include "ttime.h"
#include "qinterpolation.h"
//#include "tscJoinProcess.h"
#include "ttime.h"
#include "queryExecutor.h"
...
...
src/vnode/tsdb/src/tsdbRead.c
浏览文件 @
18da95ad
...
...
@@ -88,8 +88,12 @@ typedef struct STableCheckInfo {
bool
checkFirstFileBlock
;
SCompIdx
*
compIndex
;
SCompInfo
*
pCompInfo
;
SCompBlock
*
pBlock
;
SDataCols
*
pDataCols
;
SFileGroup
*
pFileGroup
;
SFileGroupIter
fileIter
;
SSkipListIterator
*
iter
;
}
STableCheckInfo
;
...
...
@@ -293,6 +297,8 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
.
lastKey
=
pQueryHandle
->
window
.
skey
,
.
tableId
=
id
,
.
pTableObj
=
tsdbGetTableByUid
(
tsdbGetMeta
(
tsdb
),
id
.
uid
),
//todo this may be failed
.
compIndex
=
calloc
(
10000
,
sizeof
(
SCompIdx
)),
.
pCompInfo
=
calloc
(
1
,
1024
),
};
taosArrayPush
(
pQueryHandle
->
pTableCheckInfo
,
&
info
);
...
...
@@ -357,13 +363,18 @@ static int32_t getFileIdFromKey(TSKEY key) {
}
static
int32_t
getFileCompInfo
(
STableCheckInfo
*
pCheckInfo
,
SFileGroup
*
fileGroup
)
{
// check open file failed
if
(
fileGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
fd
==
FD_INITIALIZER
)
{
fileGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
fd
=
open
(
fileGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
fname
,
O_RDONLY
);
}
tsdbLoadCompIdx
(
fileGroup
,
pCheckInfo
->
compIndex
,
10000
);
// todo set dynamic max tables
SCompIdx
*
compIndex
=
&
pCheckInfo
->
compIndex
[
pCheckInfo
->
tableId
.
tid
];
if
(
compIndex
->
len
==
0
||
compIndex
->
numOfSuperBlocks
==
0
)
{
// no data block in this file, try next file
}
else
{
tsdbLoadCompBlocks
(
fileGroup
,
compIndex
,
pCheckInfo
->
p
Block
);
tsdbLoadCompBlocks
(
fileGroup
,
compIndex
,
pCheckInfo
->
p
CompInfo
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -396,68 +407,111 @@ static int32_t binarySearchForBlockImpl(SCompBlock *pBlock, int32_t numOfBlocks,
return
midSlot
;
}
static
SDataBlockInfo
getTrueBlockInfo
(
STsdbQueryHandle
*
pHandle
,
STableCheckInfo
*
pCheckInfo
)
{
S
DataBlockInfo
info
=
{{
0
},
0
}
;
static
SDataBlockInfo
getTrue
Data
BlockInfo
(
STsdbQueryHandle
*
pHandle
,
STableCheckInfo
*
pCheckInfo
)
{
S
CompBlock
*
pDiskBlock
=
&
pCheckInfo
->
pCompInfo
->
blocks
[
pHandle
->
cur
.
slot
]
;
SCompBlock
*
pDiskBlock
=
&
pCheckInfo
->
pBlock
[
pHandle
->
cur
.
slot
];
info
.
window
.
skey
=
pDiskBlock
->
keyFirst
;
info
.
window
.
ekey
=
pDiskBlock
->
keyLast
;
info
.
size
=
pDiskBlock
->
numOfPoints
;
info
.
numOfCols
=
pDiskBlock
->
numOfCols
;
SDataBlockInfo
info
=
{
.
window
=
{.
skey
=
pDiskBlock
->
keyFirst
,
.
ekey
=
pDiskBlock
->
keyLast
},
.
numOfCols
=
pDiskBlock
->
numOfCols
,
.
size
=
pDiskBlock
->
numOfPoints
,
.
sid
=
pCheckInfo
->
tableId
.
tid
,
.
uid
=
pCheckInfo
->
tableId
.
uid
,
};
return
info
;
}
SArray
*
getDefaultLoadColumns
(
STsdbQueryHandle
*
pQueryHandle
,
bool
loadTS
);
static
void
filterDataInDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
SDataCols
*
pCols
,
SArray
*
sa
);
static
bool
loadQualifiedDataFromFileBlock
(
STsdbQueryHandle
*
pQueryHandle
)
{
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
pQueryHandle
->
activeIndex
);
SCompBlock
*
pBlock
=
&
pCheckInfo
->
pCompInfo
->
blocks
[
cur
->
slot
];
SArray
*
sa
=
getDefaultLoadColumns
(
pQueryHandle
,
true
);
if
(
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
))
{
if
(
pQueryHandle
->
window
.
ekey
<
pBlock
->
keyLast
)
{
SCompData
*
data
=
calloc
(
1
,
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pBlock
->
numOfCols
);
data
->
numOfCols
=
pBlock
->
numOfCols
;
data
->
uid
=
pCheckInfo
->
pTableObj
->
tableId
.
uid
;
pCheckInfo
->
pDataCols
=
tdNewDataCols
(
1000
,
2
,
4096
);
tdInitDataCols
(
pCheckInfo
->
pDataCols
,
pCheckInfo
->
pTableObj
->
schema
);
SFile
*
pFile
=
&
pCheckInfo
->
pFileGroup
->
files
[
TSDB_FILE_TYPE_DATA
];
if
(
pFile
->
fd
==
FD_INITIALIZER
)
{
pFile
->
fd
=
open
(
pFile
->
fname
,
O_RDONLY
);
}
if
(
tsdbLoadDataBlock
(
pFile
,
pBlock
,
1
,
pCheckInfo
->
pDataCols
,
data
)
==
0
)
{
//do something
}
}
}
else
{
if
(
pQueryHandle
->
window
.
ekey
>
pBlock
->
keyFirst
)
{
// loadDataBlockIntoMem_(pQueryHandle, pBlock, &pQueryHandle->pFields[cur->slot], cur->fileId, sa);
}
}
filterDataInDataBlock
(
pQueryHandle
,
pCheckInfo
->
pDataCols
,
sa
);
return
pQueryHandle
->
realNumOfRows
>
0
;
}
bool
moveToNextBlock
(
STsdbQueryHandle
*
pQueryHandle
,
int32_t
step
)
{
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
if
(
pQueryHandle
->
cur
.
fid
>=
0
)
{
int32_t
fileIndex
=
-
1
;
/*
* 1. ascending order. The last data block of data file
* 2. descending order. The first block of file
*/
if
((
step
==
QUERY_ASC_FORWARD_STEP
&&
(
pQueryHandle
->
cur
.
slot
==
pQueryHandle
->
numOfBlocks
-
1
))
||
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
pQueryHandle
->
activeIndex
);
int32_t
tid
=
pCheckInfo
->
tableId
.
tid
;
if
((
step
==
QUERY_ASC_FORWARD_STEP
&&
(
pQueryHandle
->
cur
.
slot
==
pCheckInfo
->
compIndex
[
tid
].
numOfSuperBlocks
-
1
))
||
(
step
==
QUERY_DESC_FORWARD_STEP
&&
(
pQueryHandle
->
cur
.
slot
==
0
)))
{
// temporarily keep the position value, in case of no data qualified when move forwards(backwards)
SQueryFilePos
save
=
pQueryHandle
->
cur
;
// fileIndex = getNextDataFileCompInfo(pQueryHandle, &pQueryHandle->cur, &pQueryHandle->vnodeFileInfo, step);
// first data block in the next file
if
(
fileIndex
>=
0
)
{
cur
->
slot
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pQueryHandle
->
numOfBlocks
-
1
;
cur
->
pos
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pQueryHandle
->
pBlock
[
cur
->
slot
].
numOfPoints
-
1
;
// return loadQaulifiedData(pQueryHandle);
}
else
{
// try data in cache
assert
(
cur
->
fid
==
-
1
);
if
(
step
==
QUERY_ASC_FORWARD_STEP
)
{
// TSKEY nextTimestamp =
// getQueryStartPositionInCache_rv(pQueryHandle, &pQueryHandle->cur.slot, &pQueryHandle->cur.pos, true);
// if (nextTimestamp < 0) {
// pQueryHandle->cur = save;
// }
//
// return (nextTimestamp > 0);
SFileGroup
*
fgroup
=
tsdbGetFileGroupNext
(
&
pCheckInfo
->
fileIter
);
int32_t
fid
=
-
1
;
if
(
fgroup
!=
NULL
)
{
if
((
fid
=
getFileCompInfo
(
pCheckInfo
,
fgroup
))
<
0
)
{
}
else
{
cur
->
slot
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pQueryHandle
->
numOfBlocks
-
1
;
cur
->
pos
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pQueryHandle
->
pBlock
[
cur
->
slot
].
numOfPoints
-
1
;
SCompBlock
*
pBlock
=
&
pCheckInfo
->
pCompInfo
->
blocks
[
cur
->
slot
];
SCompData
*
data
=
calloc
(
1
,
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pBlock
->
numOfCols
);
data
->
numOfCols
=
pBlock
->
numOfCols
;
data
->
uid
=
pCheckInfo
->
pTableObj
->
tableId
.
uid
;
cur
->
fid
=
fgroup
->
fileId
;
assert
(
cur
->
pos
>=
0
&&
cur
->
fid
>=
0
&&
cur
->
slot
>=
0
);
if
(
pBlock
->
keyFirst
>
pQueryHandle
->
window
.
ekey
)
{
// done
return
false
;
}
loadQualifiedDataFromFileBlock
(
pQueryHandle
);
return
true
;
}
// no data to check for desc order query, restore the saved position value
pQueryHandle
->
cur
=
save
;
return
false
;
}
else
{
// check data in cache
return
hasMoreDataInCacheForSingleModel
(
pQueryHandle
);
}
}
else
{
// next block in the same file
cur
->
slot
+=
step
;
SCompBlock
*
pBlock
=
&
pQueryHandle
->
pBlock
[
cur
->
slot
];
cur
->
pos
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pBlock
->
numOfPoints
-
1
;
return
loadQualifiedDataFromFileBlock
(
pQueryHandle
);
}
// next block in the same file
int32_t
fid
=
cur
->
fid
;
// fileIndex = vnodeGetVnodeHeaderFileIndex(&fid, pQueryHandle->order, &pQueryHandle->vnodeFileInfo);
cur
->
slot
+=
step
;
SCompBlock
*
pBlock
=
&
pQueryHandle
->
pBlock
[
cur
->
slot
];
cur
->
pos
=
(
step
==
QUERY_ASC_FORWARD_STEP
)
?
0
:
pBlock
->
numOfPoints
-
1
;
// return loadQaulifiedData(pQueryHandle);
}
else
{
// data in cache
return
hasMoreDataInCacheForSingleModel
(
pQueryHandle
);
}
...
...
@@ -523,12 +577,12 @@ int vnodeBinarySearchKey(char *pValue, int num, TSKEY key, int order) {
return
midPos
;
}
static
void
filterDataInDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
SArray
*
sa
)
{
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
// be included in the query time window will be discarded
static
void
filterDataInDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
SDataCols
*
pCols
,
SArray
*
sa
)
{
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
pQueryHandle
->
activeIndex
);
SDataBlockInfo
blockInfo
=
getTrueBlockInfo
(
pQueryHandle
,
pCheckInfo
);
SDataBlockInfo
blockInfo
=
getTrue
Data
BlockInfo
(
pQueryHandle
,
pCheckInfo
);
int32_t
endPos
=
cur
->
pos
;
if
(
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
)
&&
pQueryHandle
->
window
.
ekey
>
blockInfo
.
window
.
ekey
)
{
...
...
@@ -538,7 +592,7 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SArray *sa) {
endPos
=
0
;
pQueryHandle
->
realNumOfRows
=
cur
->
pos
+
1
;
}
else
{
// endPos = vnodeBinarySearchKey(pQueryHandle->tsBuf->data, blockInfo.size
, pQueryHandle->window.ekey, pQueryHandle->order);
endPos
=
vnodeBinarySearchKey
(
pCols
->
cols
[
0
].
pData
,
pCols
->
numOfPoints
,
pQueryHandle
->
window
.
ekey
,
pQueryHandle
->
order
);
if
(
QUERY_IS_ASC_QUERY
(
pQueryHandle
->
order
))
{
if
(
endPos
<
cur
->
pos
)
{
...
...
@@ -560,19 +614,18 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SArray *sa) {
int32_t
start
=
MIN
(
cur
->
pos
,
endPos
);
// move the data block in the front to data block if needed
if
(
start
!=
0
)
{
int32_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
int32_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
sa
);
++
i
)
{
int16_t
colId
=
*
(
int16_t
*
)
taosArrayGet
(
sa
,
i
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
sa
);
++
i
)
{
int16_t
colId
=
*
(
int16_t
*
)
taosArrayGet
(
sa
,
i
);
for
(
int32_t
j
=
0
;
j
<
numOfCols
;
++
j
)
{
SColumnInfoEx
*
pCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
j
);
for
(
int32_t
j
=
0
;
j
<
numOfCols
;
++
j
)
{
SColumnInfoEx
*
pCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
j
);
if
(
pCol
->
info
.
colId
==
colId
)
{
memmove
(
pCol
->
pData
,
((
char
*
)
pCol
->
pData
)
+
pCol
->
info
.
bytes
*
start
,
pQueryHandle
->
realNumOfRows
*
pCol
->
info
.
bytes
);
break
;
}
if
(
pCol
->
info
.
colId
==
colId
)
{
SDataCol
*
pDataCol
=
&
pCols
->
cols
[
i
];
memmove
(
pCol
->
pData
,
pDataCol
->
pData
+
pCol
->
info
.
bytes
*
start
,
pQueryHandle
->
realNumOfRows
*
pCol
->
info
.
bytes
);
break
;
}
}
}
...
...
@@ -583,37 +636,123 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SArray *sa) {
cur
->
pos
=
endPos
;
}
static
SArray
*
getColumnIdList
(
STsdbQueryHandle
*
pQueryHandle
)
{
int32_t
numOfCols
=
QH_GET_NUM_OF_COLS
(
pQueryHandle
);
SArray
*
pIdList
=
taosArrayInit
(
numOfCols
,
sizeof
(
int16_t
));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoEx
*
pCol
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
taosArrayPush
(
pIdList
,
&
pCol
->
info
.
colId
);
}
return
pIdList
;
}
SArray
*
getDefaultLoadColumns
(
STsdbQueryHandle
*
pQueryHandle
,
bool
loadTS
)
{
SArray
*
pLocalIdList
=
getColumnIdList
(
pQueryHandle
);
// check if the primary time stamp column needs to load
int16_t
colId
=
*
(
int16_t
*
)
taosArrayGet
(
pLocalIdList
,
0
);
// the primary timestamp column does not be included in the the specified load column list, add it
if
(
loadTS
&&
colId
!=
0
)
{
int16_t
columnId
=
0
;
taosArrayInsert
(
pLocalIdList
,
0
,
&
columnId
);
}
return
pLocalIdList
;
}
static
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
)
{
int
firstPos
,
lastPos
,
midPos
=
-
1
;
int
numOfPoints
;
TSKEY
*
keyList
;
if
(
num
<=
0
)
return
-
1
;
keyList
=
(
TSKEY
*
)
pValue
;
firstPos
=
0
;
lastPos
=
num
-
1
;
if
(
order
==
0
)
{
// find the first position which is smaller than the key
while
(
1
)
{
if
(
key
>=
keyList
[
lastPos
])
return
lastPos
;
if
(
key
==
keyList
[
firstPos
])
return
firstPos
;
if
(
key
<
keyList
[
firstPos
])
return
firstPos
-
1
;
numOfPoints
=
lastPos
-
firstPos
+
1
;
midPos
=
(
numOfPoints
>>
1
)
+
firstPos
;
if
(
key
<
keyList
[
midPos
])
{
lastPos
=
midPos
-
1
;
}
else
if
(
key
>
keyList
[
midPos
])
{
firstPos
=
midPos
+
1
;
}
else
{
break
;
}
}
}
else
{
// find the first position which is bigger than the key
while
(
1
)
{
if
(
key
<=
keyList
[
firstPos
])
return
firstPos
;
if
(
key
==
keyList
[
lastPos
])
return
lastPos
;
if
(
key
>
keyList
[
lastPos
])
{
lastPos
=
lastPos
+
1
;
if
(
lastPos
>=
num
)
return
-
1
;
else
return
lastPos
;
}
numOfPoints
=
lastPos
-
firstPos
+
1
;
midPos
=
(
numOfPoints
>>
1
)
+
firstPos
;
if
(
key
<
keyList
[
midPos
])
{
lastPos
=
midPos
-
1
;
}
else
if
(
key
>
keyList
[
midPos
])
{
firstPos
=
midPos
+
1
;
}
else
{
break
;
}
}
}
return
midPos
;
}
static
bool
getQualifiedDataBlock
(
STsdbQueryHandle
*
pQueryHandle
,
STableCheckInfo
*
pCheckInfo
,
int32_t
type
)
{
STsdbFileH
*
pFileHandle
=
tsdbGetFile
(
pQueryHandle
->
pTsdb
);
int32_t
fid
=
getFileIdFromKey
(
pCheckInfo
->
lastKey
);
SFileGroup
*
fileGroup
=
tsdbSearchFGroup
(
pFileHandle
,
fid
);
if
(
fileGroup
==
NULL
)
{
return
false
;
}
tsdbInitFileGroupIter
(
pFileHandle
,
&
pCheckInfo
->
fileIter
,
TSDB_FGROUP_ITER_FORWARD
);
tsdbSeekFileGroupIter
(
&
pCheckInfo
->
fileIter
,
fid
);
pCheckInfo
->
pFileGroup
=
tsdbGetFileGroupNext
(
&
pCheckInfo
->
fileIter
);
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
TSKEY
key
=
pCheckInfo
->
lastKey
;
int32_t
index
=
-
1
;
// todo add iterator for filegroup
int32_t
tid
=
pCheckInfo
->
tableId
.
tid
;
SFile
*
pFile
=
&
pCheckInfo
->
pFileGroup
->
files
[
TSDB_FILE_TYPE_DATA
];
while
(
1
)
{
if
((
fid
=
getFileCompInfo
(
pCheckInfo
,
f
ileGroup
))
<
0
)
{
if
((
fid
=
getFileCompInfo
(
pCheckInfo
,
pCheckInfo
->
pF
ileGroup
))
<
0
)
{
break
;
}
int32_t
tid
=
pCheckInfo
->
tableId
.
tid
;
index
=
binarySearchForBlockImpl
(
pCheckInfo
->
pBlock
,
pCheckInfo
->
compIndex
[
tid
].
numOfSuperBlocks
,
pQueryHandle
->
order
,
key
);
index
=
binarySearchForBlockImpl
(
pCheckInfo
->
pCompInfo
->
blocks
,
pCheckInfo
->
compIndex
[
tid
].
numOfSuperBlocks
,
pQueryHandle
->
order
,
key
);
if
(
type
==
QUERY_RANGE_GREATER_EQUAL
)
{
if
(
key
<=
pCheckInfo
->
p
Block
[
index
].
keyLast
)
{
if
(
key
<=
pCheckInfo
->
p
CompInfo
->
blocks
[
index
].
keyLast
)
{
break
;
}
else
{
index
=
-
1
;
}
}
else
{
if
(
key
>=
pCheckInfo
->
p
Block
[
index
].
keyFirst
)
{
if
(
key
>=
pCheckInfo
->
p
CompInfo
->
blocks
[
index
].
keyFirst
)
{
break
;
}
else
{
index
=
-
1
;
...
...
@@ -626,7 +765,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
return
false
;
}
assert
(
index
>=
0
&&
index
<
p
QueryHandle
->
numOf
Blocks
);
assert
(
index
>=
0
&&
index
<
p
CheckInfo
->
compIndex
[
tid
].
numOfSuper
Blocks
);
// load first data block into memory failed, caused by disk block error
bool
blockLoaded
=
false
;
...
...
@@ -635,10 +774,24 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
// todo no need to loaded at all
cur
->
slot
=
index
;
// sa = getDefaultLoadColumns(pQueryHandle, true);
if
(
tsdbLoadDataBlock
(
&
fileGroup
->
files
[
2
],
&
pCheckInfo
->
pBlock
[
cur
->
slot
],
1
,
fid
,
sa
)
==
0
)
{
blockLoaded
=
true
;
}
sa
=
getDefaultLoadColumns
(
pQueryHandle
,
true
);
SCompBlock
*
pBlock
=
&
pCheckInfo
->
pCompInfo
->
blocks
[
cur
->
slot
];
SCompData
*
data
=
calloc
(
1
,
sizeof
(
SCompData
)
+
sizeof
(
SCompCol
)
*
pBlock
->
numOfCols
);
data
->
numOfCols
=
pBlock
->
numOfCols
;
data
->
uid
=
pCheckInfo
->
pTableObj
->
tableId
.
uid
;
pCheckInfo
->
pDataCols
=
tdNewDataCols
(
1000
,
2
,
4096
);
tdInitDataCols
(
pCheckInfo
->
pDataCols
,
pCheckInfo
->
pTableObj
->
schema
);
if
(
pFile
->
fd
==
FD_INITIALIZER
)
{
pFile
->
fd
=
open
(
pFile
->
fname
,
O_RDONLY
);
}
if
(
tsdbLoadDataBlock
(
pFile
,
&
pCheckInfo
->
pCompInfo
->
blocks
[
cur
->
slot
],
1
,
pCheckInfo
->
pDataCols
,
data
)
==
0
)
{
blockLoaded
=
true
;
}
// dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files",
// GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx);
...
...
@@ -649,10 +802,13 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
}
// todo search qualified points in blk, according to primary key (timestamp) column
// cur->pos = binarySearchForBlockImpl(ptsBuf->data, pBlocks->numOfPoints, key, pQueryHandle->order);
SDataCols
*
pDataCols
=
pCheckInfo
->
pDataCols
;
cur
->
pos
=
binarySearchForKey
(
pDataCols
->
cols
[
0
].
pData
,
pBlock
->
numOfPoints
,
key
,
pQueryHandle
->
order
);
cur
->
fid
=
pCheckInfo
->
pFileGroup
->
fileId
;
assert
(
cur
->
pos
>=
0
&&
cur
->
fid
>=
0
&&
cur
->
slot
>=
0
);
filterDataInDataBlock
(
pQueryHandle
,
sa
);
filterDataInDataBlock
(
pQueryHandle
,
pCheckInfo
->
pDataCols
,
sa
);
return
pQueryHandle
->
realNumOfRows
>
0
;
}
...
...
@@ -755,32 +911,46 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
SDataBlockInfo
tsdbRetrieveDataBlockInfo
(
tsdb_query_handle_t
*
pQueryHandle
)
{
STsdbQueryHandle
*
pHandle
=
(
STsdbQueryHandle
*
)
pQueryHandle
;
STableCheckInfo
*
p
TableQ
Info
=
taosArrayGet
(
pHandle
->
pTableCheckInfo
,
pHandle
->
activeIndex
);
STable
*
pTable
=
p
TableQ
Info
->
pTableObj
;
STableCheckInfo
*
p
Check
Info
=
taosArrayGet
(
pHandle
->
pTableCheckInfo
,
pHandle
->
activeIndex
);
STable
*
pTable
=
p
Check
Info
->
pTableObj
;
TSKEY
skey
=
0
,
ekey
=
0
;
int32_t
rows
=
0
;
if
(
pTable
->
mem
!=
NULL
)
{
// create mem table iterator if it is not created yet
if
(
pTableQInfo
->
iter
==
NULL
)
{
pTableQInfo
->
iter
=
tSkipListCreateIter
(
pTable
->
mem
->
pData
);
// data in file
if
(
pHandle
->
cur
.
fid
>
0
)
{
SDataBlockInfo
binfo
=
getTrueDataBlockInfo
(
pHandle
,
pCheckInfo
);
if
(
binfo
.
size
==
pHandle
->
realNumOfRows
)
{
return
binfo
;
}
else
{
/* not a whole disk block, only the qualified rows, so this block is loaded in to buffer during the
* block next function
*/
SColumnInfoEx
*
pColInfoEx
=
taosArrayGet
(
pHandle
->
pColumns
,
0
);
rows
=
pHandle
->
realNumOfRows
;
skey
=
*
(
TSKEY
*
)
pColInfoEx
->
pData
;
ekey
=
*
(
TSKEY
*
)
pColInfoEx
->
pData
+
TSDB_KEYSIZE
*
(
rows
-
1
);
}
}
else
{
if
(
pTable
->
mem
!=
NULL
)
{
// create mem table iterator if it is not created yet
if
(
pCheckInfo
->
iter
==
NULL
)
{
pCheckInfo
->
iter
=
tSkipListCreateIter
(
pTable
->
mem
->
pData
);
}
rows
=
tsdbReadRowsFromCache
(
pCheckInfo
->
iter
,
INT64_MAX
,
2
,
&
skey
,
&
ekey
,
pHandle
);
}
rows
=
tsdbReadRowsFromCache
(
pTableQInfo
->
iter
,
INT64_MAX
,
2
,
&
skey
,
&
ekey
,
pHandle
);
}
SDataBlockInfo
blockInfo
=
{
.
uid
=
pTable
->
tableId
.
uid
,
.
sid
=
pTable
->
tableId
.
tid
,
.
size
=
rows
,
.
window
=
{.
skey
=
skey
,
.
ekey
=
ekey
}
.
uid
=
pTable
->
tableId
.
uid
,
.
sid
=
pTable
->
tableId
.
tid
,
.
size
=
rows
,
.
window
=
{.
skey
=
skey
,
.
ekey
=
ekey
}
};
// update the last key value
pTableQInfo
->
lastKey
=
ekey
+
1
;
pCheckInfo
->
lastKey
=
ekey
+
1
;
return
blockInfo
;
}
...
...
@@ -791,9 +961,25 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SData
}
SArray
*
tsdbRetrieveDataBlock
(
tsdb_query_handle_t
*
pQueryHandle
,
SArray
*
pIdList
)
{
// in case of data in cache, all data has been kept in column info object.
/**
* In the following two cases, the data has been loaded to SColumnInfoEx.
* 1. data is from cache, 2. data block is not completed qualified to query time range
*/
STsdbQueryHandle
*
pHandle
=
(
STsdbQueryHandle
*
)
pQueryHandle
;
return
pHandle
->
pColumns
;
if
(
pHandle
->
cur
.
fid
<
0
)
{
return
pHandle
->
pColumns
;
}
else
{
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pHandle
->
pTableCheckInfo
,
pHandle
->
activeIndex
);
SDataBlockInfo
binfo
=
getTrueDataBlockInfo
(
pHandle
,
pCheckInfo
);
if
(
pHandle
->
realNumOfRows
<=
binfo
.
size
)
{
return
pHandle
->
pColumns
;
}
else
{
// todo do load data block
assert
(
0
);
}
}
}
int32_t
tsdbResetQuery
(
tsdb_query_handle_t
*
pQueryHandle
,
STimeWindow
*
window
,
tsdbpos_t
position
,
int16_t
order
)
{}
...
...
@@ -1011,19 +1197,20 @@ static void getTagColumnInfo(SSyntaxTreeFilterSupporter* pSupporter, SSchema* pS
}
void
filterPrepare
(
void
*
expr
,
void
*
param
)
{
tSQL
BinaryExpr
*
pExpr
=
(
tSQLBinaryExpr
*
)
expr
;
if
(
pExpr
->
info
!=
NULL
)
{
tSQL
SyntaxNode
*
pExpr
=
(
tSQLSyntaxNode
*
)
expr
;
if
(
pExpr
->
_node
.
info
!=
NULL
)
{
return
;
}
int32_t
i
=
0
,
offset
=
0
;
pExpr
->
info
=
calloc
(
1
,
sizeof
(
tQueryInfo
));
pExpr
->
_node
.
info
=
calloc
(
1
,
sizeof
(
tQueryInfo
));
tQueryInfo
*
pInfo
=
pExpr
->
_node
.
info
;
tQueryInfo
*
pInfo
=
pExpr
->
info
;
SSyntaxTreeFilterSupporter
*
pSupporter
=
(
SSyntaxTreeFilterSupporter
*
)
param
;
tVariant
*
pCond
=
pExpr
->
pRight
->
pVal
;
SSchema
*
pSchema
=
pExpr
->
pLeft
->
pSchema
;
tVariant
*
pCond
=
pExpr
->
_node
.
pRight
->
pVal
;
SSchema
*
pSchema
=
pExpr
->
_node
.
pLeft
->
pSchema
;
getTagColumnInfo
(
pSupporter
,
pSchema
,
&
i
,
&
offset
);
assert
((
i
>=
0
&&
i
<
TSDB_MAX_TAGS
)
||
(
i
==
TSDB_TBNAME_COLUMN_INDEX
));
...
...
@@ -1031,7 +1218,7 @@ void filterPrepare(void* expr, void* param) {
pInfo
->
sch
=
*
pSchema
;
pInfo
->
colIdx
=
i
;
pInfo
->
optr
=
pExpr
->
nSQLBinaryO
ptr
;
pInfo
->
optr
=
pExpr
->
_node
.
o
ptr
;
pInfo
->
offset
=
offset
;
pInfo
->
compare
=
getFilterComparator
(
pSchema
->
type
,
pCond
->
nType
,
pInfo
->
optr
);
...
...
@@ -1089,7 +1276,7 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
static
int32_t
doQueryTableList
(
STable
*
pSTable
,
SArray
*
pRes
,
const
char
*
pCond
)
{
STColumn
*
stcol
=
schemaColAt
(
pSTable
->
tagSchema
,
0
);
tSQL
BinaryExpr
*
pExpr
=
NULL
;
tSQL
SyntaxNode
*
pExpr
=
NULL
;
tSQLBinaryExprFromString
(
&
pExpr
,
stcol
,
schemaNCols
(
pSTable
->
tagSchema
),
pCond
,
strlen
(
pCond
));
// failed to build expression, no result, return immediately
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录