Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4b1d93ac
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4b1d93ac
编写于
4月 26, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/alter
上级
9a9468d1
a7a7ef79
变更
24
显示空白变更内容
内联
并排
Showing
24 changed file
with
322 addition
and
367 deletion
+322
-367
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+1
-1
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+9
-13
src/client/src/tscProfile.c
src/client/src/tscProfile.c
+6
-5
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+71
-61
src/client/src/tscServer.c
src/client/src/tscServer.c
+1
-26
src/client/src/tscSql.c
src/client/src/tscSql.c
+22
-23
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+2
-2
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+1
-6
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+2
-3
src/inc/taosmsg.h
src/inc/taosmsg.h
+9
-25
src/query/inc/qast.h
src/query/inc/qast.h
+4
-3
src/query/inc/tsqlfunction.h
src/query/inc/tsqlfunction.h
+2
-2
src/query/inc/tvariant.h
src/query/inc/tvariant.h
+1
-1
src/query/src/qast.c
src/query/src/qast.c
+9
-24
src/query/src/queryExecutor.c
src/query/src/queryExecutor.c
+131
-157
src/query/src/tvariant.c
src/query/src/tvariant.c
+5
-5
src/query/tests/astTest.cpp
src/query/tests/astTest.cpp
+2
-4
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+3
-0
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+39
-0
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+1
-0
tests/pytest/insert/basic.py
tests/pytest/insert/basic.py
+0
-1
tests/pytest/insert/float.py
tests/pytest/insert/float.py
+0
-3
tests/pytest/insert/int.py
tests/pytest/insert/int.py
+0
-1
tests/pytest/util/cases.py
tests/pytest/util/cases.py
+1
-1
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
4b1d93ac
...
...
@@ -78,7 +78,7 @@ typedef struct STableMetaInfo {
*/
int32_t
vgroupIndex
;
char
name
[
TSDB_TABLE_ID_LEN
];
// (super) table name
SArray
*
tagColList
;
// involved tag columns
SArray
*
tagColList
;
//
SArray<SColumn*>,
involved tag columns
}
STableMetaInfo
;
/* the structure for sql function in select clause */
...
...
src/client/src/tscFunctionImpl.c
浏览文件 @
4b1d93ac
...
...
@@ -3294,29 +3294,26 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) {
}
}
char
*
arithmetic_callback_function
(
void
*
param
,
char
*
name
,
int32_t
colId
)
{
char
*
getArithColumnData
(
void
*
param
,
const
char
*
name
,
int32_t
colId
)
{
SArithmeticSupport
*
pSupport
=
(
SArithmeticSupport
*
)
param
;
SArithExprInfo
*
pExpr
=
pSupport
->
pArithExpr
;
int32_t
colIndex
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
pExpr
->
binExprInfo
.
numOfCols
;
++
i
)
{
if
(
colId
==
pExpr
->
binExprInfo
.
pReqColumns
[
i
].
colId
)
{
colIndex
=
pExpr
->
binExprInfo
.
pReqColumns
[
i
].
colIndex
;
int32_t
index
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
pSupport
->
numOfCols
;
++
i
)
{
if
(
colId
==
pSupport
->
colList
[
i
].
colId
)
{
index
=
i
;
break
;
}
}
assert
(
colI
ndex
>=
0
&&
colId
>=
0
);
return
pSupport
->
data
[
colIndex
]
+
pSupport
->
offset
*
pSupport
->
elemSize
[
colIndex
]
;
assert
(
i
ndex
>=
0
&&
colId
>=
0
);
return
pSupport
->
data
[
index
]
+
pSupport
->
offset
*
pSupport
->
colList
[
index
].
bytes
;
}
static
void
arithmetic_function
(
SQLFunctionCtx
*
pCtx
)
{
GET_RES_INFO
(
pCtx
)
->
numOfRes
+=
pCtx
->
size
;
SArithmeticSupport
*
sas
=
(
SArithmeticSupport
*
)
pCtx
->
param
[
1
].
pz
;
tSQLBinaryExprCalcTraverse
(
sas
->
pArithExpr
->
binExprInfo
.
pBinExpr
,
pCtx
->
size
,
pCtx
->
aOutputBuf
,
sas
,
pCtx
->
order
,
arithmetic_callback_function
);
tExprTreeCalcTraverse
(
sas
->
pArithExpr
->
pExpr
,
pCtx
->
size
,
pCtx
->
aOutputBuf
,
sas
,
pCtx
->
order
,
getArithColumnData
);
pCtx
->
aOutputBuf
+=
pCtx
->
outputBytes
*
pCtx
->
size
;
pCtx
->
param
[
1
].
pz
=
NULL
;
...
...
@@ -3327,8 +3324,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SArithmeticSupport
*
sas
=
(
SArithmeticSupport
*
)
pCtx
->
param
[
1
].
pz
;
sas
->
offset
=
index
;
tSQLBinaryExprCalcTraverse
(
sas
->
pArithExpr
->
binExprInfo
.
pBinExpr
,
1
,
pCtx
->
aOutputBuf
,
sas
,
pCtx
->
order
,
arithmetic_callback_function
);
tExprTreeCalcTraverse
(
sas
->
pArithExpr
->
pExpr
,
1
,
pCtx
->
aOutputBuf
,
sas
,
pCtx
->
order
,
getArithColumnData
);
pCtx
->
aOutputBuf
+=
pCtx
->
outputBytes
;
}
...
...
src/client/src/tscProfile.c
浏览文件 @
4b1d93ac
...
...
@@ -209,15 +209,15 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
}
char
*
tscBuildQueryStreamDesc
(
char
*
pMsg
,
STscObj
*
pObj
)
{
SQqueryList
*
pQList
=
(
SQqueryList
*
)
pMsg
;
char
*
pMax
=
pMsg
+
TSDB_PAYLOAD_SIZE
-
256
;
SQ
ueryDesc
*
pQdesc
=
pQList
->
qdesc
;
SQ
queryList
*
pQList
=
(
SQqueryList
*
)
pMsg
;
pQList
->
numOfQueries
=
0
;
SQueryDesc
*
pQdesc
=
(
SQueryDesc
*
)(
pMsg
+
sizeof
(
SQqueryList
));
// We extract the lock to tscBuildHeartBeatMsg function.
/* pthread_mutex_lock (&pObj->mutex); */
pMsg
+=
sizeof
(
SQqueryList
);
SSqlObj
*
pSql
=
pObj
->
sqlList
;
while
(
pSql
)
{
...
...
@@ -244,9 +244,10 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
}
SStreamList
*
pSList
=
(
SStreamList
*
)
pMsg
;
SStreamDesc
*
pSdesc
=
pSList
->
sdesc
;
pSList
->
numOfStreams
=
0
;
SStreamDesc
*
pSdesc
=
(
SStreamDesc
*
)
(
pMsg
+
sizeof
(
SStreamList
));
pMsg
+=
sizeof
(
SStreamList
);
SSqlStream
*
pStream
=
pObj
->
streamList
;
while
(
pStream
)
{
...
...
src/client/src/tscSQLParser.c
浏览文件 @
4b1d93ac
...
...
@@ -101,7 +101,7 @@ static void updateTagColumnIndex(SQueryInfo* pQueryInfo, int32_t tableIndex);
static
int32_t
parseLimitClause
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
,
SQuerySQL
*
pQuerySql
,
SSqlObj
*
pSql
);
static
int32_t
parseCreateDBOptions
(
SSqlCmd
*
pCmd
,
SCreateDBInfo
*
pCreateDbSql
);
static
int32_t
getColumnIndexByName
(
SSQLToken
*
pToken
,
SQueryInfo
*
pQueryInfo
,
SColumnIndex
*
pIndex
);
static
int32_t
getColumnIndexByName
(
const
SSQLToken
*
pToken
,
SQueryInfo
*
pQueryInfo
,
SColumnIndex
*
pIndex
);
static
int32_t
getTableIndexByName
(
SSQLToken
*
pToken
,
SQueryInfo
*
pQueryInfo
,
SColumnIndex
*
pIndex
);
static
int32_t
optrToString
(
tSQLExpr
*
pExpr
,
char
**
exprString
);
...
...
@@ -116,7 +116,7 @@ static int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSql
static
int32_t
doCheckForCreateFromStable
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
);
static
int32_t
doCheckForStream
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
);
static
int32_t
doCheckForQuery
(
SSqlObj
*
pSql
,
SQuerySQL
*
pQuerySql
,
int32_t
index
);
static
int32_t
exprTreeFromSqlExpr
(
tExprNode
**
pExpr
,
tSQLExpr
*
pSqlExpr
,
SArray
*
pExprInfo
,
SQueryInfo
*
pQueryInfo
,
SArray
*
pCols
);
static
int32_t
exprTreeFromSqlExpr
(
tExprNode
**
pExpr
,
const
tSQLExpr
*
pSqlExpr
,
SArray
*
pExprInfo
,
SQueryInfo
*
pQueryInfo
,
SArray
*
pCols
);
/*
* Used during parsing query sql. Since the query sql usually small in length, error position
...
...
@@ -1171,13 +1171,32 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
SColumnIndex
index
=
{.
tableIndex
=
tableIndex
};
SSqlExpr
*
pExpr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_ARITHM
,
&
index
,
TSDB_DATA_TYPE_DOUBLE
,
sizeof
(
double
),
sizeof
(
double
),
false
);
addExprParams
(
pExpr
,
arithmeticExprStr
,
TSDB_DATA_TYPE_BINARY
,
strlen
(
arithmeticExprStr
),
index
.
tableIndex
);
/* todo alias name should use the original sql string */
char
*
name
=
(
pItem
->
aliasName
!=
NULL
)
?
pItem
->
aliasName
:
arithmeticExprStr
;
strncpy
(
pExpr
->
aliasName
,
name
,
TSDB_COL_NAME_LEN
);
tExprNode
*
pNode
=
NULL
;
SArray
*
colList
=
taosArrayInit
(
10
,
sizeof
(
SColIndex
));
int32_t
ret
=
exprTreeFromSqlExpr
(
&
pNode
,
pItem
->
pNode
,
pQueryInfo
->
exprsInfo
,
pQueryInfo
,
colList
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tExprTreeDestroy
(
&
pNode
,
NULL
);
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
"invalid arithmetic expression in select clause"
);
}
SBuffer
buf
=
exprTreeToBinary
(
pNode
);
size_t
len
=
tbufTell
(
&
buf
);
char
*
c
=
tbufGetData
(
&
buf
,
true
);
// set the serialized binary string as the parameter of arithmetic expression
addExprParams
(
pExpr
,
c
,
TSDB_DATA_TYPE_BINARY
,
len
,
index
.
tableIndex
);
insertResultField
(
pQueryInfo
,
i
,
&
columnList
,
sizeof
(
double
),
TSDB_DATA_TYPE_DOUBLE
,
pExpr
->
aliasName
,
pExpr
);
taosArrayDestroy
(
colList
);
tExprTreeDestroy
(
&
pNode
,
NULL
);
}
else
{
columnList
.
num
=
0
;
columnList
.
ids
[
0
]
=
(
SColumnIndex
)
{
0
,
0
};
...
...
@@ -1196,8 +1215,6 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
pFuncExpr
->
interResBytes
=
sizeof
(
double
);
pFuncExpr
->
type
=
TSDB_DATA_TYPE_DOUBLE
;
SExprInfo
*
pBinExprInfo
=
&
pFuncExpr
->
binExprInfo
;
tExprNode
*
pNode
=
NULL
;
// SArray* colList = taosArrayInit(10, sizeof(SColIndex));
...
...
@@ -1207,25 +1224,25 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
return
invalidSqlErrMsg
(
pQueryInfo
->
msg
,
"invalid expression in select clause"
);
}
p
BinExprInfo
->
pBin
Expr
=
pNode
;
p
FuncExpr
->
p
Expr
=
pNode
;
assert
(
0
);
// pBinExprInfo->pReqColumns = pColIndex;
for
(
int32_t
k
=
0
;
k
<
pBinExprInfo
->
numOfCols
;
++
k
)
{
SColIndex
*
pCol
=
&
pBinExprInfo
->
pReqColumns
[
k
];
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
// pExprInfo->pReqColumns = pColIndex;
for
(
int32_t
f
=
0
;
f
<
size
;
++
f
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
f
);
if
(
strcmp
(
pExpr
->
aliasName
,
pCol
->
name
)
==
0
)
{
pCol
->
colIndex
=
f
;
break
;
}
}
assert
(
pCol
->
colIndex
>=
0
&&
pCol
->
colIndex
<
size
);
tfree
(
pNode
);
}
// for(int32_t k = 0; k < pFuncExpr->numOfCols; ++k) {
// SColIndex* pCol = &pFuncExpr->colList[k];
// size_t size = tscSqlExprNumOfExprs(pQueryInfo);
//
// for(int32_t f = 0; f < size; ++f) {
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, f);
// if (strcmp(pExpr->aliasName, pCol->name) == 0) {
// pCol->colIndex = f;
// break;
// }
// }
//
// assert(pCol->colIndex >= 0 && pCol->colIndex < size);
// tfree(pNode);
// }
}
}
}
else
{
...
...
@@ -1317,28 +1334,6 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c
void
addRequiredTagColumn
(
STableMetaInfo
*
pTableMetaInfo
,
SColumnIndex
*
index
)
{
tscColumnListInsert
(
pTableMetaInfo
->
tagColList
,
index
);
// if (pTableMetaInfo->numOfTags == 0 || pTableMetaInfo->tagColumnIndex[pTableMetaInfo->numOfTags - 1] < tagColIndex) {
// pTableMetaInfo->tagColumnIndex[pTableMetaInfo->numOfTags++] = tagColIndex;
// } else { // find the appropriate position
// for (int32_t i = 0; i < pTableMetaInfo->numOfTags; ++i) {
// if (tagColIndex > pTableMetaInfo->tagColumnIndex[i]) {
// continue;
// } else if (tagColIndex == pTableMetaInfo->tagColumnIndex[i]) {
// break;
// } else {
// memmove(&pTableMetaInfo->tagColumnIndex[i + 1], &pTableMetaInfo->tagColumnIndex[i],
// sizeof(pTableMetaInfo->tagColumnIndex[0]) * (pTableMetaInfo->numOfTags - i));
//
// pTableMetaInfo->tagColumnIndex[i] = tagColIndex;
//
// pTableMetaInfo->numOfTags++;
// break;
// }
// }
// }
// plus one means tbname
// assert(tagColIndex >= -1 && tagColIndex < TSDB_MAX_TAGS && pTableMetaInfo->numOfTags <= TSDB_MAX_TAGS + 1);
}
static
void
addProjectQueryCol
(
SQueryInfo
*
pQueryInfo
,
int32_t
startPos
,
SColumnIndex
*
pIndex
,
tSQLExprItem
*
pItem
)
{
...
...
@@ -2047,7 +2042,7 @@ int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIn
return
TSDB_CODE_SUCCESS
;
}
int32_t
getColumnIndexByName
(
SSQLToken
*
pToken
,
SQueryInfo
*
pQueryInfo
,
SColumnIndex
*
pIndex
)
{
int32_t
getColumnIndexByName
(
const
SSQLToken
*
pToken
,
SQueryInfo
*
pQueryInfo
,
SColumnIndex
*
pIndex
)
{
if
(
pQueryInfo
->
pTableMetaInfo
==
NULL
||
pQueryInfo
->
numOfTables
==
0
)
{
return
TSDB_CODE_INVALID_SQL
;
}
...
...
@@ -3754,12 +3749,23 @@ static int32_t getTagQueryCondExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr,
tSQLExpr
*
p1
=
extractExprForSTable
(
pExpr
,
pQueryInfo
,
i
);
tExprNode
*
p
=
NULL
;
ret
=
exprTreeFromSqlExpr
(
&
p
,
p1
,
NULL
,
pQueryInfo
,
NULL
);
SArray
*
colList
=
taosArrayInit
(
10
,
sizeof
(
SColIndex
));
ret
=
exprTreeFromSqlExpr
(
&
p
,
p1
,
NULL
,
pQueryInfo
,
colList
);
SBuffer
buf
=
exprTreeToBinary
(
p
);
int64_t
uid
=
tscGetMetaInfo
(
pQueryInfo
,
i
)
->
pTableMeta
->
uid
;
tsSetSTableQueryCond
(
&
pQueryInfo
->
tagCond
,
uid
,
&
buf
);
// add to source column list
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
i
);
int64_t
uid
=
pTableMetaInfo
->
pTableMeta
->
uid
;
int32_t
numOfCols
=
tscGetNumOfColumns
(
pTableMetaInfo
->
pTableMeta
);
size_t
num
=
taosArrayGetSize
(
colList
);
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
SColIndex
*
pIndex
=
taosArrayGet
(
colList
,
j
);
SColumnIndex
index
=
{.
tableIndex
=
i
,
.
columnIndex
=
pIndex
->
colIndex
-
numOfCols
};
addRequiredTagColumn
(
pTableMetaInfo
,
&
index
);
}
tsSetSTableQueryCond
(
&
pQueryInfo
->
tagCond
,
uid
,
&
buf
);
doCompactQueryExpr
(
pExpr
);
tSQLExprDestroy
(
p1
);
...
...
@@ -5856,7 +5862,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return
TSDB_CODE_SUCCESS
;
// Does not build query message here
}
int32_t
exprTreeFromSqlExpr
(
tExprNode
**
pExpr
,
tSQLExpr
*
pSqlExpr
,
SArray
*
pExprInfo
,
SQueryInfo
*
pQueryInfo
,
SArray
*
pCols
)
{
int32_t
exprTreeFromSqlExpr
(
tExprNode
**
pExpr
,
const
tSQLExpr
*
pSqlExpr
,
SArray
*
pExprInfo
,
SQueryInfo
*
pQueryInfo
,
SArray
*
pCols
)
{
tExprNode
*
pLeft
=
NULL
;
tExprNode
*
pRight
=
NULL
;
...
...
@@ -5881,8 +5887,9 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr
(
*
pExpr
)
->
pVal
=
calloc
(
1
,
sizeof
(
tVariant
));
tVariantAssign
((
*
pExpr
)
->
pVal
,
&
pSqlExpr
->
val
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
pSqlExpr
->
nSQLOptr
>=
TK_COUNT
&&
pSqlExpr
->
nSQLOptr
<=
TK_AVG_IRATE
)
{
// arithmetic expression on the results of aggregation functions
*
pExpr
=
calloc
(
1
,
sizeof
(
tExprNode
));
(
*
pExpr
)
->
nodeType
=
TSQL_NODE_COL
;
(
*
pExpr
)
->
pSchema
=
calloc
(
1
,
sizeof
(
SSchema
));
...
...
@@ -5900,7 +5907,7 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr
break
;
}
}
}
else
if
(
pSqlExpr
->
nSQLOptr
==
TK_ID
)
{
// column name
}
else
if
(
pSqlExpr
->
nSQLOptr
==
TK_ID
)
{
// column name
, normal column arithmetic expression
SColumnIndex
index
=
{
0
};
int32_t
ret
=
getColumnIndexByName
(
&
pSqlExpr
->
colInfo
,
pQueryInfo
,
&
index
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -5915,17 +5922,20 @@ int32_t exprTreeFromSqlExpr(tExprNode **pExpr, tSQLExpr* pSqlExpr, SArray* pExpr
SSchema
*
pSchema
=
tscGetTableColumnSchema
(
pTableMeta
,
index
.
columnIndex
);
*
(
*
pExpr
)
->
pSchema
=
*
pSchema
;
return
TSDB_CODE_SUCCESS
;
}
else
{
return
TSDB_CODE_INVALID_SQL
;
}
if
(
pCols
!=
NULL
)
{
// record the involved columns
SColIndex
colIndex
=
{
0
};
strncpy
(
colIndex
.
name
,
pSqlExpr
->
operand
.
z
,
pSqlExpr
->
operand
.
n
);
strncpy
(
colIndex
.
name
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
);
colIndex
.
colId
=
pSchema
->
colId
;
colIndex
.
colIndex
=
index
.
columnIndex
;
taosArrayPush
(
pCols
,
&
colIndex
);
}
return
TSDB_CODE_SUCCESS
;
}
else
{
return
TSDB_CODE_INVALID_SQL
;
}
}
else
{
*
pExpr
=
(
tExprNode
*
)
calloc
(
1
,
sizeof
(
tExprNode
));
(
*
pExpr
)
->
nodeType
=
TSQL_NODE_EXPR
;
...
...
src/client/src/tscServer.c
浏览文件 @
4b1d93ac
...
...
@@ -758,16 +758,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
bool
hasArithmeticFunction
=
false
;
SSqlFuncMsg
*
pSqlFuncExpr
=
(
SSqlFuncMsg
*
)
pMsg
;
for
(
int32_t
i
=
0
;
i
<
tscSqlExprNumOfExprs
(
pQueryInfo
);
++
i
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
if
(
pExpr
->
functionId
==
TSDB_FUNC_ARITHM
)
{
hasArithmeticFunction
=
true
;
}
if
(
!
tscValidateColumnId
(
pTableMetaInfo
,
pExpr
->
colInfo
.
colId
))
{
/* column id is not valid according to the cached table meta, the table meta is expired */
tscError
(
"%p table schema is not matched with parsed sql"
,
pSql
);
...
...
@@ -788,9 +782,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if
(
pExpr
->
param
[
j
].
nType
==
TSDB_DATA_TYPE_BINARY
)
{
memcpy
(
pMsg
,
pExpr
->
param
[
j
].
pz
,
pExpr
->
param
[
j
].
nLen
);
// by plus one char to make the string null-terminated
pMsg
+=
pExpr
->
param
[
j
].
nLen
+
1
;
pMsg
+=
pExpr
->
param
[
j
].
nLen
;
}
else
{
pSqlFuncExpr
->
arg
[
j
].
argValue
.
i64
=
htobe64
(
pExpr
->
param
[
j
].
i64Key
);
}
...
...
@@ -799,23 +791,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr
=
(
SSqlFuncMsg
*
)
pMsg
;
}
int32_t
len
=
0
;
if
(
hasArithmeticFunction
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumn
*
pColBase
=
taosArrayGetP
(
pQueryInfo
->
colList
,
i
);
char
*
name
=
pSchema
[
pColBase
[
i
].
colIndex
.
columnIndex
].
name
;
int32_t
lenx
=
strlen
(
name
);
memcpy
(
pMsg
,
name
,
lenx
);
*
(
pMsg
+
lenx
)
=
','
;
len
+=
(
lenx
+
1
);
// one for comma
pMsg
+=
(
lenx
+
1
);
}
}
pQueryMsg
->
colNameLen
=
htonl
(
len
);
// serialize the table info (sid, uid, tags)
pMsg
=
doSerializeTableInfo
(
pSql
,
htons
(
pQueryMsg
->
head
.
vgId
),
pMsg
);
...
...
src/client/src/tscSql.c
浏览文件 @
4b1d93ac
...
...
@@ -458,20 +458,21 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
}
}
static
char
*
getArithemicInputSrc
(
void
*
param
,
char
*
name
,
int32_t
colId
)
{
SArithmeticSupport
*
pSupport
=
(
SArithmeticSupport
*
)
param
;
SArithExprInfo
*
pExpr
=
pSupport
->
pArithExpr
;
int32_t
index
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
pExpr
->
binExprInfo
.
numOfCols
;
++
i
)
{
if
(
strcmp
(
name
,
pExpr
->
binExprInfo
.
pReqColumns
[
i
].
name
)
==
0
)
{
index
=
i
;
break
;
}
}
assert
(
index
>=
0
&&
index
<
pExpr
->
binExprInfo
.
numOfCols
);
return
pSupport
->
data
[
index
]
+
pSupport
->
offset
*
pSupport
->
elemSize
[
index
];
static
char
*
getArithemicInputSrc
(
void
*
param
,
const
char
*
name
,
int32_t
colId
)
{
// SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
// SArithExprInfo * pExpr = pSupport->pArithExpr;
// int32_t index = -1;
// for (int32_t i = 0; i < pExpr->numOfCols; ++i) {
// if (strcmp(name, pExpr->colList[i].name) == 0) {
// index = i;
// break;
// }
// }
//
// assert(index >= 0 && index < pExpr->numOfCols);
// return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
return
0
;
}
static
void
**
doSetResultRowData
(
SSqlObj
*
pSql
)
{
...
...
@@ -521,21 +522,21 @@ static void **doSetResultRowData(SSqlObj *pSql) {
sas
->
offset
=
0
;
sas
->
pArithExpr
=
pInfo
->
pArithExprInfo
;
sas
->
numOfCols
=
sas
->
pArithExpr
->
binExprInfo
.
numOfCols
;
// sas->numOfCols = sas->pArithExpr->
numOfCols;
if
(
pRes
->
buffer
[
i
]
==
NULL
)
{
pRes
->
buffer
[
i
]
=
malloc
(
tscFieldInfoGetField
(
&
pQueryInfo
->
fieldsInfo
,
i
)
->
bytes
);
}
for
(
int32_t
k
=
0
;
k
<
sas
->
numOfCols
;
++
k
)
{
int32_t
columnIndex
=
sas
->
pArithExpr
->
binExprInfo
.
pReqColumns
[
k
].
colIndex
;
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
columnIndex
);
sas
->
elemSize
[
k
]
=
pExpr
->
resBytes
;
sas
->
data
[
k
]
=
(
pRes
->
data
+
pRes
->
numOfRows
*
pExpr
->
offset
)
+
pRes
->
row
*
pExpr
->
resBytes
;
// int32_t columnIndex = sas->pArithExpr->colList
[k].colIndex;
//
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
//
//
sas->elemSize[k] = pExpr->resBytes;
//
sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
}
t
SQLBinaryExprCalcTraverse
(
sas
->
pArithExpr
->
binExprInfo
.
pBin
Expr
,
1
,
pRes
->
buffer
[
i
],
sas
,
TSDB_ORDER_ASC
,
getArithemicInputSrc
);
t
ExprTreeCalcTraverse
(
sas
->
pArithExpr
->
p
Expr
,
1
,
pRes
->
buffer
[
i
],
sas
,
TSDB_ORDER_ASC
,
getArithemicInputSrc
);
pRes
->
tsrow
[
i
]
=
pRes
->
buffer
[
i
];
free
(
sas
);
//todo optimization
...
...
@@ -634,8 +635,6 @@ static UNUSED_FUNC void **tscBuildResFromSubqueries(SSqlObj *pSql) {
}
if
(
success
)
{
// current row of final output has been built, return to app
size_t
numOfExprs
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfExprs
;
++
i
)
{
int32_t
tableIndex
=
pRes
->
pColumnIndex
[
i
].
tableIndex
;
int32_t
columnIndex
=
pRes
->
pColumnIndex
[
i
].
columnIndex
;
...
...
src/client/src/tscSubquery.c
浏览文件 @
4b1d93ac
...
...
@@ -214,8 +214,8 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
size_t
numOfCols
=
taosArrayGetSize
(
pQueryInfo
->
colList
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumn
*
pB
ase
=
taosArrayGet
(
pQueryInfo
->
colList
,
i
);
if
(
pB
ase
->
colIndex
.
columnIndex
!=
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
SColumn
*
b
ase
=
taosArrayGet
(
pQueryInfo
->
colList
,
i
);
if
(
b
ase
->
colIndex
.
columnIndex
!=
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
return
true
;
}
}
...
...
src/client/src/tscSystem.c
浏览文件 @
4b1d93ac
...
...
@@ -17,8 +17,6 @@
#include "taosmsg.h"
#include "tcache.h"
#include "trpc.h"
#include "taosdef.h"
#include "tsocket.h"
#include "tsystem.h"
#include "ttime.h"
#include "ttimer.h"
...
...
@@ -34,7 +32,6 @@
// global, not configurable
void
*
pVnodeConn
;
void
*
tscCacheHandle
;
int
slaveIndex
;
void
*
tscTmr
;
void
*
tscQhandle
;
void
*
tscCheckDiskUsageTmr
;
...
...
@@ -46,7 +43,7 @@ static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
void
taosInitNote
(
int
numOfNoteLines
,
int
maxNotes
,
char
*
lable
);
void
tscUpdateIpSet
(
void
*
ahandle
,
SRpcIpSet
*
pIpSet
);
void
tscCheckDiskUsage
(
void
*
para
,
void
*
unused
)
{
void
tscCheckDiskUsage
(
void
*
UNUSED_PARAM
(
para
),
void
*
UNUSED_PARAM
(
param
)
)
{
taosGetDisk
();
taosTmrReset
(
tscCheckDiskUsage
,
1000
,
NULL
,
tscTmr
,
&
tscCheckDiskUsageTmr
);
}
...
...
@@ -156,7 +153,6 @@ void taos_init_imp() {
}
tscInitMsgsFp
();
slaveIndex
=
rand
();
int
queueSize
=
tsMaxVnodeConnections
+
tsMaxMeterConnections
+
tsMaxMgmtConnections
+
tsMaxMgmtConnections
;
if
(
tscEmbedded
==
0
)
{
...
...
@@ -379,7 +375,6 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
return
0
;
}
int
taos_options
(
TSDB_OPTION
option
,
const
void
*
arg
,
...)
{
static
int32_t
lock
=
0
;
...
...
src/client/src/tscUtil.c
浏览文件 @
4b1d93ac
...
...
@@ -939,8 +939,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
SFieldSupInfo
*
pInfo
=
taosArrayGet
(
pFieldInfo
->
pSupportInfo
,
i
);
if
(
pInfo
->
pArithExprInfo
!=
NULL
)
{
tExprTreeDestroy
(
&
pInfo
->
pArithExprInfo
->
binExprInfo
.
pBinExpr
,
NULL
);
tfree
(
pInfo
->
pArithExprInfo
->
binExprInfo
.
pReqColumns
);
tExprTreeDestroy
(
&
pInfo
->
pArithExprInfo
->
pExpr
,
NULL
);
}
}
...
...
@@ -1678,7 +1677,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
}
if
(
pTagCols
==
NULL
)
{
pTableMetaInfo
->
tagColList
=
taosArrayInit
(
4
,
sizeof
(
SColumnIndex
)
);
pTableMetaInfo
->
tagColList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
}
else
{
pTableMetaInfo
->
tagColList
=
taosArrayClone
(
pTagCols
);
}
...
...
src/inc/taosmsg.h
浏览文件 @
4b1d93ac
...
...
@@ -355,17 +355,9 @@ typedef struct {
}
SMDDropVnodeMsg
;
typedef
struct
SColIndex
{
int16_t
colId
;
/*
* colIdx is the index of column in latest schema of table
* it is available in the client side. Also used to determine
* whether current table schema is up-to-date.
*
* colIdxInBuf is used to denote the index of column in pQuery->colList,
* this value is invalid in client side, as well as in cache block of vnode either.
*/
int16_t
colIndex
;
uint16_t
flag
;
// denote if it is a tag or not
int16_t
colId
;
// column id
int16_t
colIndex
;
// column index in colList if it is a normal column or index in tagColList if a tag
uint16_t
flag
;
// denote if it is a tag or a normal column
char
name
[
TSDB_COL_NAME_LEN
];
}
SColIndex
;
...
...
@@ -387,15 +379,9 @@ typedef struct SSqlFuncMsg {
}
arg
[
3
];
}
SSqlFuncMsg
;
typedef
struct
SExprInfo
{
struct
tExprNode
*
pBinExpr
;
/* for binary expression */
int32_t
numOfCols
;
/* binary expression involves the readed number of columns*/
SColIndex
*
pReqColumns
;
/* source column list */
}
SExprInfo
;
typedef
struct
SArithExprInfo
{
SSqlFuncMsg
pB
ase
;
SExprInfo
binExprInfo
;
SSqlFuncMsg
b
ase
;
struct
tExprNode
*
pExpr
;
int16_t
bytes
;
int16_t
type
;
int16_t
interResBytes
;
...
...
@@ -473,8 +459,8 @@ typedef struct {
int16_t
interpoType
;
// interpolate type
uint64_t
defaultVal
;
// default value array list
int32_t
colNameLen
;
int64_t
colNameList
;
//
int32_t colNameLen;
//
int64_t colNameList;
int32_t
tsOffset
;
// offset value in current msg body, NOTE: ts list is compressed
int32_t
tsLen
;
// total length of ts comp block
int32_t
tsNumOfBlocks
;
// ts comp block numbers
...
...
@@ -772,14 +758,14 @@ typedef struct {
}
SMDCfgDnodeMsg
,
SCMCfgDnodeMsg
;
typedef
struct
{
char
sql
[
TSDB_SHOW_SQL_LEN
+
1
];
char
sql
[
TSDB_SHOW_SQL_LEN
];
uint32_t
queryId
;
int64_t
useconds
;
int64_t
stime
;
}
SQueryDesc
;
typedef
struct
{
char
sql
[
TSDB_SHOW_SQL_LEN
+
1
];
char
sql
[
TSDB_SHOW_SQL_LEN
];
uint32_t
streamId
;
int64_t
num
;
// number of computing/cycles
int64_t
useconds
;
...
...
@@ -791,12 +777,10 @@ typedef struct {
typedef
struct
{
int32_t
numOfQueries
;
SQueryDesc
*
qdesc
;
}
SQqueryList
;
typedef
struct
{
int32_t
numOfStreams
;
SStreamDesc
*
sdesc
;
}
SStreamList
;
typedef
struct
{
...
...
src/query/inc/qast.h
浏览文件 @
4b1d93ac
...
...
@@ -82,10 +82,11 @@ void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*));
void
tExprTreeTraverse
(
tExprNode
*
pExpr
,
SSkipList
*
pSkipList
,
SArray
*
result
,
SBinaryFilterSupp
*
param
);
void
t
SQLBinaryExpr
CalcTraverse
(
tExprNode
*
pExprs
,
int32_t
numOfRows
,
char
*
pOutput
,
void
*
param
,
int32_t
order
,
char
*
(
*
cb
)(
void
*
,
c
har
*
,
int32_t
));
void
t
ExprTree
CalcTraverse
(
tExprNode
*
pExprs
,
int32_t
numOfRows
,
char
*
pOutput
,
void
*
param
,
int32_t
order
,
char
*
(
*
cb
)(
void
*
,
c
onst
char
*
,
int32_t
));
void
tSQLBinaryExprTrv
(
tExprNode
*
pExprs
,
int32_t
*
val
,
int16_t
*
ids
);
// todo refactor: remove it
void
tSQLBinaryExprTrv
(
tExprNode
*
pExprs
,
SArray
*
res
);
uint8_t
getBinaryExprOptr
(
SSQLToken
*
pToken
);
...
...
src/query/inc/tsqlfunction.h
浏览文件 @
4b1d93ac
...
...
@@ -115,10 +115,10 @@ enum {
typedef
struct
SArithmeticSupport
{
SArithExprInfo
*
pArithExpr
;
int32_t
elemSize
[
TSDB_MAX_COLUMNS
];
int32_t
numOfCols
;
SColumnInfo
*
colList
;
int32_t
offset
;
char
*
data
[
TSDB_MAX_COLUMNS
]
;
char
**
data
;
}
SArithmeticSupport
;
typedef
struct
SQLPreAggVal
{
...
...
src/query/inc/tvariant.h
浏览文件 @
4b1d93ac
...
...
@@ -40,7 +40,7 @@ void tVariantCreate(tVariant *pVar, SSQLToken *token);
void
tVariantCreateFromString
(
tVariant
*
pVar
,
char
*
pz
,
uint32_t
len
,
uint32_t
type
);
void
tVariantCreateFromBinary
(
tVariant
*
pVar
,
c
har
*
pz
,
uint32
_t
len
,
uint32_t
type
);
void
tVariantCreateFromBinary
(
tVariant
*
pVar
,
c
onst
char
*
pz
,
size
_t
len
,
uint32_t
type
);
void
tVariantDestroy
(
tVariant
*
pV
);
...
...
src/query/src/qast.c
浏览文件 @
4b1d93ac
...
...
@@ -620,19 +620,6 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
}
}
/*
* qsort comparator
* sort the result to ensure meters with the same gid is grouped together
*/
//static int32_t compareByAddr(const void *pLeft, const void *pRight) {
// int64_t p1 = (int64_t) * ((tSkipListNode **)pLeft);
// int64_t p2 = (int64_t) * ((tSkipListNode **)pRight);
//
// DEFAULT_COMP(p1, p2);
//}
// develop_old mgmtSTableQuery for merge & intersect
int32_t
merge
(
SArray
*
pLeft
,
SArray
*
pRight
,
SArray
*
pFinalRes
)
{
// assert(pFinalRes->pRes == 0);
//
...
...
@@ -934,8 +921,8 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
}
void
t
SQLBinaryExpr
CalcTraverse
(
tExprNode
*
pExprs
,
int32_t
numOfRows
,
char
*
pOutput
,
void
*
param
,
int32_t
order
,
char
*
(
*
getSourceDataBlock
)(
void
*
,
c
har
*
,
int32_t
))
{
void
t
ExprTree
CalcTraverse
(
tExprNode
*
pExprs
,
int32_t
numOfRows
,
char
*
pOutput
,
void
*
param
,
int32_t
order
,
char
*
(
*
getSourceDataBlock
)(
void
*
,
c
onst
char
*
,
int32_t
))
{
if
(
pExprs
==
NULL
)
{
return
;
}
...
...
@@ -946,13 +933,13 @@ void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOut
/* the left output has result from the left child syntax tree */
char
*
pLeftOutput
=
(
char
*
)
malloc
(
sizeof
(
int64_t
)
*
numOfRows
);
if
(
pLeft
->
nodeType
==
TSQL_NODE_EXPR
)
{
t
SQLBinaryExpr
CalcTraverse
(
pLeft
,
numOfRows
,
pLeftOutput
,
param
,
order
,
getSourceDataBlock
);
t
ExprTree
CalcTraverse
(
pLeft
,
numOfRows
,
pLeftOutput
,
param
,
order
,
getSourceDataBlock
);
}
/* the right output has result from the right child syntax tree */
char
*
pRightOutput
=
malloc
(
sizeof
(
int64_t
)
*
numOfRows
);
if
(
pRight
->
nodeType
==
TSQL_NODE_EXPR
)
{
t
SQLBinaryExpr
CalcTraverse
(
pRight
,
numOfRows
,
pRightOutput
,
param
,
order
,
getSourceDataBlock
);
t
ExprTree
CalcTraverse
(
pRight
,
numOfRows
,
pRightOutput
,
param
,
order
,
getSourceDataBlock
);
}
if
(
pLeft
->
nodeType
==
TSQL_NODE_EXPR
)
{
...
...
@@ -1014,7 +1001,7 @@ void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOut
free
(
pRightOutput
);
}
void
tSQLBinaryExprTrv
(
tExprNode
*
pExprs
,
int32_t
*
val
,
int16_t
*
id
s
)
{
void
tSQLBinaryExprTrv
(
tExprNode
*
pExprs
,
SArray
*
re
s
)
{
if
(
pExprs
==
NULL
)
{
return
;
}
...
...
@@ -1024,17 +1011,15 @@ void tSQLBinaryExprTrv(tExprNode *pExprs, int32_t *val, int16_t *ids) {
// recursive traverse left child branch
if
(
pLeft
->
nodeType
==
TSQL_NODE_EXPR
)
{
tSQLBinaryExprTrv
(
pLeft
,
val
,
id
s
);
tSQLBinaryExprTrv
(
pLeft
,
re
s
);
}
else
if
(
pLeft
->
nodeType
==
TSQL_NODE_COL
)
{
ids
[
*
val
]
=
pLeft
->
pSchema
->
colId
;
(
*
val
)
+=
1
;
taosArrayPush
(
res
,
&
pLeft
->
pSchema
->
colId
);
}
if
(
pRight
->
nodeType
==
TSQL_NODE_EXPR
)
{
tSQLBinaryExprTrv
(
pRight
,
val
,
id
s
);
tSQLBinaryExprTrv
(
pRight
,
re
s
);
}
else
if
(
pRight
->
nodeType
==
TSQL_NODE_COL
)
{
ids
[
*
val
]
=
pRight
->
pSchema
->
colId
;
(
*
val
)
+=
1
;
taosArrayPush
(
res
,
&
pRight
->
pSchema
->
colId
);
}
}
...
...
src/query/src/queryExecutor.c
浏览文件 @
4b1d93ac
...
...
@@ -53,8 +53,8 @@
/* get the qinfo struct address from the query struct address */
#define GET_COLUMN_BYTES(query, colidx) \
((query)->colList[(query)->pSelectExpr[colidx].
pB
ase.colInfo.colIndex].bytes)
#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].
pB
ase.colInfo.colIndex].type)
((query)->colList[(query)->pSelectExpr[colidx].
b
ase.colInfo.colIndex].bytes)
#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].
b
ase.colInfo.colIndex].type)
typedef
struct
SPointInterpoSupporter
{
int32_t
numOfCols
;
...
...
@@ -257,7 +257,7 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
int64_t
maxOutput
=
0
;
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
j
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
j
].
b
ase
.
functionId
;
/*
* ts, tag, tagprj function can not decide the output number of current query
...
...
@@ -334,7 +334,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
int32_t
numOfSelectivity
=
0
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functId
==
TSDB_FUNC_TAG_DUMMY
||
functId
==
TSDB_FUNC_TS_DUMMY
)
{
hasTags
=
true
;
continue
;
...
...
@@ -352,7 +352,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
return
false
;
}
bool
isTSCompQuery
(
SQuery
*
pQuery
)
{
return
pQuery
->
pSelectExpr
[
0
].
pB
ase
.
functionId
==
TSDB_FUNC_TS_COMP
;
}
bool
isTSCompQuery
(
SQuery
*
pQuery
)
{
return
pQuery
->
pSelectExpr
[
0
].
b
ase
.
functionId
==
TSDB_FUNC_TS_COMP
;
}
static
bool
limitResults
(
SQInfo
*
pQInfo
)
{
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
...
...
@@ -370,7 +370,7 @@ static bool limitResults(SQInfo *pQInfo) {
static
bool
isTopBottomQuery
(
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TS
)
{
continue
;
}
...
...
@@ -385,7 +385,7 @@ static bool isTopBottomQuery(SQuery *pQuery) {
static
SDataStatis
*
getStatisInfo
(
SQuery
*
pQuery
,
SDataStatis
*
pStatis
,
SDataBlockInfo
*
pDataBlockInfo
,
int32_t
index
)
{
// for a tag column, no corresponding field info
SColIndex
*
pColIndexEx
=
&
pQuery
->
pSelectExpr
[
index
].
pB
ase
.
colInfo
;
SColIndex
*
pColIndexEx
=
&
pQuery
->
pSelectExpr
[
index
].
b
ase
.
colInfo
;
if
(
TSDB_COL_IS_TAG
(
pColIndexEx
->
flag
))
{
return
NULL
;
}
...
...
@@ -413,7 +413,7 @@ static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, SDataBlo
*/
static
bool
hasNullValue
(
SQuery
*
pQuery
,
int32_t
col
,
SDataBlockInfo
*
pDataBlockInfo
,
SDataStatis
*
pStatis
,
SDataStatis
**
pColStatis
)
{
SColIndex
*
pColIndex
=
&
pQuery
->
pSelectExpr
[
col
].
pB
ase
.
colInfo
;
SColIndex
*
pColIndex
=
&
pQuery
->
pSelectExpr
[
col
].
b
ase
.
colInfo
;
if
(
TSDB_COL_IS_TAG
(
pColIndex
->
flag
))
{
return
false
;
}
...
...
@@ -717,7 +717,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
if
(
IS_MASTER_SCAN
(
pRuntimeEnv
)
||
pStatus
->
closed
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
b
ase
.
functionId
;
pCtx
[
k
].
nStartQueryTimestamp
=
pWin
->
skey
;
pCtx
[
k
].
size
=
forwardStep
;
...
...
@@ -743,7 +743,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
pCtx
[
k
].
nStartQueryTimestamp
=
pWin
->
skey
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
b
ase
.
functionId
;
if
(
functionNeedToExecute
(
pRuntimeEnv
,
&
pCtx
[
k
],
functionId
))
{
aAggs
[
functionId
].
xFunctionF
(
&
pCtx
[
k
],
offset
);
}
...
...
@@ -817,13 +817,12 @@ static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
char
*
getDataBlocks
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SArithmeticSupport
*
sas
,
int32_t
col
,
int32_t
size
,
SArray
*
pDataBlock
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
char
*
dataBlock
=
NULL
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
col
].
pBase
.
functionId
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
col
].
base
.
functionId
;
if
(
functionId
==
TSDB_FUNC_ARITHM
)
{
sas
->
pArithExpr
=
&
pQuery
->
pSelectExpr
[
col
];
...
...
@@ -834,18 +833,32 @@ char *getDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int3
pCtx
->
startOffset
=
pQuery
->
pos
-
(
size
-
1
);
}
sas
->
offset
=
0
;
sas
->
colList
=
pQuery
->
colList
;
sas
->
numOfCols
=
pQuery
->
numOfCols
;
sas
->
data
=
calloc
(
pQuery
->
numOfCols
,
POINTER_BYTES
);
// here the pQuery->colList and sas->colList are identical
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
++
i
)
{
SColumnInfo
*
pColMsg
=
&
pQuery
->
colList
[
i
];
assert
(
0
);
// char * pData = doGetDataBlocks(pQuery, pRuntimeEnv->colDataBuffer, pQuery->colList[i].colIdxInBuf);
sas
->
elemSize
[
i
]
=
pColMsg
->
bytes
;
// sas->data[i] = pData + pCtx->startOffset * sas->elemSize[i]; // start from the offset
int32_t
numOfCols
=
taosArrayGetSize
(
pDataBlock
);
dataBlock
=
NULL
;
for
(
int32_t
k
=
0
;
k
<
numOfCols
;
++
k
)
{
//todo refactor
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
,
k
);
if
(
pColMsg
->
colId
==
p
->
info
.
colId
)
{
dataBlock
=
p
->
pData
;
break
;
}
}
assert
(
dataBlock
!=
NULL
);
sas
->
data
[
i
]
=
dataBlock
+
pCtx
->
startOffset
*
pQuery
->
colList
[
i
].
bytes
;
// start from the offset
}
sas
->
numOfCols
=
pQuery
->
numOfCols
;
sas
->
offset
=
0
;
}
else
{
// other type of query function
SColIndex
*
pCol
=
&
pQuery
->
pSelectExpr
[
col
].
pB
ase
.
colInfo
;
SColIndex
*
pCol
=
&
pQuery
->
pSelectExpr
[
col
].
b
ase
.
colInfo
;
if
(
TSDB_COL_IS_TAG
(
pCol
->
flag
)
||
pDataBlock
==
NULL
)
{
dataBlock
=
NULL
;
}
else
{
...
...
@@ -896,7 +909,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SArithmeticSupport
*
sasArray
=
calloc
((
size_t
)
pQuery
->
numOfOutput
,
sizeof
(
SArithmeticSupport
));
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
b
ase
.
functionId
;
SDataStatis
*
tpField
=
NULL
;
...
...
@@ -953,13 +966,21 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
* tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY
*/
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
b
ase
.
functionId
;
if
(
functionNeedToExecute
(
pRuntimeEnv
,
&
pCtx
[
k
],
functionId
))
{
aAggs
[
functionId
].
xFunction
(
&
pCtx
[
k
]);
}
}
}
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
if
(
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
!=
TSDB_FUNC_ARITHM
)
{
continue
;
}
tfree
(
sasArray
[
i
].
data
);
}
tfree
(
sasArray
);
}
...
...
@@ -1103,7 +1124,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
b
ase
.
functionId
;
SDataStatis
*
pColStatis
=
NULL
;
...
...
@@ -1212,7 +1233,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
offset
-=
pCtx
[
0
].
startOffset
;
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
b
ase
.
functionId
;
if
(
functionNeedToExecute
(
pRuntimeEnv
,
&
pCtx
[
k
],
functionId
))
{
aAggs
[
functionId
].
xFunctionF
(
&
pCtx
[
k
],
offset
);
}
...
...
@@ -1229,6 +1250,16 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
pQuery
->
lastKey
=
lastKey
+
step
;
// todo refactor: extract method
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
if
(
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
!=
TSDB_FUNC_ARITHM
)
{
continue
;
}
tfree
(
sasArray
[
i
].
data
);
}
free
(
sasArray
);
}
...
...
@@ -1343,7 +1374,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
SQLFunctionCtx
**
pTagCtx
=
calloc
(
pQuery
->
numOfOutput
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SSqlFuncMsg
*
pSqlFuncMsg
=
&
pQuery
->
pSelectExpr
[
i
].
pB
ase
;
SSqlFuncMsg
*
pSqlFuncMsg
=
&
pQuery
->
pSelectExpr
[
i
].
b
ase
;
if
(
pSqlFuncMsg
->
functionId
==
TSDB_FUNC_TAG_DUMMY
||
pSqlFuncMsg
->
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
tagLen
+=
pCtx
[
i
].
outputBytes
;
pTagCtx
[
num
++
]
=
&
pCtx
[
i
];
...
...
@@ -1383,7 +1414,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pRuntimeEnv
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SSqlFuncMsg
*
pSqlFuncMsg
=
&
pQuery
->
pSelectExpr
[
i
].
pB
ase
;
SSqlFuncMsg
*
pSqlFuncMsg
=
&
pQuery
->
pSelectExpr
[
i
].
b
ase
;
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
i
];
SColIndex
*
pIndex
=
&
pSqlFuncMsg
->
colInfo
;
...
...
@@ -1425,7 +1456,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
int32_t
functionId
=
pCtx
->
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
)
{
int32_t
f
=
pQuery
->
pSelectExpr
[
0
].
pB
ase
.
functionId
;
int32_t
f
=
pQuery
->
pSelectExpr
[
0
].
b
ase
.
functionId
;
assert
(
f
==
TSDB_FUNC_TS
||
f
==
TSDB_FUNC_TS_DUMMY
);
pCtx
->
param
[
2
].
i64Key
=
order
;
...
...
@@ -1533,7 +1564,7 @@ bool isFixedOutputQuery(SQuery *pQuery) {
}
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SSqlFuncMsg
*
pExprMsg
=
&
pQuery
->
pSelectExpr
[
i
].
pB
ase
;
SSqlFuncMsg
*
pExprMsg
=
&
pQuery
->
pSelectExpr
[
i
].
b
ase
;
// ignore the ts_comp function
if
(
i
==
0
&&
pExprMsg
->
functionId
==
TSDB_FUNC_PRJ
&&
pExprMsg
->
numOfParams
==
1
&&
...
...
@@ -1555,7 +1586,7 @@ bool isFixedOutputQuery(SQuery *pQuery) {
bool
isPointInterpoQuery
(
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionID
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionID
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functionID
==
TSDB_FUNC_INTERP
||
functionID
==
TSDB_FUNC_LAST_ROW
)
{
return
true
;
}
...
...
@@ -1567,7 +1598,7 @@ bool isPointInterpoQuery(SQuery *pQuery) {
// TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION
bool
isSumAvgRateQuery
(
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TS
)
{
continue
;
}
...
...
@@ -1583,7 +1614,7 @@ bool isSumAvgRateQuery(SQuery *pQuery) {
bool
isFirstLastRowQuery
(
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionID
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionID
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functionID
==
TSDB_FUNC_LAST_ROW
)
{
return
true
;
}
...
...
@@ -1599,7 +1630,7 @@ bool notHasQueryTimeRange(SQuery *pQuery) {
static
bool
needReverseScan
(
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TS
||
functionId
==
TSDB_FUNC_TS_DUMMY
||
functionId
==
TSDB_FUNC_TAG
)
{
continue
;
}
...
...
@@ -1772,7 +1803,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
}
else
{
bool
hasMultioutput
=
false
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SSqlFuncMsg
*
pExprMsg
=
&
pQuery
->
pSelectExpr
[
i
].
pB
ase
;
SSqlFuncMsg
*
pExprMsg
=
&
pQuery
->
pSelectExpr
[
i
].
b
ase
;
if
(
pExprMsg
->
functionId
==
TSDB_FUNC_TS
||
pExprMsg
->
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
...
...
@@ -1805,7 +1836,7 @@ bool vnodeParametersSafetyCheck(SQuery *pQuery) {
// the scan order is not matter
static
bool
onlyOneQueryType
(
SQuery
*
pQuery
,
int32_t
functId
,
int32_t
functIdDst
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TS
||
functionId
==
TSDB_FUNC_TS_DUMMY
||
functionId
==
TSDB_FUNC_TAG
||
functionId
==
TSDB_FUNC_TAG_DUMMY
)
{
...
...
@@ -2008,7 +2039,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
SInterpInfoDetail
*
pInterpDetail
=
pInterpInfo
->
pInterpDetail
;
// for primary timestamp column, set the flag
if
(
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
colInfo
.
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
if
(
pQuery
->
pSelectExpr
[
i
].
b
ase
.
colInfo
.
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
pInterpDetail
->
primaryCol
=
1
;
}
...
...
@@ -2031,11 +2062,11 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
i
];
// tag column does not need the interp environment
if
(
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
==
TSDB_FUNC_TAG
)
{
if
(
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
==
TSDB_FUNC_TAG
)
{
continue
;
}
int32_t
colInBuf
=
0
;
// pQuery->pSelectExpr[i].
pB
ase.colInfo.colIdxInBuf;
int32_t
colInBuf
=
0
;
// pQuery->pSelectExpr[i].
b
ase.colInfo.colIdxInBuf;
SInterpInfo
*
pInterpInfo
=
(
SInterpInfo
*
)
pRuntimeEnv
->
pCtx
[
i
].
aOutputBuf
;
pInterpInfo
->
pInterpDetail
=
calloc
(
1
,
sizeof
(
SInterpInfoDetail
));
...
...
@@ -2046,7 +2077,7 @@ void pointInterpSupporterSetData(SQInfo *pQInfo, SPointInterpoSupporter *pPointI
assert
(
0
);
// for primary timestamp column, set the flag
if
(
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
colInfo
.
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
if
(
pQuery
->
pSelectExpr
[
i
].
b
ase
.
colInfo
.
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
pInterpDetail
->
primaryCol
=
1
;
}
else
{
doSetInterpVal
(
pCtx
,
prevKey
,
type
,
1
,
pPointInterpSupport
->
pPrevPoint
[
colInBuf
]);
...
...
@@ -2145,7 +2176,7 @@ static int32_t getRowParamForMultiRowsOutput(SQuery *pQuery, bool isSTableQuery)
int32_t
rowparam
=
1
;
if
(
isTopBottomQuery
(
pQuery
)
&&
(
!
isSTableQuery
))
{
rowparam
=
pQuery
->
pSelectExpr
[
1
].
pB
ase
.
arg
->
argValue
.
i64
;
rowparam
=
pQuery
->
pSelectExpr
[
1
].
b
ase
.
arg
->
argValue
.
i64
;
}
return
rowparam
;
...
...
@@ -2211,7 +2242,7 @@ UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime,
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functionId
==
TSDB_FUNC_SPREAD
)
{
pRuntimeEnv
->
pCtx
[
i
].
param
[
1
].
dKey
=
stime
;
...
...
@@ -2270,7 +2301,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
// todo disable this opt code block temporarily
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functId = pQuery->pSelectExpr[i].
pB
ase.functionId;
// int32_t functId = pQuery->pSelectExpr[i].
b
ase.functionId;
// if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
// return top_bot_datablock_filter(&pCtx[i], functId, (char *)&pField[i].min, (char *)&pField[i].max);
// }
...
...
@@ -2298,8 +2329,8 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
r
=
BLK_DATA_ALL_NEEDED
;
}
else
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
colId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
colInfo
.
colId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
int32_t
colId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
colInfo
.
colId
;
r
|=
aAggs
[
functionId
].
dataReqFunc
(
&
pRuntimeEnv
->
pCtx
[
i
],
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
colId
);
}
...
...
@@ -2533,14 +2564,14 @@ static void doSetTagValueInParam(void *tsdb, STableId id, int32_t tagColId, tVar
void
setTagVal
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STableId
id
,
void
*
tsdb
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SSqlFuncMsg
*
pFuncMsg
=
&
pQuery
->
pSelectExpr
[
0
].
pB
ase
;
SSqlFuncMsg
*
pFuncMsg
=
&
pQuery
->
pSelectExpr
[
0
].
b
ase
;
if
(
pQuery
->
numOfOutput
==
1
&&
pFuncMsg
->
functionId
==
TSDB_FUNC_TS_COMP
)
{
assert
(
pFuncMsg
->
numOfParams
==
1
);
doSetTagValueInParam
(
tsdb
,
id
,
pFuncMsg
->
arg
->
argValue
.
i64
,
&
pRuntimeEnv
->
pCtx
[
0
].
tag
);
}
else
{
// set tag value, by which the results are aggregated.
for
(
int32_t
idx
=
0
;
idx
<
pQuery
->
numOfOutput
;
++
idx
)
{
SColIndex
*
pCol
=
&
pQuery
->
pSelectExpr
[
idx
].
pB
ase
.
colInfo
;
SColIndex
*
pCol
=
&
pQuery
->
pSelectExpr
[
idx
].
b
ase
.
colInfo
;
// ts_comp column required the tag value for join filter
if
(
!
TSDB_COL_IS_TAG
(
pCol
->
flag
))
{
...
...
@@ -2566,7 +2597,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
!
mergeFlag
)
{
pCtx
[
i
].
aOutputBuf
=
pCtx
[
i
].
aOutputBuf
+
pCtx
[
i
].
outputBytes
;
pCtx
[
i
].
currentStage
=
FIRST_STAGE_MERGE
;
...
...
@@ -2590,7 +2621,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
}
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
)
{
continue
;
}
...
...
@@ -2680,15 +2711,15 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
for (int32_t i = 0; i < numOfCols; ++i) {
switch (pQuery->pSelectExpr[i].type) {
case TSDB_DATA_TYPE_BINARY: {
int32_t colIndex = pQuery->pSelectExpr[i].
pB
ase.colInfo.colIndex;
int32_t colIndex = pQuery->pSelectExpr[i].
b
ase.colInfo.colIndex;
int32_t type = 0;
if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].
pB
ase.colInfo.flag)) {
if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].
b
ase.colInfo.flag)) {
type = pQuery->pSelectExpr[i].type;
} else {
type = pMeterObj->schema[colIndex].type;
}
printBinaryData(pQuery->pSelectExpr[i].
pB
ase.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j,
printBinaryData(pQuery->pSelectExpr[i].
b
ase.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j,
type);
break;
}
...
...
@@ -2843,7 +2874,7 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pW
int64_t
maxOutput
=
0
;
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
j
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
j
].
b
ase
.
functionId
;
/*
* ts, tag, tagprj function can not decide the output number of current query
...
...
@@ -3059,7 +3090,7 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
// open/close the specified query for each group result
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
functId
=
pQuery
->
pSelectExpr
[
j
].
pB
ase
.
functionId
;
int32_t
functId
=
pQuery
->
pSelectExpr
[
j
].
b
ase
.
functionId
;
if
(((
functId
==
TSDB_FUNC_FIRST
||
functId
==
TSDB_FUNC_FIRST_DST
)
&&
order
==
TSDB_ORDER_ASC
)
||
((
functId
==
TSDB_FUNC_LAST
||
functId
==
TSDB_FUNC_LAST_DST
)
&&
order
==
TSDB_ORDER_DESC
))
{
...
...
@@ -3081,7 +3112,7 @@ void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
doDisableFunctsForSupplementaryScan
(
pQuery
,
pWindowResInfo
,
order
);
}
else
{
// for simple result of table query,
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
functId
=
pQuery
->
pSelectExpr
[
j
].
pB
ase
.
functionId
;
int32_t
functId
=
pQuery
->
pSelectExpr
[
j
].
b
ase
.
functionId
;
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
j
];
...
...
@@ -3151,7 +3182,7 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
pCtx
->
resultInfo
=
&
pRuntimeEnv
->
resultInfo
[
i
];
// set the timestamp output buffer for top/bottom/diff query
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
)
{
pCtx
->
ptsOutputBuf
=
pRuntimeEnv
->
pCtx
[
0
].
aOutputBuf
;
}
...
...
@@ -3167,7 +3198,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
// reset the execution contexts
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
j
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
j
].
b
ase
.
functionId
;
assert
(
functionId
!=
TSDB_FUNC_DIFF
);
// set next output position
...
...
@@ -3194,7 +3225,7 @@ void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
j
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
j
].
b
ase
.
functionId
;
pRuntimeEnv
->
pCtx
[
j
].
currentStage
=
0
;
aAggs
[
functionId
].
init
(
&
pRuntimeEnv
->
pCtx
[
j
]);
...
...
@@ -3220,7 +3251,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery
->
rec
.
rows
-=
numOfSkip
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
int32_t
bytes
=
pRuntimeEnv
->
pCtx
[
i
].
outputBytes
;
memmove
(
pQuery
->
sdata
[
i
]
->
data
,
pQuery
->
sdata
[
i
]
->
data
+
bytes
*
numOfSkip
,
pQuery
->
rec
.
rows
*
bytes
);
...
...
@@ -3262,7 +3293,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
setWindowResOutputBuf
(
pRuntimeEnv
,
pResult
);
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int16_t
functId
=
pQuery
->
pSelectExpr
[
j
].
pB
ase
.
functionId
;
int16_t
functId
=
pQuery
->
pSelectExpr
[
j
].
b
ase
.
functionId
;
if
(
functId
==
TSDB_FUNC_TS
)
{
continue
;
}
...
...
@@ -3275,7 +3306,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
else
{
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int16_t
functId
=
pQuery
->
pSelectExpr
[
j
].
pB
ase
.
functionId
;
int16_t
functId
=
pQuery
->
pSelectExpr
[
j
].
b
ase
.
functionId
;
if
(
functId
==
TSDB_FUNC_TS
)
{
continue
;
}
...
...
@@ -3443,7 +3474,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
setWindowResOutputBuf
(
pRuntimeEnv
,
buf
);
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
aAggs
[
pQuery
->
pSelectExpr
[
j
].
pB
ase
.
functionId
].
xFinalize
(
&
pRuntimeEnv
->
pCtx
[
j
]);
aAggs
[
pQuery
->
pSelectExpr
[
j
].
b
ase
.
functionId
].
xFinalize
(
&
pRuntimeEnv
->
pCtx
[
j
]);
}
/*
...
...
@@ -3455,14 +3486,14 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
}
else
{
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
aAggs
[
pQuery
->
pSelectExpr
[
j
].
pB
ase
.
functionId
].
xFinalize
(
&
pRuntimeEnv
->
pCtx
[
j
]);
aAggs
[
pQuery
->
pSelectExpr
[
j
].
b
ase
.
functionId
].
xFinalize
(
&
pRuntimeEnv
->
pCtx
[
j
]);
}
}
}
static
bool
hasMainOutput
(
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functionId
!=
TSDB_FUNC_TS
&&
functionId
!=
TSDB_FUNC_TAG
&&
functionId
!=
TSDB_FUNC_TAGPRJ
)
{
return
true
;
...
...
@@ -3568,7 +3599,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
i
];
pCtx
->
aOutputBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
pResult
);
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
)
{
pCtx
->
ptsOutputBuf
=
pRuntimeEnv
->
pCtx
[
0
].
aOutputBuf
;
}
...
...
@@ -3666,7 +3697,7 @@ void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSK
bool
requireTimestamp
(
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
i
++
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
pB
ase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
b
ase
.
functionId
;
if
((
aAggs
[
functionId
].
nStatus
&
TSDB_FUNCSTATE_NEED_TS
)
!=
0
)
{
return
true
;
}
...
...
@@ -3883,7 +3914,7 @@ static UNUSED_FUNC int32_t resultInterpolate(SQInfo *pQInfo, tFilePage **data, t
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
srcData[i] = pDataSrc[i]->data;
functions[i] = pQuery->pSelectExpr[i].
pB
ase.functionId;
functions[i] = pQuery->pSelectExpr[i].
b
ase.functionId;
}
assert(0);
...
...
@@ -4307,15 +4338,6 @@ static UNUSED_FUNC bool isGroupbyEachTable(SSqlGroupbyExpr *pGroupbyExpr, STable
return
false
;
}
static
UNUSED_FUNC
bool
doCheckWithPrevQueryRange
(
SQuery
*
pQuery
,
TSKEY
nextKey
)
{
if
((
nextKey
>
pQuery
->
window
.
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
(
nextKey
<
pQuery
->
window
.
ekey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
return
false
;
}
return
true
;
}
static
void
enableExecutionForNextTable
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
...
...
@@ -5338,9 +5360,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if
(
pDestFilterInfo
->
filterstr
)
{
pDestFilterInfo
->
len
=
htobe64
(
pFilterInfo
->
len
);
pDestFilterInfo
->
pz
=
(
int64_t
)
calloc
(
1
,
pDestFilterInfo
->
len
+
1
);
memcpy
((
void
*
)
pDestFilterInfo
->
pz
,
pMsg
,
pDestFilterInfo
->
len
+
1
);
pMsg
+=
(
pDestFilterInfo
->
len
+
1
);
pDestFilterInfo
->
pz
=
(
int64_t
)
calloc
(
1
,
pDestFilterInfo
->
len
);
memcpy
((
void
*
)
pDestFilterInfo
->
pz
,
pMsg
,
pDestFilterInfo
->
len
);
pMsg
+=
(
pDestFilterInfo
->
len
);
}
else
{
pDestFilterInfo
->
lowerBndi
=
htobe64
(
pFilterInfo
->
lowerBndi
);
pDestFilterInfo
->
upperBndi
=
htobe64
(
pFilterInfo
->
upperBndi
);
...
...
@@ -5351,8 +5373,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
}
}
bool
hasArithmeticFunction
=
false
;
*
pExpr
=
calloc
(
pQueryMsg
->
numOfOutput
,
POINTER_BYTES
);
SSqlFuncMsg
*
pExprMsg
=
(
SSqlFuncMsg
*
)
pMsg
;
...
...
@@ -5373,15 +5393,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if
(
pExprMsg
->
arg
[
j
].
argType
==
TSDB_DATA_TYPE_BINARY
)
{
pExprMsg
->
arg
[
j
].
argValue
.
pz
=
pMsg
;
pMsg
+=
pExprMsg
->
arg
[
j
].
argBytes
+
1
;
// one more for the string terminated char.
pMsg
+=
pExprMsg
->
arg
[
j
].
argBytes
;
// one more for the string terminated char.
}
else
{
pExprMsg
->
arg
[
j
].
argValue
.
i64
=
htobe64
(
pExprMsg
->
arg
[
j
].
argValue
.
i64
);
}
}
if
(
pExprMsg
->
functionId
==
TSDB_FUNC_ARITHM
)
{
hasArithmeticFunction
=
true
;
}
else
if
(
pExprMsg
->
functionId
==
TSDB_FUNC_TAG
||
pExprMsg
->
functionId
==
TSDB_FUNC_TAGPRJ
||
if
(
pExprMsg
->
functionId
==
TSDB_FUNC_TAG
||
pExprMsg
->
functionId
==
TSDB_FUNC_TAGPRJ
||
pExprMsg
->
functionId
==
TSDB_FUNC_TAG_DUMMY
)
{
if
(
pExprMsg
->
colInfo
.
flag
!=
TSDB_COL_TAG
)
{
// ignore the column index check for arithmetic expression.
return
TSDB_CODE_INVALID_QUERY_MSG
;
...
...
@@ -5395,13 +5413,6 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pExprMsg
=
(
SSqlFuncMsg
*
)
pMsg
;
}
pQueryMsg
->
colNameLen
=
htonl
(
pQueryMsg
->
colNameLen
);
if
(
hasArithmeticFunction
)
{
// column name array
assert
(
pQueryMsg
->
colNameLen
>
0
);
pQueryMsg
->
colNameList
=
(
int64_t
)
pMsg
;
pMsg
+=
pQueryMsg
->
colNameLen
;
}
pMsg
=
createTableIdList
(
pQueryMsg
,
pMsg
,
pTableIdList
);
if
(
pQueryMsg
->
numOfGroupCols
>
0
)
{
// group by tag columns
...
...
@@ -5477,54 +5488,16 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
return
0
;
}
static
int32_t
buildAirthmeticExprFromMsg
(
SArithExprInfo
*
pExpr
,
SQueryTableMsg
*
pQueryMsg
)
{
// SExprInfo *pBinaryExprInfo = &pExpr->binExprInfo;
// SColumnInfo * pColMsg = pQueryMsg->colList;
#if 0
tExprNode* pBinExpr = NULL;
SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols);
qTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
tSQLBinaryExprFromString(&pBinExpr, pSchema, pQueryMsg->numOfCols, pExpr->pBase.arg[0].argValue.pz,
pExpr->pBase.arg[0].argBytes);
static
int32_t
buildAirthmeticExprFromMsg
(
SArithExprInfo
*
pArithExprInfo
,
SQueryTableMsg
*
pQueryMsg
)
{
qTrace
(
"qmsg:%p create arithmetic expr from binary string"
,
pQueryMsg
,
pArithExprInfo
->
base
.
arg
[
0
].
argValue
.
pz
);
if (pBinExpr == NULL) {
qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
tExprNode
*
pExprNode
=
exprTreeFromBinary
(
pArithExprInfo
->
base
.
arg
[
0
].
argValue
.
pz
,
pArithExprInfo
->
base
.
arg
[
0
].
argBytes
);
if
(
pExprNode
==
NULL
)
{
qError
(
"qmsg:%p failed to create arithmetic expression string from:%s"
,
pQueryMsg
,
pArithExprInfo
->
base
.
arg
[
0
].
argValue
.
pz
);
return
TSDB_CODE_APP_ERROR
;
}
pBinaryExprInfo->pBinExpr = pBinExpr;
int32_t num = 0;
int16_t ids[TSDB_MAX_COLUMNS] = {0};
tSQLBinaryExprTrv(pBinExpr, &num, ids);
qsort(ids, num, sizeof(int16_t), id_compar);
int32_t i = 0, j = 0;
while (i < num && j < num) {
if (ids[i] == ids[j]) {
j++;
} else {
ids[++i] = ids[j++];
}
}
assert(i <= num);
// there may be duplicated referenced columns.
num = i + 1;
pBinaryExprInfo->pReqColumns = malloc(sizeof(SColIndex) * num);
for (int32_t k = 0; k < num; ++k) {
SColIndex* pColIndex = &pBinaryExprInfo->pReqColumns[k];
pColIndex->colId = ids[k];
}
pBinaryExprInfo->numOfCols = num;
free(pSchema);
#endif
pArithExprInfo
->
pExpr
=
pExprNode
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -5542,14 +5515,14 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp
int16_t
tagLen
=
0
;
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfOutput
;
++
i
)
{
pExprs
[
i
].
pB
ase
=
*
pExprMsg
[
i
];
pExprs
[
i
].
b
ase
=
*
pExprMsg
[
i
];
pExprs
[
i
].
bytes
=
0
;
int16_t
type
=
0
;
int16_t
bytes
=
0
;
// parse the arithmetic expression
if
(
pExprs
[
i
].
pB
ase
.
functionId
==
TSDB_FUNC_ARITHM
)
{
if
(
pExprs
[
i
].
b
ase
.
functionId
==
TSDB_FUNC_ARITHM
)
{
code
=
buildAirthmeticExprFromMsg
(
&
pExprs
[
i
],
pQueryMsg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -5559,26 +5532,26 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp
type
=
TSDB_DATA_TYPE_DOUBLE
;
bytes
=
tDataTypeDesc
[
type
].
nSize
;
}
else
if
(
pExprs
[
i
].
pB
ase
.
colInfo
.
colId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
// parse the normal column
}
else
if
(
pExprs
[
i
].
b
ase
.
colInfo
.
colId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
// parse the normal column
type
=
TSDB_DATA_TYPE_BINARY
;
bytes
=
TSDB_TABLE_NAME_LEN
;
}
else
{
int32_t
j
=
getColumnIndexInSource
(
pQueryMsg
,
&
pExprs
[
i
].
pB
ase
,
pTagCols
);
int32_t
j
=
getColumnIndexInSource
(
pQueryMsg
,
&
pExprs
[
i
].
b
ase
,
pTagCols
);
assert
(
j
<
pQueryMsg
->
numOfCols
);
SColumnInfo
*
pCol
=
(
TSDB_COL_IS_TAG
(
pExprs
[
i
].
pB
ase
.
colInfo
.
flag
))
?
&
pTagCols
[
j
]
:&
pQueryMsg
->
colList
[
j
];
SColumnInfo
*
pCol
=
(
TSDB_COL_IS_TAG
(
pExprs
[
i
].
b
ase
.
colInfo
.
flag
))
?
&
pTagCols
[
j
]
:&
pQueryMsg
->
colList
[
j
];
type
=
pCol
->
type
;
bytes
=
pCol
->
bytes
;
}
int32_t
param
=
pExprs
[
i
].
pB
ase
.
arg
[
0
].
argValue
.
i64
;
if
(
getResultDataInfo
(
type
,
bytes
,
pExprs
[
i
].
pB
ase
.
functionId
,
param
,
&
pExprs
[
i
].
type
,
&
pExprs
[
i
].
bytes
,
int32_t
param
=
pExprs
[
i
].
b
ase
.
arg
[
0
].
argValue
.
i64
;
if
(
getResultDataInfo
(
type
,
bytes
,
pExprs
[
i
].
b
ase
.
functionId
,
param
,
&
pExprs
[
i
].
type
,
&
pExprs
[
i
].
bytes
,
&
pExprs
[
i
].
interResBytes
,
0
,
isSuperTable
)
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
pExprs
);
return
TSDB_CODE_INVALID_QUERY_MSG
;
}
if
(
pExprs
[
i
].
pBase
.
functionId
==
TSDB_FUNC_TAG_DUMMY
||
pExprs
[
i
].
pB
ase
.
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
if
(
pExprs
[
i
].
base
.
functionId
==
TSDB_FUNC_TAG_DUMMY
||
pExprs
[
i
].
b
ase
.
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
tagLen
+=
pExprs
[
i
].
bytes
;
}
assert
(
isValidDataType
(
pExprs
[
i
].
type
,
pExprs
[
i
].
bytes
));
...
...
@@ -5588,17 +5561,17 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SArithExp
// TODO refactor
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfOutput
;
++
i
)
{
pExprs
[
i
].
pB
ase
=
*
pExprMsg
[
i
];
int16_t
functId
=
pExprs
[
i
].
pB
ase
.
functionId
;
pExprs
[
i
].
b
ase
=
*
pExprMsg
[
i
];
int16_t
functId
=
pExprs
[
i
].
b
ase
.
functionId
;
if
(
functId
==
TSDB_FUNC_TOP
||
functId
==
TSDB_FUNC_BOTTOM
)
{
int32_t
j
=
getColumnIndexInSource
(
pQueryMsg
,
&
pExprs
[
i
].
pB
ase
,
pTagCols
);
int32_t
j
=
getColumnIndexInSource
(
pQueryMsg
,
&
pExprs
[
i
].
b
ase
,
pTagCols
);
assert
(
j
<
pQueryMsg
->
numOfCols
);
SColumnInfo
*
pCol
=
&
pQueryMsg
->
colList
[
j
];
int32_t
ret
=
getResultDataInfo
(
pCol
->
type
,
pCol
->
bytes
,
functId
,
pExprs
[
i
].
pB
ase
.
arg
[
0
].
argValue
.
i64
,
getResultDataInfo
(
pCol
->
type
,
pCol
->
bytes
,
functId
,
pExprs
[
i
].
b
ase
.
arg
[
0
].
argValue
.
i64
,
&
pExprs
[
i
].
type
,
&
pExprs
[
i
].
bytes
,
&
pExprs
[
i
].
interResBytes
,
tagLen
,
isSuperTable
);
assert
(
ret
==
TSDB_CODE_SUCCESS
);
}
...
...
@@ -5723,7 +5696,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
assert
(
pQuery
->
pSelectExpr
!=
NULL
&&
pQuery
!=
NULL
);
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
SSqlFuncMsg
*
pSqlExprMsg
=
&
pQuery
->
pSelectExpr
[
k
].
pB
ase
;
SSqlFuncMsg
*
pSqlExprMsg
=
&
pQuery
->
pSelectExpr
[
k
].
b
ase
;
if
(
pSqlExprMsg
->
functionId
==
TSDB_FUNC_ARITHM
||
pSqlExprMsg
->
colInfo
.
flag
==
TSDB_COL_TAG
)
{
continue
;
}
...
...
@@ -5774,6 +5747,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery
->
interpoType
=
pQueryMsg
->
interpoType
;
pQuery
->
numOfTags
=
pQueryMsg
->
numOfTags
;
// todo do not allocate ??
pQuery
->
colList
=
calloc
(
numOfCols
,
sizeof
(
SSingleColumnFilterInfo
));
if
(
pQuery
->
colList
==
NULL
)
{
goto
_cleanup
;
...
...
@@ -5951,12 +5925,12 @@ static void freeQInfo(SQInfo *pQInfo) {
if
(
pQuery
->
pSelectExpr
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SExprInfo
*
pBinExprInfo
=
&
pQuery
->
pSelectExpr
[
i
].
bin
ExprInfo
;
// SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].arith
ExprInfo;
if
(
pBinExprInfo
->
numOfCols
>
0
)
{
tfree
(
pBinExprInfo
->
pReqColumns
);
tExprTreeDestroy
(
&
pBinExprInfo
->
pBinExpr
,
NULL
);
}
//
if (pBinExprInfo->numOfCols > 0) {
//
tfree(pBinExprInfo->pReqColumns);
//
tExprTreeDestroy(&pBinExprInfo->pBinExpr, NULL);
//
}
}
tfree
(
pQuery
->
pSelectExpr
);
...
...
src/query/src/tvariant.c
浏览文件 @
4b1d93ac
...
...
@@ -72,7 +72,7 @@ void tVariantCreateFromString(tVariant *pVar, char *pz, uint32_t len, uint32_t t
* @param len
* @param type
*/
void
tVariantCreateFromBinary
(
tVariant
*
pVar
,
c
har
*
pz
,
uint32
_t
len
,
uint32_t
type
)
{
void
tVariantCreateFromBinary
(
tVariant
*
pVar
,
c
onst
char
*
pz
,
size
_t
len
,
uint32_t
type
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_TINYINT
:
{
...
...
@@ -109,10 +109,10 @@ void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t t
break
;
}
case
TSDB_DATA_TYPE_BINARY
:
{
pVar
->
pz
=
strndup
(
pz
,
len
);
pVar
->
nLen
=
strdequote
(
pVar
->
pz
);
case
TSDB_DATA_TYPE_BINARY
:
{
// todo refactor, extract a method
pVar
->
pz
=
calloc
(
len
,
sizeof
(
char
)
);
memcpy
(
pVar
->
pz
,
pz
,
len
);
pVar
->
nLen
=
len
;
break
;
}
...
...
src/query/tests/astTest.cpp
浏览文件 @
4b1d93ac
...
...
@@ -556,8 +556,7 @@ void exprSerializeTest1() {
ASSERT_TRUE
(
size
>
0
);
char
*
b
=
tbufGetData
(
&
buf
,
false
);
tExprNode
*
p2
=
NULL
;
exprTreeFromBinary
(
b
,
size
,
&
p2
);
tExprNode
*
p2
=
exprTreeFromBinary
(
b
,
size
);
ASSERT_EQ
(
p1
->
nodeType
,
p2
->
nodeType
);
ASSERT_EQ
(
p2
->
_node
.
optr
,
p1
->
_node
.
optr
);
...
...
@@ -593,8 +592,7 @@ void exprSerializeTest2() {
ASSERT_TRUE
(
size
>
0
);
char
*
b
=
tbufGetData
(
&
buf
,
false
);
tExprNode
*
p2
=
NULL
;
exprTreeFromBinary
(
b
,
size
,
&
p2
);
tExprNode
*
p2
=
exprTreeFromBinary
(
b
,
size
);
ASSERT_EQ
(
p1
->
nodeType
,
p2
->
nodeType
);
ASSERT_EQ
(
p2
->
_node
.
optr
,
p1
->
_node
.
optr
);
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
4b1d93ac
...
...
@@ -74,9 +74,12 @@ typedef struct STable {
void
*
pIndex
;
// For TSDB_SUPER_TABLE, it is the skiplist index
void
*
eventHandler
;
// TODO
void
*
streamHandler
;
// TODO
TSKEY
lastKey
;
// lastkey inserted in this table, initialized as 0, TODO: make a structure
struct
STable
*
next
;
// TODO: remove the next
}
STable
;
#define TSDB_GET_TABLE_LAST_KEY(pTable) ((pTable)->lastKey)
void
*
tsdbEncodeTable
(
STable
*
pTable
,
int
*
contLen
);
STable
*
tsdbDecodeTable
(
void
*
cont
,
int
contLen
);
void
tsdbFreeEncode
(
void
*
cont
);
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
4b1d93ac
...
...
@@ -145,6 +145,34 @@ int32_t tsdbDropRepo(TsdbRepoT *repo) {
return
0
;
}
static
int
tsdbRestoreInfo
(
STsdbRepo
*
pRepo
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
*
pFGroup
=
NULL
;
SFileGroupIter
iter
;
SRWHelper
rhelper
=
{
0
};
if
(
tsdbInitReadHelper
(
&
rhelper
,
pRepo
)
<
0
)
goto
_err
;
tsdbInitFileGroupIter
(
pFileH
,
&
iter
,
TSDB_ORDER_ASC
);
while
((
pFGroup
=
tsdbGetFileGroupNext
(
&
iter
))
!=
NULL
)
{
if
(
tsdbSetAndOpenHelperFile
(
&
rhelper
,
pFGroup
)
<
0
)
goto
_err
;
for
(
int
i
=
0
;
i
<
pRepo
->
config
.
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
SCompIdx
*
pIdx
=
&
rhelper
.
pCompIdx
[
i
];
if
(
pIdx
->
offset
>
0
&&
pTable
->
lastKey
<
pIdx
->
maxKey
)
pTable
->
lastKey
=
pIdx
->
maxKey
;
}
}
tsdbDestroyHelper
(
&
rhelper
);
return
0
;
_err:
tsdbDestroyHelper
(
&
rhelper
);
return
-
1
;
}
/**
* Open an existing TSDB storage repository
* @param tsdbDir the existing TSDB root directory
...
...
@@ -192,6 +220,16 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
return
NULL
;
}
// Restore key from file
if
(
tsdbRestoreInfo
(
pRepo
)
<
0
)
{
tsdbFreeCache
(
pRepo
->
tsdbCache
);
tsdbFreeMeta
(
pRepo
->
tsdbMeta
);
tsdbCloseFileH
(
pRepo
->
tsdbFileH
);
free
(
pRepo
->
rootDir
);
free
(
pRepo
);
return
NULL
;
}
pRepo
->
state
=
TSDB_REPO_STATE_ACTIVE
;
return
(
TsdbRepoT
*
)
pRepo
;
...
...
@@ -730,6 +768,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
tSkipListPut
(
pTable
->
mem
->
pData
,
pNode
);
if
(
key
>
pTable
->
mem
->
keyLast
)
pTable
->
mem
->
keyLast
=
key
;
if
(
key
<
pTable
->
mem
->
keyFirst
)
pTable
->
mem
->
keyFirst
=
key
;
if
(
key
>
pTable
->
lastKey
)
pTable
->
lastKey
=
key
;
pTable
->
mem
->
numOfPoints
=
tSkipListGetSize
(
pTable
->
mem
->
pData
);
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
4b1d93ac
...
...
@@ -311,6 +311,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
table
->
tableId
=
pCfg
->
tableId
;
table
->
name
=
strdup
(
pCfg
->
name
);
table
->
lastKey
=
0
;
if
(
IS_CREATE_STABLE
(
pCfg
))
{
// TSDB_CHILD_TABLE
table
->
type
=
TSDB_CHILD_TABLE
;
table
->
superUid
=
pCfg
->
superUid
;
...
...
tests/pytest/insert/basic.py
浏览文件 @
4b1d93ac
...
...
@@ -12,7 +12,6 @@
# -*- coding: utf-8 -*-
import
sys
import
taos
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
...
...
tests/pytest/insert/float.py
浏览文件 @
4b1d93ac
...
...
@@ -13,9 +13,6 @@
import
sys
import
datetime
import
taos
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
...
...
tests/pytest/insert/int.py
浏览文件 @
4b1d93ac
...
...
@@ -12,7 +12,6 @@
# -*- coding: utf-8 -*-
import
sys
import
taos
import
datetime
from
util.log
import
*
...
...
tests/pytest/util/cases.py
浏览文件 @
4b1d93ac
...
...
@@ -71,7 +71,7 @@ class TDCases:
case
.
run
()
except
Exception
as
e
:
tdLog
.
notice
(
repr
(
e
))
tdLog
.
notice
(
"%s failed: %s"
%
(
__file__
,
fileName
))
tdLog
.
exit
(
"%s failed: %s"
%
(
__file__
,
fileName
))
case
.
stop
()
runNum
+=
1
continue
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录