Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0e0e6d5a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0e0e6d5a
编写于
3月 29, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-2819]
上级
8d399443
变更
17
展开全部
隐藏空白更改
内联
并排
Showing
17 changed file
with
674 addition
and
565 deletion
+674
-565
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+50
-49
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+17
-17
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+4
-4
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+17
-17
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+27
-27
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+3
-3
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+119
-119
src/client/src/tscServer.c
src/client/src/tscServer.c
+46
-52
src/client/src/tscSql.c
src/client/src/tscSql.c
+7
-7
src/client/src/tscStream.c
src/client/src/tscStream.c
+7
-7
src/client/src/tscSub.c
src/client/src/tscSub.c
+2
-2
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+52
-55
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+167
-104
src/inc/taosmsg.h
src/inc/taosmsg.h
+12
-0
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+1
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+110
-101
src/query/src/qPlan.c
src/query/src/qPlan.c
+33
-0
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
0e0e6d5a
...
...
@@ -88,7 +88,7 @@ typedef struct SVgroupTableInfo {
SArray
*
itemList
;
//SArray<STableIdInfo>
}
SVgroupTableInfo
;
static
FORCE_INLINE
SQuery
Node
Info
*
tscGetQueryInfo
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
)
{
static
FORCE_INLINE
SQueryInfo
*
tscGetQueryInfo
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
)
{
assert
(
pCmd
!=
NULL
&&
subClauseIndex
>=
0
);
if
(
pCmd
->
pQueryInfo
==
NULL
||
subClauseIndex
>=
pCmd
->
numOfClause
)
{
return
NULL
;
...
...
@@ -97,7 +97,7 @@ static FORCE_INLINE SQueryNodeInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subCl
return
pCmd
->
pQueryInfo
[
subClauseIndex
];
}
SQuery
Node
Info
*
tscGetActiveQueryInfo
(
SSqlCmd
*
pCmd
);
SQueryInfo
*
tscGetActiveQueryInfo
(
SSqlCmd
*
pCmd
);
int32_t
tscCreateDataBlock
(
size_t
initialSize
,
int32_t
rowSize
,
int32_t
startOffset
,
SName
*
name
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
);
void
tscDestroyDataBlock
(
STableDataBlocks
*
pDataBlock
,
bool
removeMeta
);
...
...
@@ -121,33 +121,34 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
* @param pSql sql object
* @return
*/
bool
tscIsPointInterpQuery
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscIsTWAQuery
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscIsSecondStageQuery
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscGroupbyColumn
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscIsTopBotQuery
(
SQueryNodeInfo
*
pQueryInfo
);
bool
hasTagValOutput
(
SQueryNodeInfo
*
pQueryInfo
);
bool
timeWindowInterpoRequired
(
SQueryNodeInfo
*
pQueryNodeInfo
);
bool
isStabledev
(
SQueryNodeInfo
*
pQueryInfo
);
bool
isTsCompQuery
(
SQueryNodeInfo
*
pQueryNodeInfo
);
bool
isSimpleAggregate
(
SQueryNodeInfo
*
pQueryNodeInfo
);
bool
tscNonOrderedProjectionQueryOnSTable
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscOrderedProjectionQueryOnSTable
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscIsProjectionQueryOnSTable
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscIsProjectionQuery
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscIsTwoStageSTableQuery
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscQueryTags
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscMultiRoundQuery
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscQueryBlockInfo
(
SQueryNodeInfo
*
pQueryInfo
);
SExprInfo
*
tscAddFuncInSelectClause
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
outputColIndex
,
int16_t
functionId
,
bool
tscIsPointInterpQuery
(
SQueryInfo
*
pQueryInfo
);
bool
tscIsTWAQuery
(
SQueryInfo
*
pQueryInfo
);
bool
tscIsSecondStageQuery
(
SQueryInfo
*
pQueryInfo
);
bool
tscGroupbyColumn
(
SQueryInfo
*
pQueryInfo
);
bool
tscIsTopBotQuery
(
SQueryInfo
*
pQueryInfo
);
bool
hasTagValOutput
(
SQueryInfo
*
pQueryInfo
);
bool
timeWindowInterpoRequired
(
SQueryInfo
*
pQueryNodeInfo
);
bool
isStabledev
(
SQueryInfo
*
pQueryInfo
);
bool
isTsCompQuery
(
SQueryInfo
*
pQueryNodeInfo
);
bool
isSimpleAggregate
(
SQueryInfo
*
pQueryNodeInfo
);
bool
isBlockDistQuery
(
SQueryInfo
*
pQueryInfo
);
bool
tscNonOrderedProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscOrderedProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscIsProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscIsProjectionQuery
(
SQueryInfo
*
pQueryInfo
);
bool
tscIsTwoStageSTableQuery
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscQueryTags
(
SQueryInfo
*
pQueryInfo
);
bool
tscMultiRoundQuery
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscQueryBlockInfo
(
SQueryInfo
*
pQueryInfo
);
SExprInfo
*
tscAddFuncInSelectClause
(
SQueryInfo
*
pQueryInfo
,
int32_t
outputColIndex
,
int16_t
functionId
,
SColumnIndex
*
pIndex
,
SSchema
*
pColSchema
,
int16_t
colType
);
int32_t
tscSetTableFullName
(
STableMetaInfo
*
pTableMetaInfo
,
SStrToken
*
pzTableName
,
SSqlObj
*
pSql
);
void
tscClearInterpInfo
(
SQuery
Node
Info
*
pQueryInfo
);
void
tscClearInterpInfo
(
SQueryInfo
*
pQueryInfo
);
bool
tscIsInsertData
(
char
*
sqlstr
);
...
...
@@ -161,12 +162,12 @@ SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_F
SInternalField
*
tscFieldInfoGetInternalField
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
);
TAOS_FIELD
*
tscFieldInfoGetField
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
);
void
tscFieldInfoUpdateOffset
(
SQuery
Node
Info
*
pQueryInfo
);
void
tscFieldInfoUpdateOffset
(
SQueryInfo
*
pQueryInfo
);
int16_t
tscFieldInfoGetOffset
(
SQuery
Node
Info
*
pQueryInfo
,
int32_t
index
);
int16_t
tscFieldInfoGetOffset
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
);
void
tscFieldInfoClear
(
SFieldInfo
*
pFieldInfo
);
static
FORCE_INLINE
int32_t
tscNumOfFields
(
SQuery
Node
Info
*
pQueryInfo
)
{
return
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
}
static
FORCE_INLINE
int32_t
tscNumOfFields
(
SQueryInfo
*
pQueryInfo
)
{
return
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
}
int32_t
tscFieldInfoCompare
(
const
SFieldInfo
*
pFieldInfo1
,
const
SFieldInfo
*
pFieldInfo2
);
...
...
@@ -174,18 +175,18 @@ void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes)
int32_t
tscGetResRowLength
(
SArray
*
pExprList
);
SExprInfo
*
tscSqlExprInsert
(
SQuery
Node
Info
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
SExprInfo
*
tscSqlExprInsert
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
int16_t
size
,
int16_t
resColId
,
int16_t
interSize
,
bool
isTagCol
);
SExprInfo
*
tscSqlExprAppend
(
SQuery
Node
Info
*
pQueryInfo
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
SExprInfo
*
tscSqlExprAppend
(
SQueryInfo
*
pQueryInfo
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
int16_t
size
,
int16_t
resColId
,
int16_t
interSize
,
bool
isTagCol
);
SExprInfo
*
tscSqlExprUpdate
(
SQuery
Node
Info
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
int16_t
srcColumnIndex
,
int16_t
type
,
SExprInfo
*
tscSqlExprUpdate
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
int16_t
srcColumnIndex
,
int16_t
type
,
int16_t
size
);
size_t
tscSqlExprNumOfExprs
(
SQuery
Node
Info
*
pQueryInfo
);
void
tscInsertPrimaryTsSourceColumn
(
SQuery
Node
Info
*
pQueryInfo
,
SColumnIndex
*
pIndex
);
size_t
tscSqlExprNumOfExprs
(
SQueryInfo
*
pQueryInfo
);
void
tscInsertPrimaryTsSourceColumn
(
SQueryInfo
*
pQueryInfo
,
SColumnIndex
*
pIndex
);
SExprInfo
*
tscSqlExprGet
(
SQuery
Node
Info
*
pQueryInfo
,
int32_t
index
);
SExprInfo
*
tscSqlExprGet
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
);
int32_t
tscSqlExprCopy
(
SArray
*
dst
,
const
SArray
*
src
,
uint64_t
uid
,
bool
deepcopy
);
void
tscSqlExprAssign
(
SExprInfo
*
dst
,
const
SExprInfo
*
src
);
void
tscSqlExprInfoDestroy
(
SArray
*
pExprInfo
);
...
...
@@ -208,25 +209,25 @@ void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw)
int32_t
tscTagCondCopy
(
STagCond
*
dest
,
const
STagCond
*
src
);
void
tscTagCondRelease
(
STagCond
*
pCond
);
void
tscGetSrcColumnInfo
(
SSrcColumnInfo
*
pColInfo
,
SQuery
Node
Info
*
pQueryInfo
);
void
tscGetSrcColumnInfo
(
SSrcColumnInfo
*
pColInfo
,
SQueryInfo
*
pQueryInfo
);
bool
tscShouldBeFreed
(
SSqlObj
*
pSql
);
STableMetaInfo
*
tscGetTableMetaInfoFromCmd
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
int32_t
tableIndex
);
STableMetaInfo
*
tscGetMetaInfo
(
SQuery
Node
Info
*
pQueryInfo
,
int32_t
tableIndex
);
STableMetaInfo
*
tscGetMetaInfo
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
void
tscInitQueryInfo
(
SQuery
Node
Info
*
pQueryInfo
);
void
tscInitQueryInfo
(
SQueryInfo
*
pQueryInfo
);
void
tscClearSubqueryInfo
(
SSqlCmd
*
pCmd
);
int32_t
tscAddQueryInfo
(
SSqlCmd
*
pCmd
);
SQuery
Node
Info
*
tscGetQueryInfo
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
SQuery
Node
Info
*
tscGetQueryInfoS
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
SQueryInfo
*
tscGetQueryInfo
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
SQueryInfo
*
tscGetQueryInfoS
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
void
tscClearTableMetaInfo
(
STableMetaInfo
*
pTableMetaInfo
);
STableMetaInfo
*
tscAddTableMetaInfo
(
SQuery
Node
Info
*
pQueryInfo
,
SName
*
name
,
STableMeta
*
pTableMeta
,
STableMetaInfo
*
tscAddTableMetaInfo
(
SQueryInfo
*
pQueryInfo
,
SName
*
name
,
STableMeta
*
pTableMeta
,
SVgroupsInfo
*
vgroupList
,
SArray
*
pTagCols
,
SArray
*
pVgroupTables
);
STableMetaInfo
*
tscAddEmptyMetaInfo
(
SQuery
Node
Info
*
pQueryInfo
);
STableMetaInfo
*
tscAddEmptyMetaInfo
(
SQueryInfo
*
pQueryInfo
);
void
tscFreeVgroupTableInfo
(
SArray
*
pVgroupTables
);
SArray
*
tscVgroupTableInfoDup
(
SArray
*
pVgroupTables
);
...
...
@@ -239,8 +240,8 @@ int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool creat
void
tscResetForNextRetrieve
(
SSqlRes
*
pRes
);
void
tscDoQuery
(
SSqlObj
*
pSql
);
void
executeQuery
(
SSqlObj
*
pSql
,
SQuery
Node
Info
*
pQueryInfo
);
void
doExecuteQuery
(
SSqlObj
*
pSql
,
SQuery
Node
Info
*
pQueryInfo
);
void
executeQuery
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
);
void
doExecuteQuery
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
);
SVgroupsInfo
*
tscVgroupInfoClone
(
SVgroupsInfo
*
pInfo
);
void
*
tscVgroupInfoClear
(
SVgroupsInfo
*
pInfo
);
...
...
@@ -269,7 +270,7 @@ void registerSqlObj(SSqlObj* pSql);
SSqlObj
*
createSubqueryObj
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
__async_cb_func_t
fp
,
void
*
param
,
int32_t
cmd
,
SSqlObj
*
pPrevSql
);
void
addGroupInfoForSubquery
(
SSqlObj
*
pParentObj
,
SSqlObj
*
pSql
,
int32_t
subClauseIndex
,
int32_t
tableIndex
);
void
doAddGroupColumnForSubquery
(
SQuery
Node
Info
*
pQueryInfo
,
int32_t
tagIndex
);
void
doAddGroupColumnForSubquery
(
SQueryInfo
*
pQueryInfo
,
int32_t
tagIndex
);
int16_t
tscGetJoinTagColIdByUid
(
STagCond
*
pTagCond
,
uint64_t
uid
);
int16_t
tscGetTagColIndexById
(
STableMeta
*
pTableMeta
,
int16_t
colId
);
...
...
@@ -288,7 +289,7 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet
bool
tscSetSqlOwner
(
SSqlObj
*
pSql
);
void
tscClearSqlOwner
(
SSqlObj
*
pSql
);
int32_t
doArithmeticCalculate
(
SQuery
Node
Info
*
pQueryInfo
,
tFilePage
*
pOutput
,
int32_t
rowSize
,
int32_t
finalRowSize
);
int32_t
doArithmeticCalculate
(
SQueryInfo
*
pQueryInfo
,
tFilePage
*
pOutput
,
int32_t
rowSize
,
int32_t
finalRowSize
);
char
*
serializeTagData
(
STagData
*
pTagData
,
char
*
pMsg
);
int32_t
copyTagData
(
STagData
*
dst
,
const
STagData
*
src
);
...
...
@@ -299,10 +300,10 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t
tscGetTableMetaMaxSize
();
int32_t
tscCreateTableMetaFromCChildMeta
(
STableMeta
*
pChild
,
const
char
*
name
);
STableMeta
*
tscTableMetaDup
(
STableMeta
*
pTableMeta
);
SQuery
*
tscCreateQueryFromQueryNodeInfo
(
SQuery
Node
Info
*
pQueryNodeInfo
);
SQuery
*
tscCreateQueryFromQueryNodeInfo
(
SQueryInfo
*
pQueryNodeInfo
);
void
tsCreateSQLFunctionCtx
(
SQuery
Node
Info
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
);
void
*
createQueryInfoFromQueryNode
(
SQuery
Node
Info
*
pQueryNodeInfo
,
SExprInfo
*
pExprs
,
STableGroupInfo
*
pTableGroupInfo
,
void
tsCreateSQLFunctionCtx
(
SQueryInfo
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
);
void
*
createQueryInfoFromQueryNode
(
SQueryInfo
*
pQueryNodeInfo
,
SExprInfo
*
pExprs
,
STableGroupInfo
*
pTableGroupInfo
,
uint64_t
*
qId
,
char
*
sql
);
void
*
malloc_throw
(
size_t
size
);
...
...
src/client/inc/tsclient.h
浏览文件 @
0e0e6d5a
...
...
@@ -179,7 +179,7 @@ typedef struct STableDataBlocks {
SParamInfo
*
params
;
}
STableDataBlocks
;
typedef
struct
SQuery
Node
Info
{
typedef
struct
SQueryInfo
{
int16_t
command
;
// the command may be different for each subclause, so keep it seperately.
uint32_t
type
;
// query/insert type
STimeWindow
window
;
// the whole query time window
...
...
@@ -187,10 +187,10 @@ typedef struct SQueryNodeInfo {
SInterval
interval
;
// tumble time window
SSessionWindow
sessionWindow
;
// session time window
SSqlGroupbyExpr
groupbyExpr
;
// group
by tags info
SSqlGroupbyExpr
groupbyExpr
;
// groupby tags info
SArray
*
colList
;
// SArray<SColumn*>
SFieldInfo
fieldsInfo
;
SArray
*
exprList
;
// SArray<S
SqlExpr
*>
SArray
*
exprList
;
// SArray<S
ExprInfo
*>
SLimitVal
limit
;
SLimitVal
slimit
;
STagCond
tagCond
;
...
...
@@ -218,10 +218,10 @@ typedef struct SQueryNodeInfo {
SArray
*
pPhyOperator
;
// physical query execution plan
SQuery
*
pQuery
;
// query object
struct
SQuery
Node
Info
*
sibling
;
// sibling
SArray
*
pUpstream
;
// SArray<struct SQuery
Node
Info>
SArray
*
pDownstream
;
// SArray<struct SQuery
Node
Info>
}
SQuery
Node
Info
;
struct
SQueryInfo
*
sibling
;
// sibling
SArray
*
pUpstream
;
// SArray<struct SQueryInfo>
SArray
*
pDownstream
;
// SArray<struct SQueryInfo>
}
SQueryInfo
;
typedef
struct
{
int
command
;
...
...
@@ -245,10 +245,10 @@ typedef struct {
char
*
payload
;
int32_t
payloadLen
;
SQuery
Node
Info
**
pQueryInfo
;
SQueryInfo
**
pQueryInfo
;
int32_t
numOfClause
;
int32_t
clauseIndex
;
// index of multiple subclause query
SQuery
Node
Info
*
active
;
// current active query info
SQueryInfo
*
active
;
// current active query info
int32_t
batchSize
;
// for parameter ('?') binding and batch processing
int32_t
numOfParams
;
...
...
@@ -405,7 +405,7 @@ void tscInitMsgsFp();
int
tsParseSql
(
SSqlObj
*
pSql
,
bool
initial
);
void
tscProcessMsgFromServer
(
SRpcMsg
*
rpcMsg
,
SRpcEpSet
*
pEpSet
);
int
tscProcessSql
(
SSqlObj
*
pSql
,
SQuery
Node
Info
*
pQueryInfo
);
int
tscProcessSql
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
);
int
tscRenewTableMeta
(
SSqlObj
*
pSql
,
int32_t
tableIndex
);
void
tscAsyncResultOnError
(
SSqlObj
*
pSql
);
...
...
@@ -415,12 +415,12 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code);
int
tscProcessLocalCmd
(
SSqlObj
*
pSql
);
int
tscCfgDynamicOptions
(
char
*
msg
);
int32_t
tscTansformFuncForSTableQuery
(
SQuery
Node
Info
*
pQueryInfo
);
void
tscRestoreFuncForSTableQuery
(
SQuery
Node
Info
*
pQueryInfo
);
int32_t
tscTansformFuncForSTableQuery
(
SQueryInfo
*
pQueryInfo
);
void
tscRestoreFuncForSTableQuery
(
SQueryInfo
*
pQueryInfo
);
int32_t
tscCreateResPointerInfo
(
SSqlRes
*
pRes
,
SQuery
Node
Info
*
pQueryInfo
);
void
tscSetResRawPtr
(
SSqlRes
*
pRes
,
SQuery
Node
Info
*
pQueryInfo
);
void
prepareInputDataFromUpstream
(
SSqlRes
*
pRes
,
SQuery
Node
Info
*
pQueryInfo
);
int32_t
tscCreateResPointerInfo
(
SSqlRes
*
pRes
,
SQueryInfo
*
pQueryInfo
);
void
tscSetResRawPtr
(
SSqlRes
*
pRes
,
SQueryInfo
*
pQueryInfo
);
void
prepareInputDataFromUpstream
(
SSqlRes
*
pRes
,
SQueryInfo
*
pQueryInfo
);
void
tscResetSqlCmd
(
SSqlCmd
*
pCmd
,
bool
removeMeta
);
...
...
@@ -455,7 +455,7 @@ bool tscIsUpdateQuery(SSqlObj* pSql);
char
*
tscGetSqlStr
(
SSqlObj
*
pSql
);
bool
tscIsQueryWithLimit
(
SSqlObj
*
pSql
);
bool
tscHasReachLimitation
(
SQuery
Node
Info
*
pQueryInfo
,
SSqlRes
*
pRes
);
bool
tscHasReachLimitation
(
SQueryInfo
*
pQueryInfo
,
SSqlRes
*
pRes
);
char
*
tscGetErrorMsgPayload
(
SSqlCmd
*
pCmd
);
...
...
@@ -519,7 +519,7 @@ extern int tscNumOfObj; // number of existed sqlObj in current process.
extern
int
(
*
tscBuildMsg
[
TSDB_SQL_MAX
])(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
);
void
tscBuildVgroupTableInfo
(
SSqlObj
*
pSql
,
STableMetaInfo
*
pTableMetaInfo
,
SArray
*
tables
);
int16_t
getNewResColId
(
SQuery
Node
Info
*
pQueryInfo
);
int16_t
getNewResColId
(
SQueryInfo
*
pQueryInfo
);
#ifdef __cplusplus
}
...
...
src/client/src/tscAsync.c
浏览文件 @
0e0e6d5a
...
...
@@ -69,7 +69,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return
;
}
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
executeQuery
(
pSql
,
pQueryInfo
);
}
...
...
@@ -255,7 +255,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
pCmd
->
command
=
(
pCmd
->
command
>
TSDB_SQL_MGMT
)
?
TSDB_SQL_RETRIEVE
:
TSDB_SQL_FETCH
;
}
SQuery
Node
Info
*
pQueryInfo1
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
SQueryInfo
*
pQueryInfo1
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
tscProcessSql
(
pSql
,
pQueryInfo1
);
}
}
...
...
@@ -333,7 +333,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug
(
"%p get %s successfully"
,
pSql
,
msg
);
if
(
pSql
->
pStream
==
NULL
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
// check if it is a sub-query of super table query first, if true, enter another routine
if
(
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
(
TSDB_QUERY_TYPE_STABLE_SUBQUERY
|
TSDB_QUERY_TYPE_SUBQUERY
|
TSDB_QUERY_TYPE_TAG_FILTER_QUERY
)))
{
...
...
@@ -414,7 +414,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
else
if
(
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_INSERT
))
{
tscHandleMultivnodeInsert
(
pSql
);
}
else
{
SQuery
Node
Info
*
pQueryInfo1
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo1
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
executeQuery
(
pSql
,
pQueryInfo1
);
}
...
...
src/client/src/tscLocal.c
浏览文件 @
0e0e6d5a
...
...
@@ -53,7 +53,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
// one column for each row
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableMeta
*
pMeta
=
pTableMetaInfo
->
pTableMeta
;
...
...
@@ -154,7 +154,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pSql
->
cmd
.
numOfCols
=
numOfCols
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
TAOS_FIELD
f
=
{.
type
=
TSDB_DATA_TYPE_BINARY
,
.
bytes
=
(
TSDB_COL_NAME_LEN
-
1
)
+
VARSTR_HEADER_SIZE
};
...
...
@@ -199,7 +199,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
}
static
int32_t
tscProcessDescribeTable
(
SSqlObj
*
pSql
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
assert
(
tscGetMetaInfo
(
pQueryInfo
,
0
)
->
pTableMeta
!=
NULL
);
...
...
@@ -389,7 +389,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
SColumnIndex
index
=
{
0
};
pSql
->
cmd
.
numOfCols
=
2
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
TAOS_FIELD
f
;
...
...
@@ -427,7 +427,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
static
int32_t
tscSCreateSetValueToResObj
(
SSqlObj
*
pSql
,
int32_t
rowLen
,
const
char
*
tableName
,
const
char
*
ddl
)
{
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
int32_t
numOfRows
=
1
;
if
(
strlen
(
ddl
)
==
0
)
{
...
...
@@ -444,7 +444,7 @@ static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const c
return
0
;
}
static
int32_t
tscSCreateBuildResult
(
SSqlObj
*
pSql
,
BuildType
type
,
const
char
*
str
,
const
char
*
result
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
int32_t
rowLen
=
tscSCreateBuildResultFields
(
pSql
,
type
,
result
);
tscFieldInfoUpdateOffset
(
pQueryInfo
);
...
...
@@ -552,7 +552,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tscRebuildDDLForSubTable
(
SSqlObj
*
pSql
,
const
char
*
tableName
,
char
*
ddl
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableMeta
*
pMeta
=
pTableMetaInfo
->
pTableMeta
;
...
...
@@ -606,7 +606,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
}
static
int32_t
tscRebuildDDLForNormalTable
(
SSqlObj
*
pSql
,
const
char
*
tableName
,
char
*
ddl
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableMeta
*
pMeta
=
pTableMetaInfo
->
pTableMeta
;
...
...
@@ -633,7 +633,7 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
}
static
int32_t
tscRebuildDDLForSuperTable
(
SSqlObj
*
pSql
,
const
char
*
tableName
,
char
*
ddl
)
{
char
*
result
=
ddl
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableMeta
*
pMeta
=
pTableMetaInfo
->
pTableMeta
;
...
...
@@ -674,7 +674,7 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
}
static
int32_t
tscProcessShowCreateTable
(
SSqlObj
*
pSql
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
assert
(
pTableMetaInfo
->
pTableMeta
!=
NULL
);
...
...
@@ -700,7 +700,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
}
static
int32_t
tscProcessShowCreateDatabase
(
SSqlObj
*
pSql
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -727,7 +727,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
return
TSDB_CODE_TSC_ACTION_IN_PROGRESS
;
}
static
int32_t
tscProcessCurrentUser
(
SSqlObj
*
pSql
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
pExpr
->
resBytes
=
TSDB_USER_LEN
+
TSDB_DATA_TYPE_BINARY
;
...
...
@@ -754,7 +754,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
extractDBName
(
pSql
->
pTscObj
->
db
,
db
);
pthread_mutex_unlock
(
&
pSql
->
pTscObj
->
mutex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
pExpr
->
resType
=
TSDB_DATA_TYPE_BINARY
;
...
...
@@ -781,7 +781,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
static
int32_t
tscProcessServerVer
(
SSqlObj
*
pSql
)
{
const
char
*
v
=
pSql
->
pTscObj
->
sversion
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
pExpr
->
resType
=
TSDB_DATA_TYPE_BINARY
;
...
...
@@ -804,7 +804,7 @@ static int32_t tscProcessServerVer(SSqlObj *pSql) {
}
static
int32_t
tscProcessClientVer
(
SSqlObj
*
pSql
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
pExpr
->
resType
=
TSDB_DATA_TYPE_BINARY
;
...
...
@@ -856,7 +856,7 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) {
return
pSql
->
res
.
code
;
}
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
int32_t
val
=
1
;
...
...
@@ -870,7 +870,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
pCmd
->
numOfCols
=
1
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
tscFieldInfoClear
(
&
pQueryInfo
->
fieldsInfo
);
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
0e0e6d5a
...
...
@@ -57,7 +57,7 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
}
// todo merge with vnode side function
void
tsCreateSQLFunctionCtx
(
SQuery
Node
Info
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
)
{
void
tsCreateSQLFunctionCtx
(
SQueryInfo
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
)
{
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
...
...
@@ -122,7 +122,7 @@ void tsCreateSQLFunctionCtx(SQueryNodeInfo* pQueryInfo, SQLFunctionCtx* pCtx) {
}
}
static
void
setCtxInputOutputBuffer
(
SQuery
Node
Info
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
,
SLocalMerger
*
pReducer
,
static
void
setCtxInputOutputBuffer
(
SQueryInfo
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
,
SLocalMerger
*
pReducer
,
tOrderDescriptor
*
pDesc
)
{
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
...
...
@@ -136,7 +136,7 @@ static void setCtxInputOutputBuffer(SQueryNodeInfo* pQueryInfo, SQLFunctionCtx *
}
}
static
SFillColInfo
*
createFillColInfo
(
SQuery
Node
Info
*
pQueryInfo
)
{
static
SFillColInfo
*
createFillColInfo
(
SQueryInfo
*
pQueryInfo
)
{
int32_t
numOfCols
=
(
int32_t
)
tscNumOfFields
(
pQueryInfo
);
int32_t
offset
=
0
;
...
...
@@ -263,7 +263,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
#ifdef _DEBUG_VIEW
printf
(
"load data page into mem for build loser tree: %"
PRIu64
" rows
\n
"
,
ds
->
filePage
.
num
);
SSrcColumnInfo
colInfo
[
256
]
=
{
0
};
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
tscGetSrcColumnInfo
(
colInfo
,
pQueryInfo
);
...
...
@@ -298,7 +298,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
param
->
pLocalData
=
pReducer
->
pLocalDataSrc
;
param
->
pDesc
=
pReducer
->
pDesc
;
param
->
num
=
pReducer
->
pLocalDataSrc
[
0
]
->
pMemBuffer
->
numOfElemsPerPage
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
param
->
groupOrderType
=
pQueryInfo
->
groupbyExpr
.
orderType
;
pReducer
->
orderPrjOnSTable
=
tscOrderedProjectionQueryOnSTable
(
pQueryInfo
,
0
);
...
...
@@ -492,7 +492,7 @@ void tscDestroyLocalMerger(SSqlObj *pSql) {
}
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
// there is no more result, so we release all allocated resource
SLocalMerger
*
pLocalMerge
=
(
SLocalMerger
*
)
atomic_exchange_ptr
(
&
pRes
->
pLocalMerger
,
NULL
);
...
...
@@ -546,7 +546,7 @@ void tscDestroyLocalMerger(SSqlObj *pSql) {
static
int32_t
createOrderDescriptor
(
tOrderDescriptor
**
pOrderDesc
,
SSqlCmd
*
pCmd
,
SColumnModel
*
pModel
)
{
int32_t
numOfGroupByCols
=
0
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
>
0
)
{
numOfGroupByCols
=
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
...
...
@@ -609,7 +609,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
bool
isSameGroup
(
SSqlCmd
*
pCmd
,
SLocalMerger
*
pReducer
,
char
*
pPrev
,
tFilePage
*
tmpBuffer
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
// disable merge procedure for column projection query
int16_t
functionId
=
pReducer
->
pCtx
[
0
].
functionId
;
...
...
@@ -660,7 +660,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SColumnModel
*
pModel
=
NULL
;
*
pFinalModel
=
NULL
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
(
*
pMemBuffer
)
=
(
tExtMemBuffer
**
)
malloc
(
POINTER_BYTES
*
pSql
->
subState
.
numOfSub
);
...
...
@@ -866,7 +866,7 @@ void adjustLoserTreeFromNewData(SLocalMerger *pLocalMerge, SLocalDataSource *pOn
}
}
void
savePrevRecordAndSetupFillInfo
(
SLocalMerger
*
pLocalMerge
,
SQuery
Node
Info
*
pQueryInfo
,
SFillInfo
*
pFillInfo
)
{
void
savePrevRecordAndSetupFillInfo
(
SLocalMerger
*
pLocalMerge
,
SQueryInfo
*
pQueryInfo
,
SFillInfo
*
pFillInfo
)
{
// discard following dataset in the same group and reset the interpolation information
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -886,7 +886,7 @@ void savePrevRecordAndSetupFillInfo(SLocalMerger *pLocalMerge, SQueryNodeInfo *p
tColModelAppend
(
pModel
,
pLocalMerge
->
discardData
,
pLocalMerge
->
prevRowOfInput
,
0
,
1
,
1
);
}
static
void
genFinalResWithoutFill
(
SSqlRes
*
pRes
,
SLocalMerger
*
pLocalMerge
,
SQuery
Node
Info
*
pQueryInfo
)
{
static
void
genFinalResWithoutFill
(
SSqlRes
*
pRes
,
SLocalMerger
*
pLocalMerge
,
SQueryInfo
*
pQueryInfo
)
{
assert
(
pQueryInfo
->
interval
.
interval
==
0
||
pQueryInfo
->
fillType
==
TSDB_FILL_NONE
);
tFilePage
*
pBeforeFillData
=
pLocalMerge
->
pResultBuf
;
...
...
@@ -950,7 +950,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
SSqlRes
*
pRes
=
&
pSql
->
res
;
tFilePage
*
pBeforeFillData
=
pLocalMerge
->
pResultBuf
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SFillInfo
*
pFillInfo
=
pLocalMerge
->
pFillInfo
;
// todo extract function
...
...
@@ -1049,7 +1049,7 @@ static void savePreviousRow(SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
static
void
doExecuteFinalMerge
(
SSqlCmd
*
pCmd
,
SLocalMerger
*
pLocalMerge
,
bool
needInit
)
{
// the tag columns need to be set before all functions execution
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
j
=
0
;
j
<
size
;
++
j
)
{
...
...
@@ -1098,7 +1098,7 @@ static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFile
}
}
static
int64_t
getNumOfResultLocal
(
SQuery
Node
Info
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
)
{
static
int64_t
getNumOfResultLocal
(
SQueryInfo
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
)
{
int64_t
maxOutput
=
0
;
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
...
...
@@ -1127,7 +1127,7 @@ static int64_t getNumOfResultLocal(SQueryNodeInfo *pQueryInfo, SQLFunctionCtx *p
* filled with the same result, which is the tags, specified in group by clause
*
*/
static
void
fillMultiRowsOfTagsVal
(
SQuery
Node
Info
*
pQueryInfo
,
int32_t
numOfRes
,
SLocalMerger
*
pLocalMerge
)
{
static
void
fillMultiRowsOfTagsVal
(
SQueryInfo
*
pQueryInfo
,
int32_t
numOfRes
,
SLocalMerger
*
pLocalMerge
)
{
int32_t
maxBufSize
=
0
;
// find the max tags column length to prepare the buffer
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
...
...
@@ -1160,7 +1160,7 @@ static void fillMultiRowsOfTagsVal(SQueryNodeInfo *pQueryInfo, int32_t numOfRes,
free
(
buf
);
}
int32_t
finalizeRes
(
SQuery
Node
Info
*
pQueryInfo
,
SLocalMerger
*
pLocalMerge
)
{
int32_t
finalizeRes
(
SQueryInfo
*
pQueryInfo
,
SLocalMerger
*
pLocalMerge
)
{
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
k
=
0
;
k
<
size
;
++
k
)
{
...
...
@@ -1184,7 +1184,7 @@ int32_t finalizeRes(SQueryNodeInfo *pQueryInfo, SLocalMerger *pLocalMerge) {
* results generated by simple aggregation function, we merge them all into one points
* *Exception*: column projection query, required no merge procedure
*/
bool
needToMerge
(
SQuery
Node
Info
*
pQueryInfo
,
SLocalMerger
*
pLocalMerge
,
tFilePage
*
tmpBuffer
)
{
bool
needToMerge
(
SQueryInfo
*
pQueryInfo
,
SLocalMerger
*
pLocalMerge
,
tFilePage
*
tmpBuffer
)
{
int32_t
ret
=
0
;
// merge all result by default
int16_t
functionId
=
pLocalMerge
->
pCtx
[
0
].
functionId
;
...
...
@@ -1208,7 +1208,7 @@ bool needToMerge(SQueryNodeInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePag
return
(
ret
==
0
);
}
static
bool
reachGroupResultLimit
(
SQuery
Node
Info
*
pQueryInfo
,
SSqlRes
*
pRes
)
{
static
bool
reachGroupResultLimit
(
SQueryInfo
*
pQueryInfo
,
SSqlRes
*
pRes
)
{
return
(
pRes
->
numOfGroups
>=
pQueryInfo
->
slimit
.
limit
&&
pQueryInfo
->
slimit
.
limit
>=
0
);
}
...
...
@@ -1216,7 +1216,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
pRes
->
numOfRowsGroup
>
0
)
{
pRes
->
numOfGroups
+=
1
;
...
...
@@ -1245,7 +1245,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurren
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
tFilePage
*
pResBuf
=
pLocalMerge
->
pResultBuf
;
SColumnModel
*
pModel
=
pLocalMerge
->
resColModel
;
...
...
@@ -1292,7 +1292,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurren
return
true
;
}
void
resetOutputBuf
(
SQuery
Node
Info
*
pQueryInfo
,
SLocalMerger
*
pLocalMerge
)
{
// reset output buffer to the beginning
void
resetOutputBuf
(
SQueryInfo
*
pQueryInfo
,
SLocalMerger
*
pLocalMerge
)
{
// reset output buffer to the beginning
size_t
t
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
i
=
0
;
i
<
t
;
++
i
)
{
SExprInfo
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
...
...
@@ -1311,7 +1311,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalMerger *
pRes
->
numOfRows
=
0
;
pRes
->
numOfRowsGroup
=
0
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
pQueryInfo
->
limit
.
offset
=
pLocalMerge
->
offset
;
...
...
@@ -1334,7 +1334,7 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SLocalMerger
*
pLocalMerge
=
pRes
->
pLocalMerger
;
SFillInfo
*
pFillInfo
=
pLocalMerge
->
pFillInfo
;
...
...
@@ -1365,7 +1365,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
bool
prevGroupCompleted
=
(
!
pLocalMerge
->
discard
)
&&
pLocalMerge
->
hasUnprocessedRow
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
if
((
isAllSourcesCompleted
(
pLocalMerge
)
&&
!
pLocalMerge
->
hasPrevRow
)
||
pLocalMerge
->
pLocalDataSrc
[
0
]
==
NULL
||
prevGroupCompleted
)
{
...
...
@@ -1406,7 +1406,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SLocalMerger
*
pLocalMerge
=
pRes
->
pLocalMerger
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
size_t
size
=
tscSqlExprNumOfExprs
(
pQueryInfo
);
for
(
int32_t
k
=
0
;
k
<
size
;
++
k
)
{
...
...
@@ -1438,7 +1438,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
}
SLocalMerger
*
pLocalMerge
=
pRes
->
pLocalMerger
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
tFilePage
*
tmpBuffer
=
pLocalMerge
->
pTempBuffer
;
if
(
doHandleLastRemainData
(
pSql
))
{
...
...
@@ -1626,7 +1626,7 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
pRes
->
data
=
pRes
->
pLocalMerger
->
pResultBuf
->
data
;
}
int32_t
doArithmeticCalculate
(
SQuery
Node
Info
*
pQueryInfo
,
tFilePage
*
pOutput
,
int32_t
rowSize
,
int32_t
finalRowSize
)
{
int32_t
doArithmeticCalculate
(
SQueryInfo
*
pQueryInfo
,
tFilePage
*
pOutput
,
int32_t
rowSize
,
int32_t
finalRowSize
)
{
int32_t
maxRowSize
=
MAX
(
rowSize
,
finalRowSize
);
char
*
pbuf
=
calloc
(
1
,
(
size_t
)(
pOutput
->
num
*
maxRowSize
));
...
...
src/client/src/tscParseInsert.c
浏览文件 @
0e0e6d5a
...
...
@@ -759,7 +759,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
const
int32_t
STABLE_INDEX
=
1
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
char
*
sql
=
*
sqlstr
;
...
...
@@ -1055,7 +1055,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
int32_t
totalNum
=
0
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
assert
(
pQueryInfo
!=
NULL
);
STableMetaInfo
*
pTableMetaInfo
=
(
pQueryInfo
->
numOfTables
==
0
)
?
tscAddEmptyMetaInfo
(
pQueryInfo
)
:
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -1313,7 +1313,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
pCmd
->
count
=
0
;
pCmd
->
command
=
TSDB_SQL_INSERT
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
pCmd
->
clauseIndex
);
TSDB_QUERY_SET_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_INSERT
|
pCmd
->
insertType
);
...
...
src/client/src/tscSQLParser.c
浏览文件 @
0e0e6d5a
此差异已折叠。
点击以展开。
src/client/src/tscServer.c
浏览文件 @
0e0e6d5a
...
...
@@ -302,7 +302,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return
;
}
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
if
(
pQueryInfo
!=
NULL
&&
pQueryInfo
->
type
==
TSDB_QUERY_TYPE_FREE_RESOURCE
)
{
tscDebug
(
"%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p"
,
pSql
,
pCmd
->
command
,
pQueryInfo
->
type
,
pObj
,
pObj
->
signature
);
...
...
@@ -469,7 +469,7 @@ int doProcessSql(SSqlObj *pSql) {
return
TSDB_CODE_SUCCESS
;
}
int
tscProcessSql
(
SSqlObj
*
pSql
,
SQuery
Node
Info
*
pQueryInfo
)
{
int
tscProcessSql
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
)
{
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -509,7 +509,7 @@ int tscProcessSql(SSqlObj *pSql, SQueryNodeInfo* pQueryInfo) {
int
tscBuildFetchMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SRetrieveTableMsg
*
pRetrieveMsg
=
(
SRetrieveTableMsg
*
)
pSql
->
cmd
.
payload
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
SQueryInfo
*
pQueryInfo
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
pRetrieveMsg
->
free
=
htons
(
pQueryInfo
->
type
);
pRetrieveMsg
->
qid
=
htobe64
(
pSql
->
res
.
qid
);
...
...
@@ -549,7 +549,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int
tscBuildSubmitMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMeta
*
pTableMeta
=
tscGetMetaInfo
(
pQueryInfo
,
0
)
->
pTableMeta
;
char
*
pMsg
=
pSql
->
cmd
.
payload
;
...
...
@@ -588,7 +588,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
const
static
int32_t
MIN_QUERY_MSG_PKT_SIZE
=
TSDB_MAX_BYTES_PER_ROW
*
5
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
int32_t
srcColListSize
=
(
int32_t
)(
taosArrayGetSize
(
pQueryInfo
->
colList
)
*
sizeof
(
SColumnInfo
));
...
...
@@ -618,7 +618,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
}
static
char
*
doSerializeTableInfo
(
SQueryTableMsg
*
pQueryMsg
,
SSqlObj
*
pSql
,
char
*
pMsg
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
SQueryInfo
*
pQueryInfo
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
TSKEY
dfltKey
=
htobe64
(
pQueryMsg
->
window
.
skey
);
...
...
@@ -704,7 +704,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return
TSDB_CODE_TSC_INVALID_SQL
;
// todo add test for this
}
SQuery
Node
Info
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
SQueryInfo
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
SQuery
*
pQuery
=
tscCreateQueryFromQueryNodeInfo
(
pQueryInfo
);
UNUSED
(
pQuery
);
...
...
@@ -776,7 +776,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// set column list ids
size_t
numOfCols
=
taosArrayGetSize
(
pQueryInfo
->
colList
);
char
*
pMsg
=
(
char
*
)(
pQueryMsg
->
colList
)
+
numOfCols
*
sizeof
(
SColumnInfo
);
SSchema
*
pSchema
=
tscGetTableSchema
(
pTableMeta
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfo
*
pCol
=
&
pQuery
->
colList
[
i
];
...
...
@@ -814,6 +813,20 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
{
pQueryMsg
->
stableQuery
=
pQuery
->
stableQuery
;
pQueryMsg
->
topBotQuery
=
pQuery
->
topBotQuery
;
pQueryMsg
->
groupbyColumn
=
pQuery
->
groupbyColumn
;
pQueryMsg
->
hasTagResults
=
pQuery
->
hasTagResults
;
pQueryMsg
->
timeWindowInterpo
=
pQuery
->
timeWindowInterpo
;
pQueryMsg
->
queryBlockDist
=
pQuery
->
queryBlockDist
;
pQueryMsg
->
stabledev
=
pQuery
->
stabledev
;
pQueryMsg
->
tsCompQuery
=
pQuery
->
tsCompQuery
;
pQueryMsg
->
simpleAgg
=
pQuery
->
simpleAgg
;
pQueryMsg
->
pointInterpQuery
=
pQuery
->
pointInterpQuery
;
pQueryMsg
->
needReverseScan
=
pQuery
->
needReverseScan
;
}
SSqlExpr
*
pSqlExpr
=
(
SSqlExpr
*
)
pMsg
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
...
...
@@ -944,33 +957,14 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
if
(
numOfTags
!=
0
)
{
int32_t
numOfColumns
=
tscGetNumOfColumns
(
pTableMeta
);
int32_t
numOfTagColumns
=
tscGetNumOfTags
(
pTableMeta
);
int32_t
total
=
numOfTagColumns
+
numOfColumns
;
pSchema
=
tscGetTableTagSchema
(
pTableMeta
);
for
(
int32_t
i
=
0
;
i
<
numOfTags
;
++
i
)
{
SColumn
*
pCol
=
taosArrayGetP
(
pTableMetaInfo
->
tagColList
,
i
);
SSchema
*
pColSchema
=
&
pSchema
[
pCol
->
colIndex
.
columnIndex
];
if
((
pCol
->
colIndex
.
columnIndex
>=
numOfTagColumns
||
pCol
->
colIndex
.
columnIndex
<
-
1
)
||
(
!
isValidDataType
(
pColSchema
->
type
)))
{
char
n
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
n
);
tscError
(
"%p tid:%d uid:%"
PRIu64
" id:%s, tag index out of range, totalCols:%d, numOfTags:%d, index:%d, column name:%s"
,
pSql
,
pTableMeta
->
id
.
tid
,
pTableMeta
->
id
.
uid
,
n
,
total
,
numOfTagColumns
,
pCol
->
colIndex
.
columnIndex
,
pColSchema
->
name
);
return
TSDB_CODE_TSC_INVALID_SQL
;
}
if
(
pQuery
->
numOfTags
>
0
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfTags
;
++
i
)
{
SColumnInfo
*
pTag
=
&
pQuery
->
tagColList
[
i
];
SColumnInfo
*
pTagCol
=
(
SColumnInfo
*
)
pMsg
;
pTagCol
->
colId
=
htons
(
pColSchema
->
colId
);
pTagCol
->
bytes
=
htons
(
pColSchema
->
bytes
);
pTagCol
->
type
=
htons
(
pColSchema
->
type
);
pTagCol
->
colId
=
htons
(
pTag
->
colId
);
pTagCol
->
bytes
=
htons
(
pTag
->
bytes
);
pTagCol
->
type
=
htons
(
pTag
->
type
);
pTagCol
->
numOfFilters
=
0
;
pMsg
+=
sizeof
(
SColumnInfo
);
...
...
@@ -1033,7 +1027,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return
TSDB_CODE_SUCCESS
;
}
/*
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
...
...
@@ -1386,7 +1380,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->head.contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
return
TSDB_CODE_SUCCESS
;
return TSDB_CODE_SUCCESS;
*/
}
int32_t
tscBuildCreateDbMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
...
...
@@ -1690,7 +1684,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSchema
*
pSchema
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
// Reallocate the payload size
...
...
@@ -1779,7 +1773,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int
tscEstimateAlterTableMsgLength
(
SSqlCmd
*
pCmd
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
return
minMsgSize
()
+
sizeof
(
SAlterTableMsg
)
+
sizeof
(
SSchema
)
*
tscNumOfFields
(
pQueryInfo
)
+
TSDB_EXTRA_PAYLOAD_SIZE
;
}
...
...
@@ -1788,7 +1782,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int
msgLen
=
0
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -1837,7 +1831,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SUpdateTableTagValMsg
*
pUpdateMsg
=
(
SUpdateTableTagValMsg
*
)
pCmd
->
payload
;
pCmd
->
payloadLen
=
htonl
(
pUpdateMsg
->
head
.
contLen
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMeta
*
pTableMeta
=
tscGetMetaInfo
(
pQueryInfo
,
0
)
->
pTableMeta
;
SNewVgroupInfo
vgroupInfo
=
{.
vgId
=
-
1
};
...
...
@@ -1873,7 +1867,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SRetrieveTableMsg
*
pRetrieveMsg
=
(
SRetrieveTableMsg
*
)
pCmd
->
payload
;
pRetrieveMsg
->
qid
=
htobe64
(
pSql
->
res
.
qid
);
pRetrieveMsg
->
free
=
htons
(
pQueryInfo
->
type
);
...
...
@@ -1897,7 +1891,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
pRes
->
row
=
0
;
pRes
->
rspType
=
1
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
tscCreateResPointerInfo
(
pRes
,
pQueryInfo
)
!=
TSDB_CODE_SUCCESS
)
{
return
pRes
->
code
;
}
...
...
@@ -1948,7 +1942,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
pRes
->
code
=
tscDoLocalMerge
(
pSql
);
if
(
pRes
->
code
==
TSDB_CODE_SUCCESS
&&
pRes
->
numOfRows
>
0
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
SQueryInfo
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
tscCreateResPointerInfo
(
pRes
,
pQueryInfo
);
tscSetResRawPtr
(
pRes
,
pQueryInfo
);
}
...
...
@@ -2002,7 +1996,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int
tscBuildTableMetaMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableInfoMsg
*
pInfoMsg
=
(
STableInfoMsg
*
)
pCmd
->
payload
;
...
...
@@ -2072,7 +2066,7 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
char
*
pMsg
=
pCmd
->
payload
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SSTableVgroupMsg
*
pStableVgroupMsg
=
(
SSTableVgroupMsg
*
)
pMsg
;
pStableVgroupMsg
->
numOfTables
=
htonl
(
pQueryInfo
->
numOfTables
);
...
...
@@ -2415,7 +2409,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -2474,7 +2468,7 @@ static void createHbObj(STscObj* pObj) {
pSql
->
fp
=
tscProcessHeartBeatRsp
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfoS
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tfree
(
pSql
);
...
...
@@ -2631,7 +2625,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
pRes
->
completed
=
(
pRetrieve
->
completed
==
1
);
pRes
->
data
=
pRetrieve
->
data
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
SQueryInfo
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
if
(
tscCreateResPointerInfo
(
pRes
,
pQueryInfo
)
!=
TSDB_CODE_SUCCESS
)
{
return
pRes
->
code
;
}
...
...
@@ -2688,7 +2682,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
tscAddQueryInfo
(
&
pNew
->
cmd
);
SQuery
Node
Info
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
,
0
);
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
,
0
);
pNew
->
cmd
.
autoCreated
=
pSql
->
cmd
.
autoCreated
;
// create table if not exists
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
&
pNew
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
+
pSql
->
cmd
.
payloadLen
))
{
...
...
@@ -2777,7 +2771,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
int
tscRenewTableMeta
(
SSqlObj
*
pSql
,
int32_t
tableIndex
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
tableIndex
);
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
...
...
@@ -2801,7 +2795,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
}
static
bool
allVgroupInfoRetrieved
(
SSqlCmd
*
pCmd
,
int32_t
clauseIndex
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
numOfTables
;
++
i
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
i
);
if
(
pTableMetaInfo
->
vgroupList
==
NULL
)
{
...
...
@@ -2828,13 +2822,13 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
pNew
->
cmd
.
command
=
TSDB_SQL_STABLEVGROUP
;
// TODO TEST IT
SQuery
Node
Info
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
,
0
);
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
,
0
);
if
(
pNewQueryInfo
==
NULL
)
{
tscFreeSqlObj
(
pNew
);
return
code
;
}
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
numOfTables
;
++
i
)
{
STableMetaInfo
*
pMInfo
=
tscGetMetaInfo
(
pQueryInfo
,
i
);
STableMeta
*
pTableMeta
=
tscTableMetaDup
(
pMInfo
->
pTableMeta
);
...
...
src/client/src/tscSql.c
浏览文件 @
0e0e6d5a
...
...
@@ -373,7 +373,7 @@ int taos_num_fields(TAOS_RES *res) {
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
return
0
;
int32_t
num
=
0
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
==
NULL
)
{
return
num
;
}
...
...
@@ -407,7 +407,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
return
0
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
==
NULL
)
{
return
NULL
;
}
...
...
@@ -558,7 +558,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
return
true
;
}
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
if
((
pQueryInfo
==
NULL
)
||
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
))
{
return
true
;
...
...
@@ -671,7 +671,7 @@ char *taos_get_client_info() { return version; }
static
void
tscKillSTableQuery
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
!
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
))
{
return
;
...
...
@@ -722,7 +722,7 @@ void taos_stop_query(TAOS_RES *res) {
// set the error code for master pSqlObj firstly
pSql
->
res
.
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
))
{
assert
(
pSql
->
rpcRid
<=
0
);
...
...
@@ -752,7 +752,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
return
true
;
}
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
==
NULL
)
{
return
true
;
}
...
...
@@ -932,7 +932,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
int
code
=
TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH
;
char
*
str
=
(
char
*
)
tblNameList
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
pCmd
->
clauseIndex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
pQueryInfo
==
NULL
)
{
pSql
->
res
.
code
=
terrno
;
return
terrno
;
...
...
src/client/src/tscStream.c
浏览文件 @
0e0e6d5a
...
...
@@ -35,7 +35,7 @@ static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t l
return
taosGetTimestamp
(
pStream
->
precision
)
+
launchDelay
-
pStream
->
stime
-
1
;
}
static
bool
isProjectStream
(
SQuery
Node
Info
*
pQueryInfo
)
{
static
bool
isProjectStream
(
SQueryInfo
*
pQueryInfo
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
SExprInfo
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
if
(
pExpr
->
base
.
functionId
!=
TSDB_FUNC_PRJ
)
{
...
...
@@ -89,7 +89,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
return
;
}
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
code
=
tscGetTableMeta
(
pSql
,
pTableMetaInfo
);
...
...
@@ -130,7 +130,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream
->
numOfRes
=
0
;
// reset the numOfRes.
SSqlObj
*
pSql
=
pStream
->
pSql
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
tscDebug
(
"%p add into timer"
,
pSql
);
if
(
pStream
->
isProject
)
{
...
...
@@ -208,7 +208,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static
void
tscStreamFillTimeGap
(
SSqlStream
*
pStream
,
TSKEY
ts
)
{
#if 0
SSqlObj * pSql = pStream->pSql;
SQuery
Node
Info* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) {
return;
...
...
@@ -421,7 +421,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t
minIntervalTime
=
(
pStream
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
?
tsMinIntervalTime
*
1000L
:
tsMinIntervalTime
;
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
if
(
!
pStream
->
isProject
&&
pQueryInfo
->
interval
.
interval
==
0
)
{
sprintf
(
pSql
->
cmd
.
payload
,
"the interval value is 0"
);
...
...
@@ -471,7 +471,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
}
static
int64_t
tscGetStreamStartTimestamp
(
SSqlObj
*
pSql
,
SSqlStream
*
pStream
,
int64_t
stime
)
{
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
if
(
pStream
->
isProject
)
{
// no data in table, flush all data till now to destination meter, 10sec delay
...
...
@@ -530,7 +530,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return
;
}
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
...
...
src/client/src/tscSub.c
浏览文件 @
0e0e6d5a
...
...
@@ -284,7 +284,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
size_t
numOfTables
=
taosArrayGetSize
(
tables
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SArray
*
progress
=
taosArrayInit
(
numOfTables
,
sizeof
(
SSubscriptionProgress
));
for
(
size_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STidTags
*
tt
=
taosArrayGet
(
tables
,
i
);
...
...
@@ -502,7 +502,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
if
(
taosArrayGetSize
(
pSub
->
progress
)
>
0
)
{
// fix crash in single table subscription
size_t
size
=
taosArrayGetSize
(
pSub
->
progress
);
...
...
src/client/src/tscSubquery.c
浏览文件 @
0e0e6d5a
此差异已折叠。
点击以展开。
src/client/src/tscUtil.c
浏览文件 @
0e0e6d5a
此差异已折叠。
点击以展开。
src/inc/taosmsg.h
浏览文件 @
0e0e6d5a
...
...
@@ -455,6 +455,18 @@ typedef struct {
SMsgHead
head
;
char
version
[
TSDB_VERSION_LEN
];
bool
stableQuery
;
// super table query or not
bool
topBotQuery
;
// TODO used bitwise flag
bool
groupbyColumn
;
// denote if this is a groupby normal column query
bool
hasTagResults
;
// if there are tag values in final result or not
bool
timeWindowInterpo
;
// if the time window start/end required interpolation
bool
queryBlockDist
;
// if query data block distribution
bool
stabledev
;
// super table stddev query
bool
tsCompQuery
;
// is tscomp query
bool
simpleAgg
;
bool
pointInterpQuery
;
// point interpolation query
bool
needReverseScan
;
// need reverse scan
STimeWindow
window
;
int32_t
numOfTables
;
int16_t
order
;
...
...
src/query/inc/qExecutor.h
浏览文件 @
0e0e6d5a
...
...
@@ -177,7 +177,7 @@ typedef struct SSDataBlock {
SDataBlockInfo
info
;
}
SSDataBlock
;
// The basic query information extracted from the SQuery
Node
Info tree to support the
// The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node.
typedef
struct
SQuery
{
SLimitVal
limit
;
...
...
src/query/src/qExecutor.c
浏览文件 @
0e0e6d5a
...
...
@@ -196,7 +196,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOp
static
void
initCtxOutputBuffer
(
SQLFunctionCtx
*
pCtx
,
int32_t
size
);
static
void
getAlignQueryTimeWindow
(
SQuery
*
pQuery
,
int64_t
key
,
int64_t
keyFirst
,
int64_t
keyLast
,
STimeWindow
*
win
);
static
bool
isPointInterpoQuery
(
SQuery
*
pQuery
);
//
static bool isPointInterpoQuery(SQuery *pQuery);
static
void
setResultBufSize
(
SQuery
*
pQuery
,
SRspResultInfo
*
pResultInfo
);
static
void
setCtxTagForJoin
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQLFunctionCtx
*
pCtx
,
SExprInfo
*
pExprInfo
,
void
*
pTable
);
static
void
setParamForStableStddev
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SExprInfo
*
pExpr
);
...
...
@@ -275,36 +275,36 @@ static void clearNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
}
}
static
bool
isGroupbyColumn
(
SSqlGroupbyExpr
*
pGroupbyExpr
)
{
if
(
pGroupbyExpr
==
NULL
||
pGroupbyExpr
->
numOfGroupCols
==
0
)
{
return
false
;
}
for
(
int32_t
i
=
0
;
i
<
pGroupbyExpr
->
numOfGroupCols
;
++
i
)
{
SColIndex
*
pColIndex
=
taosArrayGet
(
pGroupbyExpr
->
columnInfo
,
i
);
if
(
TSDB_COL_IS_NORMAL_COL
(
pColIndex
->
flag
))
{
//make sure the normal column locates at the second position if tbname exists in group by clause
if
(
pGroupbyExpr
->
numOfGroupCols
>
1
)
{
assert
(
pColIndex
->
colIndex
>
0
);
}
return
true
;
}
}
return
false
;
}
static
bool
isStabledev
(
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functId
=
pQuery
->
pExpr1
[
i
].
base
.
functionId
;
if
(
functId
==
TSDB_FUNC_STDDEV_DST
)
{
return
true
;
}
}
//static bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) {
// if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) {
// return false;
// }
//
// for (int32_t i = 0; i < pGroupbyExpr->numOfGroupCols; ++i) {
// SColIndex *pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, i);
// if (TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
// //make sure the normal column locates at the second position if tbname exists in group by clause
// if (pGroupbyExpr->numOfGroupCols > 1) {
// assert(pColIndex->colIndex > 0);
// }
//
// return true;
// }
// }
//
// return false;
//}
return
false
;
}
//static UNUSED_FUNC bool isStabledev(SQuery* pQuery) {
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functId = pQuery->pExpr1[i].base.functionId;
// if (functId == TSDB_FUNC_STDDEV_DST) {
// return true;
// }
// }
//
// return false;
//}
static
bool
isSelectivityWithTagsQuery
(
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
bool
hasTags
=
false
;
...
...
@@ -336,51 +336,48 @@ static bool isProjQuery(SQuery *pQuery) {
return
true
;
}
static
bool
isTsCompQuery
(
SQuery
*
pQuery
)
{
return
pQuery
->
pExpr1
[
0
].
base
.
functionId
==
TSDB_FUNC_TS_COMP
;
}
static
bool
isTopBottomQuery
(
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pExpr1
[
i
].
base
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TS
)
{
continue
;
}
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
)
{
return
true
;
}
}
return
false
;
}
static
bool
timeWindowInterpoRequired
(
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pExpr1
[
i
].
base
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TWA
||
functionId
==
TSDB_FUNC_INTERP
)
{
return
true
;
}
}
return
false
;
}
static
bool
hasTagValOutput
(
SQuery
*
pQuery
)
{
SExprInfo
*
pExprInfo
=
&
pQuery
->
pExpr1
[
0
];
if
(
pQuery
->
numOfOutput
==
1
&&
pExprInfo
->
base
.
functionId
==
TSDB_FUNC_TS_COMP
)
{
return
true
;
}
else
{
// set tag value, by which the results are aggregated.
for
(
int32_t
idx
=
0
;
idx
<
pQuery
->
numOfOutput
;
++
idx
)
{
SExprInfo
*
pLocalExprInfo
=
&
pQuery
->
pExpr1
[
idx
];
// ts_comp column required the tag value for join filter
if
(
TSDB_COL_IS_TAG
(
pLocalExprInfo
->
base
.
colInfo
.
flag
))
{
return
true
;
}
}
}
return
false
;
}
//static bool isTsCompQuery(SQuery *pQuery) { return pQuery->pExpr1[0].base.functionId == TSDB_FUNC_TS_COMP; }
//static UNUSED_FUNC bool isTopBottomQuery(SQuery *pQuery) {
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functionId = pQuery->pExpr1[i].base.functionId;
// if (functionId == TSDB_FUNC_TS) {
// continue;
// }
//
// if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
// return true;
// }
// }
//
// return false;
//}
//static UNUSED_FUNC bool timeWindowInterpoRequired(SQuery *pQuery) {
// for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functionId = pQuery->pExpr1[i].base.functionId;
// if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_INTERP) {
// return true;
// }
// }
//
// return false;
//}
//static UNUSED_FUNC bool hasTagValOutput(SQuery* pQuery) {
// SExprInfo *pExprInfo = &pQuery->pExpr1[0];
// if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
// return true;
// } else { // set tag value, by which the results are aggregated.
// for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
// SExprInfo *pLocalExprInfo = &pQuery->pExpr1[idx];
//
// // ts_comp column required the tag value for join filter
// if (TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) {
// return true;
// }
// }
// }
//
// return false;
//}
static
bool
hasNullRv
(
SColIndex
*
pColIndex
,
SDataStatis
*
pStatis
)
{
if
(
TSDB_COL_IS_TAG
(
pColIndex
->
flag
)
||
TSDB_COL_IS_UD_COL
(
pColIndex
->
flag
)
||
pColIndex
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
...
...
@@ -1194,7 +1191,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
}
// point interpolation does not require the end key time window interpolation.
if
(
isPointInterpoQuery
(
pQuery
)
)
{
if
(
pQuery
->
pointInterpQuery
)
{
return
;
}
...
...
@@ -1769,7 +1766,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
createArithOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQuery
->
pExpr2
,
pQuery
->
numOfExpr2
);
}
if
(
pQuery
->
fillType
!=
TSDB_FILL_NONE
&&
!
isPointInterpoQuery
(
pQuery
)
)
{
if
(
pQuery
->
fillType
!=
TSDB_FILL_NONE
&&
!
pQuery
->
pointInterpQuery
)
{
SOperatorInfo
*
pInfo
=
pRuntimeEnv
->
proot
;
pRuntimeEnv
->
proot
=
createFillOperatorInfo
(
pRuntimeEnv
,
pInfo
,
pInfo
->
pExpr
,
pInfo
->
numOfOutput
);
}
...
...
@@ -1930,16 +1927,16 @@ void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELL
//}
// todo refactor with isLastRowQuery
bool
isPointInterpoQuery
(
SQuery
*
pQuery
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pExpr1
[
i
].
base
.
functionId
;
if
(
functionId
==
TSDB_FUNC_INTERP
)
{
return
true
;
}
}
return
false
;
}
//
bool isPointInterpoQuery(SQuery *pQuery) {
//
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
//
int32_t functionId = pQuery->pExpr1[i].base.functionId;
//
if (functionId == TSDB_FUNC_INTERP) {
//
return true;
//
}
//
}
//
//
return false;
//
}
// TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION
static
UNUSED_FUNC
bool
isSumAvgRateQuery
(
SQuery
*
pQuery
)
{
...
...
@@ -2109,7 +2106,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
return
;
}
if
(
isGroupbyColumn
(
pQuery
->
pGroupbyExpr
)
&&
pQuery
->
order
.
order
==
TSDB_ORDER_DESC
)
{
if
(
pQuery
->
groupbyColumn
&&
pQuery
->
order
.
order
==
TSDB_ORDER_DESC
)
{
pQuery
->
order
.
order
=
TSDB_ORDER_ASC
;
if
(
pQuery
->
window
.
skey
>
pQuery
->
window
.
ekey
)
{
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
...
...
@@ -2119,7 +2116,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
return
;
}
if
(
isPointInterpoQuery
(
pQuery
)
&&
pQuery
->
interval
.
interval
==
0
)
{
if
(
pQuery
->
pointInterpQuery
&&
pQuery
->
interval
.
interval
==
0
)
{
if
(
!
QUERY_IS_ASC_QUERY
(
pQuery
))
{
qDebug
(
msg
,
pQInfo
,
"interp"
,
pQuery
->
order
.
order
,
TSDB_ORDER_ASC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
...
...
@@ -2563,7 +2560,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
pTableScanInfo
->
rowCellInfoOffset
)
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
else
if
(
pQuery
->
stableQuery
&&
(
!
isTsCompQuery
(
pQuery
)
))
{
// stable aggregate, not interval aggregate or normal column aggregate
}
else
if
(
pQuery
->
stableQuery
&&
(
!
pQuery
->
tsCompQuery
))
{
// stable aggregate, not interval aggregate or normal column aggregate
doSetTableGroupOutputBuf
(
pRuntimeEnv
,
pTableScanInfo
->
pResultRowInfo
,
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
rowCellInfoOffset
,
pTableScanInfo
->
numOfOutput
,
pQuery
->
current
->
groupIndex
);
...
...
@@ -3855,7 +3852,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
}
STsdbQueryCond
cond
=
createTsdbQueryCond
(
pQuery
,
&
pQuery
->
window
);
if
(
isTsCompQuery
(
pQuery
)
||
isPointInterpoQuery
(
pQuery
)
)
{
if
(
pQuery
->
tsCompQuery
||
pQuery
->
pointInterpQuery
)
{
cond
.
type
=
BLOCK_LOAD_TABLE_SEQ_ORDER
;
}
...
...
@@ -3893,7 +3890,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
}
}
}
}
else
if
(
isPointInterpoQuery
(
pQuery
)
)
{
}
else
if
(
pQuery
->
pointInterpQuery
)
{
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryRowsInExternalWindow
(
tsdb
,
&
cond
,
&
pQuery
->
tableGroupInfo
,
pQInfo
,
&
pQuery
->
memRef
);
}
else
{
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
tsdb
,
&
cond
,
&
pQuery
->
tableGroupInfo
,
pQInfo
,
&
pQuery
->
memRef
);
...
...
@@ -3933,10 +3930,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
pQuery
->
tsdb
=
tsdb
;
pQuery
->
topBotQuery
=
isTopBottomQuery
(
pQuery
);
pQuery
->
hasTagResults
=
hasTagValOutput
(
pQuery
);
pQuery
->
timeWindowInterpo
=
timeWindowInterpoRequired
(
pQuery
);
pQuery
->
stabledev
=
isStabledev
(
pQuery
);
//
pQuery->topBotQuery = isTopBottomQuery(pQuery);
//
pQuery->hasTagResults = hasTagValOutput(pQuery);
//
pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery);
//
pQuery->stabledev = isStabledev(pQuery);
pRuntimeEnv
->
prevResult
=
prevResult
;
pRuntimeEnv
->
qinfo
=
pQInfo
;
...
...
@@ -3951,7 +3948,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pQuery
->
tsdb
=
tsdb
;
pQuery
->
vgId
=
vgId
;
pQuery
->
stableQuery
=
isSTableQuery
;
pQuery
->
groupbyColumn
=
isGroupbyColumn
(
pQuery
->
pGroupbyExpr
);
pQuery
->
interBufSize
=
getOutputInterResultBufSize
(
pQuery
);
pRuntimeEnv
->
groupResInfo
.
totalGroup
=
(
int32_t
)
(
isSTableQuery
?
GET_NUM_OF_TABLEGROUP
(
pRuntimeEnv
)
:
0
);
...
...
@@ -3966,7 +3962,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv
->
proot
=
createTagScanOperatorInfo
(
pRuntimeEnv
,
pQuery
->
pExpr1
,
pQuery
->
numOfOutput
);
}
else
if
(
pQuery
->
queryBlockDist
)
{
pRuntimeEnv
->
pTableScanner
=
createTableBlockInfoScanOperator
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
);
}
else
if
(
isTsCompQuery
(
pQuery
)
||
isPointInterpoQuery
(
pQuery
)
)
{
}
else
if
(
pQuery
->
tsCompQuery
||
pQuery
->
pointInterpQuery
)
{
pRuntimeEnv
->
pTableScanner
=
createTableSeqScanOperator
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
);
}
else
if
(
needReverseScan
(
pQuery
))
{
pRuntimeEnv
->
pTableScanner
=
createDataBlocksOptScanInfo
(
pRuntimeEnv
->
pQueryHandle
,
pRuntimeEnv
,
getNumOfScanTimes
(
pQuery
),
1
);
...
...
@@ -6128,6 +6124,19 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
pQuery
->
prjInfo
.
vgroupLimit
=
pQueryMsg
->
vgroupLimit
;
pQuery
->
prjInfo
.
ts
=
(
pQueryMsg
->
order
==
TSDB_ORDER_ASC
)
?
INT64_MIN
:
INT64_MAX
;
pQuery
->
sw
=
pQueryMsg
->
sw
;
pQuery
->
stableQuery
=
pQueryMsg
->
stableQuery
;
pQuery
->
topBotQuery
=
pQueryMsg
->
topBotQuery
;
pQuery
->
groupbyColumn
=
pQueryMsg
->
groupbyColumn
;
pQuery
->
hasTagResults
=
pQueryMsg
->
hasTagResults
;
pQuery
->
timeWindowInterpo
=
pQueryMsg
->
timeWindowInterpo
;
pQuery
->
queryBlockDist
=
pQueryMsg
->
queryBlockDist
;
pQuery
->
stabledev
=
pQueryMsg
->
stabledev
;
pQuery
->
tsCompQuery
=
pQueryMsg
->
tsCompQuery
;
pQuery
->
simpleAgg
=
pQueryMsg
->
simpleAgg
;
pQuery
->
pointInterpQuery
=
pQueryMsg
->
pointInterpQuery
;
pQuery
->
needReverseScan
=
pQueryMsg
->
needReverseScan
;
pQuery
->
colList
=
calloc
(
numOfCols
,
sizeof
(
SSingleColumnFilterInfo
));
if
(
pQuery
->
colList
==
NULL
)
{
goto
_cleanup
;
...
...
@@ -6197,7 +6206,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
changeExecuteScanOrder
(
pQInfo
,
pQueryMsg
,
stableQuery
);
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
bool
groupByCol
=
isGroupbyColumn
(
pQuery
->
pGroupbyExpr
);
//
bool groupByCol = isGroupbyColumn(pQuery->pGroupbyExpr);
STimeWindow
window
=
pQuery
->
window
;
...
...
@@ -6218,7 +6227,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
window
.
skey
=
info
->
lastKey
;
void
*
buf
=
(
char
*
)
pQInfo
->
pBuf
+
index
*
sizeof
(
STableQueryInfo
);
STableQueryInfo
*
item
=
createTableQueryInfo
(
pQuery
,
info
->
pTable
,
groupByCol
,
window
,
buf
);
STableQueryInfo
*
item
=
createTableQueryInfo
(
pQuery
,
info
->
pTable
,
pQuery
->
groupbyColumn
,
window
,
buf
);
if
(
item
==
NULL
)
{
goto
_cleanup
;
}
...
...
@@ -6454,7 +6463,7 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
// load data from file to msg buffer
if
(
isTsCompQuery
(
pQuery
)
)
{
if
(
pQuery
->
tsCompQuery
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pRuntimeEnv
->
outputBuf
->
pDataBlock
,
0
);
FILE
*
f
=
*
(
FILE
**
)
pColInfoData
->
pData
;
// TODO refactor
...
...
src/query/src/qPlan.c
浏览文件 @
0e0e6d5a
...
...
@@ -3,6 +3,39 @@
#include "qUtil.h"
#include "texpr.h"
#define QNODE_PROJECT 1
#define QNODE_FILTER 2
#define QNODE_RELATION 3
#define QNODE_AGGREGATE 4
#define QNODE_GROUPBY 5
#define QNODE_LIMIT 6
#define QNODE_JOIN 7
#define QNODE_DIST 8
#define QNODE_SORT 9
#define QNODE_UNIONALL 10
#define QNODE_TIMEWINDOW 11
typedef
struct
SQueryNode
{
int32_t
type
;
// previous operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
struct
SQueryNode
*
prevNode
;
}
SQueryNode
;
// TODO create the query plan
SQueryNode
*
qCreateQueryPlan
(
SQueryInfo
*
pQueryInfo
)
{
return
NULL
;
}
char
*
queryPlanToString
()
{
return
NULL
;
}
SQueryNode
*
queryPlanFromString
()
{
return
NULL
;
}
UNUSED_FUNC
SArray
*
createTableScanPlan
(
SQuery
*
pQuery
)
{
SArray
*
plan
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录