Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
b6aa9da9
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b6aa9da9
编写于
3月 26, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-2859]
上级
4a0bc100
变更
18
展开全部
隐藏空白更改
内联
并排
Showing
18 changed file
with
1535 addition
and
1005 deletion
+1535
-1005
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+51
-44
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+27
-25
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+4
-4
src/client/src/tscLocal.c
src/client/src/tscLocal.c
+24
-24
src/client/src/tscLocalMerge.c
src/client/src/tscLocalMerge.c
+82
-82
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+3
-3
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+358
-352
src/client/src/tscServer.c
src/client/src/tscServer.c
+72
-73
src/client/src/tscSql.c
src/client/src/tscSql.c
+7
-7
src/client/src/tscStream.c
src/client/src/tscStream.c
+9
-9
src/client/src/tscSub.c
src/client/src/tscSub.c
+2
-2
src/client/src/tscSubquery.c
src/client/src/tscSubquery.c
+292
-112
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+426
-171
src/inc/taosmsg.h
src/inc/taosmsg.h
+9
-5
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+11
-2
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+60
-87
src/query/src/qPlan.c
src/query/src/qPlan.c
+96
-1
src/query/src/queryMain.c
src/query/src/queryMain.c
+2
-2
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
b6aa9da9
...
...
@@ -88,7 +88,7 @@ typedef struct SVgroupTableInfo {
SArray
*
itemList
;
//SArray<STableIdInfo>
}
SVgroupTableInfo
;
static
FORCE_INLINE
SQueryInfo
*
tscGetQueryInfo
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
)
{
static
FORCE_INLINE
SQuery
Node
Info
*
tscGetQueryInfo
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
)
{
assert
(
pCmd
!=
NULL
&&
subClauseIndex
>=
0
);
if
(
pCmd
->
pQueryInfo
==
NULL
||
subClauseIndex
>=
pCmd
->
numOfClause
)
{
return
NULL
;
...
...
@@ -97,7 +97,7 @@ static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClause
return
pCmd
->
pQueryInfo
[
subClauseIndex
];
}
SQueryInfo
*
tscGetActiveQueryInfo
(
SSqlCmd
*
pCmd
);
SQuery
Node
Info
*
tscGetActiveQueryInfo
(
SSqlCmd
*
pCmd
);
int32_t
tscCreateDataBlock
(
size_t
initialSize
,
int32_t
rowSize
,
int32_t
startOffset
,
SName
*
name
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
);
void
tscDestroyDataBlock
(
STableDataBlocks
*
pDataBlock
,
bool
removeMeta
);
...
...
@@ -121,27 +121,33 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
* @param pSql sql object
* @return
*/
bool
tscIsPointInterpQuery
(
SQueryInfo
*
pQueryInfo
);
bool
tscIsTWAQuery
(
SQueryInfo
*
pQueryInfo
);
bool
tscIsSecondStageQuery
(
SQueryInfo
*
pQueryInfo
);
bool
tscGroupbyColumn
(
SQueryInfo
*
pQueryInfo
);
bool
tscNonOrderedProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscOrderedProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscIsProjectionQueryOnSTable
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscIsProjectionQuery
(
SQueryInfo
*
pQueryInfo
);
bool
tscIsTwoStageSTableQuery
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscQueryTags
(
SQueryInfo
*
pQueryInfo
);
bool
tscMultiRoundQuery
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscQueryBlockInfo
(
SQueryInfo
*
pQueryInfo
);
SSqlExpr
*
tscAddFuncInSelectClause
(
SQueryInfo
*
pQueryInfo
,
int32_t
outputColIndex
,
int16_t
functionId
,
bool
tscIsPointInterpQuery
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscIsTWAQuery
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscIsSecondStageQuery
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscGroupbyColumn
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscIsTopBotQuery
(
SQueryNodeInfo
*
pQueryInfo
);
bool
hasTagValOutput
(
SQueryNodeInfo
*
pQueryInfo
);
bool
timeWindowInterpoRequired
(
SQueryNodeInfo
*
pQueryNodeInfo
);
bool
isStabledev
(
SQueryNodeInfo
*
pQueryInfo
);
bool
isTsCompQuery
(
SQueryNodeInfo
*
pQueryNodeInfo
);
bool
isSimpleAggregate
(
SQueryNodeInfo
*
pQueryNodeInfo
);
bool
tscNonOrderedProjectionQueryOnSTable
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscOrderedProjectionQueryOnSTable
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscIsProjectionQueryOnSTable
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscIsProjectionQuery
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscIsTwoStageSTableQuery
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscQueryTags
(
SQueryNodeInfo
*
pQueryInfo
);
bool
tscMultiRoundQuery
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
tableIndex
);
bool
tscQueryBlockInfo
(
SQueryNodeInfo
*
pQueryInfo
);
SExprInfo
*
tscAddFuncInSelectClause
(
SQueryNodeInfo
*
pQueryInfo
,
int32_t
outputColIndex
,
int16_t
functionId
,
SColumnIndex
*
pIndex
,
SSchema
*
pColSchema
,
int16_t
colType
);
int32_t
tscSetTableFullName
(
STableMetaInfo
*
pTableMetaInfo
,
SStrToken
*
pzTableName
,
SSqlObj
*
pSql
);
void
tscClearInterpInfo
(
SQueryInfo
*
pQueryInfo
);
void
tscClearInterpInfo
(
SQuery
Node
Info
*
pQueryInfo
);
bool
tscIsInsertData
(
char
*
sqlstr
);
...
...
@@ -155,12 +161,12 @@ SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_F
SInternalField
*
tscFieldInfoGetInternalField
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
);
TAOS_FIELD
*
tscFieldInfoGetField
(
SFieldInfo
*
pFieldInfo
,
int32_t
index
);
void
tscFieldInfoUpdateOffset
(
SQueryInfo
*
pQueryInfo
);
void
tscFieldInfoUpdateOffset
(
SQuery
Node
Info
*
pQueryInfo
);
int16_t
tscFieldInfoGetOffset
(
SQueryInfo
*
pQueryInfo
,
int32_t
index
);
int16_t
tscFieldInfoGetOffset
(
SQuery
Node
Info
*
pQueryInfo
,
int32_t
index
);
void
tscFieldInfoClear
(
SFieldInfo
*
pFieldInfo
);
static
FORCE_INLINE
int32_t
tscNumOfFields
(
SQueryInfo
*
pQueryInfo
)
{
return
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
}
static
FORCE_INLINE
int32_t
tscNumOfFields
(
SQuery
Node
Info
*
pQueryInfo
)
{
return
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
}
int32_t
tscFieldInfoCompare
(
const
SFieldInfo
*
pFieldInfo1
,
const
SFieldInfo
*
pFieldInfo2
);
...
...
@@ -168,24 +174,24 @@ void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes)
int32_t
tscGetResRowLength
(
SArray
*
pExprList
);
S
SqlExpr
*
tscSqlExprInsert
(
SQuery
Info
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
S
ExprInfo
*
tscSqlExprInsert
(
SQueryNode
Info
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
int16_t
size
,
int16_t
resColId
,
int16_t
interSize
,
bool
isTagCol
);
S
SqlExpr
*
tscSqlExprAppend
(
SQuery
Info
*
pQueryInfo
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
S
ExprInfo
*
tscSqlExprAppend
(
SQueryNode
Info
*
pQueryInfo
,
int16_t
functionId
,
SColumnIndex
*
pColIndex
,
int16_t
type
,
int16_t
size
,
int16_t
resColId
,
int16_t
interSize
,
bool
isTagCol
);
S
SqlExpr
*
tscSqlExprUpdate
(
SQuery
Info
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
int16_t
srcColumnIndex
,
int16_t
type
,
S
ExprInfo
*
tscSqlExprUpdate
(
SQueryNode
Info
*
pQueryInfo
,
int32_t
index
,
int16_t
functionId
,
int16_t
srcColumnIndex
,
int16_t
type
,
int16_t
size
);
size_t
tscSqlExprNumOfExprs
(
SQueryInfo
*
pQueryInfo
);
void
tscInsertPrimaryTsSourceColumn
(
SQueryInfo
*
pQueryInfo
,
SColumnIndex
*
pIndex
);
size_t
tscSqlExprNumOfExprs
(
SQuery
Node
Info
*
pQueryInfo
);
void
tscInsertPrimaryTsSourceColumn
(
SQuery
Node
Info
*
pQueryInfo
,
SColumnIndex
*
pIndex
);
S
SqlExpr
*
tscSqlExprGet
(
SQuery
Info
*
pQueryInfo
,
int32_t
index
);
S
ExprInfo
*
tscSqlExprGet
(
SQueryNode
Info
*
pQueryInfo
,
int32_t
index
);
int32_t
tscSqlExprCopy
(
SArray
*
dst
,
const
SArray
*
src
,
uint64_t
uid
,
bool
deepcopy
);
void
tscSqlExprAssign
(
SExprInfo
*
dst
,
const
SExprInfo
*
src
);
void
tscSqlExprInfoDestroy
(
SArray
*
pExprInfo
);
SColumn
*
tscColumnClone
(
const
SColumn
*
src
);
SColumn
*
tscColumnListInsert
(
SArray
*
pColList
,
SColumnIndex
*
colIndex
);
SArray
*
tscColumnListClone
(
const
SArray
*
src
,
int16_t
tableIndex
);
SColumn
*
tscColumnListInsert
(
SArray
*
pColList
,
SColumnIndex
*
colIndex
,
SSchema
*
pSchema
);
void
tscColumnListDestroy
(
SArray
*
pColList
);
void
tscDequoteAndTrimToken
(
SStrToken
*
pToken
);
...
...
@@ -202,25 +208,25 @@ void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw)
int32_t
tscTagCondCopy
(
STagCond
*
dest
,
const
STagCond
*
src
);
void
tscTagCondRelease
(
STagCond
*
pCond
);
void
tscGetSrcColumnInfo
(
SSrcColumnInfo
*
pColInfo
,
SQueryInfo
*
pQueryInfo
);
void
tscGetSrcColumnInfo
(
SSrcColumnInfo
*
pColInfo
,
SQuery
Node
Info
*
pQueryInfo
);
bool
tscShouldBeFreed
(
SSqlObj
*
pSql
);
STableMetaInfo
*
tscGetTableMetaInfoFromCmd
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
int32_t
tableIndex
);
STableMetaInfo
*
tscGetMetaInfo
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
);
STableMetaInfo
*
tscGetMetaInfo
(
SQuery
Node
Info
*
pQueryInfo
,
int32_t
tableIndex
);
void
tscInitQueryInfo
(
SQueryInfo
*
pQueryInfo
);
void
tscInitQueryInfo
(
SQuery
Node
Info
*
pQueryInfo
);
void
tscClearSubqueryInfo
(
SSqlCmd
*
pCmd
);
int32_t
tscAddQueryInfo
(
SSqlCmd
*
pCmd
);
SQueryInfo
*
tscGetQueryInfo
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
SQueryInfo
*
tscGetQueryInfoS
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
SQuery
Node
Info
*
tscGetQueryInfo
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
SQuery
Node
Info
*
tscGetQueryInfoS
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
void
tscClearTableMetaInfo
(
STableMetaInfo
*
pTableMetaInfo
);
STableMetaInfo
*
tscAddTableMetaInfo
(
SQueryInfo
*
pQueryInfo
,
SName
*
name
,
STableMeta
*
pTableMeta
,
STableMetaInfo
*
tscAddTableMetaInfo
(
SQuery
Node
Info
*
pQueryInfo
,
SName
*
name
,
STableMeta
*
pTableMeta
,
SVgroupsInfo
*
vgroupList
,
SArray
*
pTagCols
,
SArray
*
pVgroupTables
);
STableMetaInfo
*
tscAddEmptyMetaInfo
(
SQueryInfo
*
pQueryInfo
);
STableMetaInfo
*
tscAddEmptyMetaInfo
(
SQuery
Node
Info
*
pQueryInfo
);
void
tscFreeVgroupTableInfo
(
SArray
*
pVgroupTables
);
SArray
*
tscVgroupTableInfoDup
(
SArray
*
pVgroupTables
);
...
...
@@ -233,8 +239,8 @@ int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool creat
void
tscResetForNextRetrieve
(
SSqlRes
*
pRes
);
void
tscDoQuery
(
SSqlObj
*
pSql
);
void
executeQuery
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
);
void
doExecuteQuery
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
);
void
executeQuery
(
SSqlObj
*
pSql
,
SQuery
Node
Info
*
pQueryInfo
);
void
doExecuteQuery
(
SSqlObj
*
pSql
,
SQuery
Node
Info
*
pQueryInfo
);
SVgroupsInfo
*
tscVgroupInfoClone
(
SVgroupsInfo
*
pInfo
);
void
*
tscVgroupInfoClear
(
SVgroupsInfo
*
pInfo
);
...
...
@@ -263,7 +269,7 @@ void registerSqlObj(SSqlObj* pSql);
SSqlObj
*
createSubqueryObj
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
__async_cb_func_t
fp
,
void
*
param
,
int32_t
cmd
,
SSqlObj
*
pPrevSql
);
void
addGroupInfoForSubquery
(
SSqlObj
*
pParentObj
,
SSqlObj
*
pSql
,
int32_t
subClauseIndex
,
int32_t
tableIndex
);
void
doAddGroupColumnForSubquery
(
SQueryInfo
*
pQueryInfo
,
int32_t
tagIndex
);
void
doAddGroupColumnForSubquery
(
SQuery
Node
Info
*
pQueryInfo
,
int32_t
tagIndex
);
int16_t
tscGetJoinTagColIdByUid
(
STagCond
*
pTagCond
,
uint64_t
uid
);
int16_t
tscGetTagColIndexById
(
STableMeta
*
pTableMeta
,
int16_t
colId
);
...
...
@@ -282,7 +288,7 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet
bool
tscSetSqlOwner
(
SSqlObj
*
pSql
);
void
tscClearSqlOwner
(
SSqlObj
*
pSql
);
int32_t
doArithmeticCalculate
(
SQueryInfo
*
pQueryInfo
,
tFilePage
*
pOutput
,
int32_t
rowSize
,
int32_t
finalRowSize
);
int32_t
doArithmeticCalculate
(
SQuery
Node
Info
*
pQueryInfo
,
tFilePage
*
pOutput
,
int32_t
rowSize
,
int32_t
finalRowSize
);
char
*
serializeTagData
(
STagData
*
pTagData
,
char
*
pMsg
);
int32_t
copyTagData
(
STagData
*
dst
,
const
STagData
*
src
);
...
...
@@ -294,8 +300,9 @@ uint32_t tscGetTableMetaMaxSize();
int32_t
tscCreateTableMetaFromCChildMeta
(
STableMeta
*
pChild
,
const
char
*
name
);
STableMeta
*
tscTableMetaDup
(
STableMeta
*
pTableMeta
);
void
tsCreateSQLFunctionCtx
(
SQueryInfo
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
);
void
tsCreateSQLFunctionCtx
(
SQueryNodeInfo
*
pQueryInfo
,
SQLFunctionCtx
*
pCtx
);
void
*
createQueryInfoFromQueryNode
(
SQueryNodeInfo
*
pQueryNodeInfo
,
SExprInfo
*
pExprs
,
STableGroupInfo
*
pTableGroupInfo
,
uint64_t
*
qId
,
char
*
sql
);
void
*
malloc_throw
(
size_t
size
);
void
*
calloc_throw
(
size_t
nmemb
,
size_t
size
);
...
...
src/client/inc/tsclient.h
浏览文件 @
b6aa9da9
...
...
@@ -105,8 +105,7 @@ typedef struct SColumnIndex {
typedef
struct
SInternalField
{
TAOS_FIELD
field
;
bool
visible
;
SExprInfo
*
pArithExprInfo
;
SSqlExpr
*
pSqlExpr
;
SExprInfo
*
pExpr
;
}
SInternalField
;
typedef
struct
SFieldInfo
{
...
...
@@ -117,8 +116,7 @@ typedef struct SFieldInfo {
typedef
struct
SColumn
{
SColumnIndex
colIndex
;
int32_t
numOfFilters
;
SColumnFilterInfo
*
filterInfo
;
SColumnInfo
info
;
}
SColumn
;
typedef
struct
SCond
{
...
...
@@ -181,7 +179,7 @@ typedef struct STableDataBlocks {
SParamInfo
*
params
;
}
STableDataBlocks
;
typedef
struct
SQueryInfo
{
typedef
struct
SQuery
Node
Info
{
int16_t
command
;
// the command may be different for each subclause, so keep it seperately.
uint32_t
type
;
// query/insert type
STimeWindow
window
;
// the whole query time window
...
...
@@ -196,6 +194,7 @@ typedef struct SQueryInfo {
SLimitVal
limit
;
SLimitVal
slimit
;
STagCond
tagCond
;
SOrderVal
order
;
int16_t
fillType
;
// final result fill type
int16_t
numOfTables
;
...
...
@@ -215,10 +214,13 @@ typedef struct SQueryInfo {
int32_t
bufLen
;
char
*
buf
;
struct
SQueryInfo
*
sibling
;
// sibling
SArray
*
pUpstream
;
// SArray<struct SQueryInfo>
SArray
*
pDownstream
;
// SArray<struct SQueryInfo>
}
SQueryInfo
;
SArray
*
pDSOperator
;
SArray
*
pPhyOperator
;
struct
SQueryNodeInfo
*
sibling
;
// sibling
SArray
*
pUpstream
;
// SArray<struct SQueryNodeInfo>
SArray
*
pDownstream
;
// SArray<struct SQueryNodeInfo>
}
SQueryNodeInfo
;
typedef
struct
{
int
command
;
...
...
@@ -242,10 +244,10 @@ typedef struct {
char
*
payload
;
int32_t
payloadLen
;
SQueryInfo
**
pQueryInfo
;
SQuery
Node
Info
**
pQueryInfo
;
int32_t
numOfClause
;
int32_t
clauseIndex
;
// index of multiple subclause query
SQueryInfo
*
active
;
// current active query info
SQuery
Node
Info
*
active
;
// current active query info
int32_t
batchSize
;
// for parameter ('?') binding and batch processing
int32_t
numOfParams
;
...
...
@@ -402,7 +404,7 @@ void tscInitMsgsFp();
int
tsParseSql
(
SSqlObj
*
pSql
,
bool
initial
);
void
tscProcessMsgFromServer
(
SRpcMsg
*
rpcMsg
,
SRpcEpSet
*
pEpSet
);
int
tscProcessSql
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
);
int
tscProcessSql
(
SSqlObj
*
pSql
,
SQuery
Node
Info
*
pQueryInfo
);
int
tscRenewTableMeta
(
SSqlObj
*
pSql
,
int32_t
tableIndex
);
void
tscAsyncResultOnError
(
SSqlObj
*
pSql
);
...
...
@@ -412,12 +414,12 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code);
int
tscProcessLocalCmd
(
SSqlObj
*
pSql
);
int
tscCfgDynamicOptions
(
char
*
msg
);
int32_t
tscTansformFuncForSTableQuery
(
SQueryInfo
*
pQueryInfo
);
void
tscRestoreFuncForSTableQuery
(
SQueryInfo
*
pQueryInfo
);
int32_t
tscTansformFuncForSTableQuery
(
SQuery
Node
Info
*
pQueryInfo
);
void
tscRestoreFuncForSTableQuery
(
SQuery
Node
Info
*
pQueryInfo
);
int32_t
tscCreateResPointerInfo
(
SSqlRes
*
pRes
,
SQueryInfo
*
pQueryInfo
);
void
tscSetResRawPtr
(
SSqlRes
*
pRes
,
SQueryInfo
*
pQueryInfo
);
void
prepareInputDataFromUpstream
(
SSqlRes
*
pRes
,
SQueryInfo
*
pQueryInfo
);
int32_t
tscCreateResPointerInfo
(
SSqlRes
*
pRes
,
SQuery
Node
Info
*
pQueryInfo
);
void
tscSetResRawPtr
(
SSqlRes
*
pRes
,
SQuery
Node
Info
*
pQueryInfo
);
void
prepareInputDataFromUpstream
(
SSqlRes
*
pRes
,
SQuery
Node
Info
*
pQueryInfo
);
void
tscResetSqlCmd
(
SSqlCmd
*
pCmd
,
bool
removeMeta
);
...
...
@@ -452,7 +454,7 @@ bool tscIsUpdateQuery(SSqlObj* pSql);
char
*
tscGetSqlStr
(
SSqlObj
*
pSql
);
bool
tscIsQueryWithLimit
(
SSqlObj
*
pSql
);
bool
tscHasReachLimitation
(
SQueryInfo
*
pQueryInfo
,
SSqlRes
*
pRes
);
bool
tscHasReachLimitation
(
SQuery
Node
Info
*
pQueryInfo
,
SSqlRes
*
pRes
);
char
*
tscGetErrorMsgPayload
(
SSqlCmd
*
pCmd
);
...
...
@@ -471,15 +473,15 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
UNUSED
(
pData
);
// user defined constant value output columns
if
(
pInfo
->
p
SqlExpr
!=
NULL
&&
TSDB_COL_IS_UD_COL
(
pInfo
->
pSqlExpr
->
colInfo
.
flag
))
{
if
(
pInfo
->
p
Expr
!=
NULL
&&
TSDB_COL_IS_UD_COL
(
pInfo
->
pExpr
->
base
.
colInfo
.
flag
))
{
if
(
type
==
TSDB_DATA_TYPE_NCHAR
||
type
==
TSDB_DATA_TYPE_BINARY
)
{
pData
=
pInfo
->
p
SqlExpr
->
param
[
1
].
pz
;
pRes
->
length
[
columnIndex
]
=
pInfo
->
p
SqlExpr
->
param
[
1
].
nLen
;
pRes
->
tsrow
[
columnIndex
]
=
(
pInfo
->
p
SqlExpr
->
param
[
1
].
nType
==
TSDB_DATA_TYPE_NULL
)
?
NULL
:
(
unsigned
char
*
)
pData
;
pData
=
pInfo
->
p
Expr
->
base
.
param
[
1
].
pz
;
pRes
->
length
[
columnIndex
]
=
pInfo
->
p
Expr
->
base
.
param
[
1
].
nLen
;
pRes
->
tsrow
[
columnIndex
]
=
(
pInfo
->
p
Expr
->
base
.
param
[
1
].
nType
==
TSDB_DATA_TYPE_NULL
)
?
NULL
:
(
unsigned
char
*
)
pData
;
}
else
{
assert
(
bytes
==
tDataTypes
[
type
].
bytes
);
pRes
->
tsrow
[
columnIndex
]
=
isNull
(
pData
,
type
)
?
NULL
:
(
unsigned
char
*
)
&
pInfo
->
p
SqlExpr
->
param
[
1
].
i64
;
pRes
->
tsrow
[
columnIndex
]
=
isNull
(
pData
,
type
)
?
NULL
:
(
unsigned
char
*
)
&
pInfo
->
p
Expr
->
base
.
param
[
1
].
i64
;
pRes
->
length
[
columnIndex
]
=
bytes
;
}
}
else
{
...
...
@@ -488,7 +490,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
assert
(
realLen
<=
bytes
-
VARSTR_HEADER_SIZE
);
pRes
->
tsrow
[
columnIndex
]
=
(
isNull
(
pData
,
type
))
?
NULL
:
(
unsigned
char
*
)((
tstr
*
)
pData
)
->
data
;
if
(
realLen
<
pInfo
->
p
SqlExpr
->
resBytes
-
VARSTR_HEADER_SIZE
)
{
// todo refactor
if
(
realLen
<
pInfo
->
p
Expr
->
base
.
resBytes
-
VARSTR_HEADER_SIZE
)
{
// todo refactor
*
(
pData
+
realLen
+
VARSTR_HEADER_SIZE
)
=
0
;
}
...
...
@@ -516,7 +518,7 @@ extern int tscNumOfObj; // number of existed sqlObj in current process.
extern
int
(
*
tscBuildMsg
[
TSDB_SQL_MAX
])(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
);
void
tscBuildVgroupTableInfo
(
SSqlObj
*
pSql
,
STableMetaInfo
*
pTableMetaInfo
,
SArray
*
tables
);
int16_t
getNewResColId
(
SQueryInfo
*
pQueryInfo
);
int16_t
getNewResColId
(
SQuery
Node
Info
*
pQueryInfo
);
#ifdef __cplusplus
}
...
...
src/client/src/tscAsync.c
浏览文件 @
b6aa9da9
...
...
@@ -69,7 +69,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
executeQuery
(
pSql
,
pQueryInfo
);
}
...
...
@@ -255,7 +255,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
pCmd
->
command
=
(
pCmd
->
command
>
TSDB_SQL_MGMT
)
?
TSDB_SQL_RETRIEVE
:
TSDB_SQL_FETCH
;
}
SQueryInfo
*
pQueryInfo1
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
SQuery
Node
Info
*
pQueryInfo1
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
tscProcessSql
(
pSql
,
pQueryInfo1
);
}
}
...
...
@@ -333,7 +333,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug
(
"%p get %s successfully"
,
pSql
,
msg
);
if
(
pSql
->
pStream
==
NULL
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
// check if it is a sub-query of super table query first, if true, enter another routine
if
(
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
(
TSDB_QUERY_TYPE_STABLE_SUBQUERY
|
TSDB_QUERY_TYPE_SUBQUERY
|
TSDB_QUERY_TYPE_TAG_FILTER_QUERY
)))
{
...
...
@@ -414,7 +414,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
else
if
(
TSDB_QUERY_HAS_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_INSERT
))
{
tscHandleMultivnodeInsert
(
pSql
);
}
else
{
SQueryInfo
*
pQueryInfo1
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo1
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
executeQuery
(
pSql
,
pQueryInfo1
);
}
...
...
src/client/src/tscLocal.c
浏览文件 @
b6aa9da9
...
...
@@ -53,7 +53,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
// one column for each row
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableMeta
*
pMeta
=
pTableMetaInfo
->
pTableMeta
;
...
...
@@ -154,14 +154,14 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pSql
->
cmd
.
numOfCols
=
numOfCols
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
TAOS_FIELD
f
=
{.
type
=
TSDB_DATA_TYPE_BINARY
,
.
bytes
=
(
TSDB_COL_NAME_LEN
-
1
)
+
VARSTR_HEADER_SIZE
};
tstrncpy
(
f
.
name
,
"Field"
,
sizeof
(
f
.
name
));
SInternalField
*
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
p
Sql
Expr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
pInfo
->
pExpr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
(
TSDB_COL_NAME_LEN
-
1
)
+
VARSTR_HEADER_SIZE
,
-
1000
,
(
TSDB_COL_NAME_LEN
-
1
),
false
);
rowLen
+=
((
TSDB_COL_NAME_LEN
-
1
)
+
VARSTR_HEADER_SIZE
);
...
...
@@ -171,7 +171,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy
(
f
.
name
,
"Type"
,
sizeof
(
f
.
name
));
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
p
Sql
Expr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
(
int16_t
)(
typeColLength
+
VARSTR_HEADER_SIZE
),
pInfo
->
pExpr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
(
int16_t
)(
typeColLength
+
VARSTR_HEADER_SIZE
),
-
1000
,
typeColLength
,
false
);
rowLen
+=
typeColLength
+
VARSTR_HEADER_SIZE
;
...
...
@@ -181,7 +181,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy
(
f
.
name
,
"Length"
,
sizeof
(
f
.
name
));
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
p
Sql
Expr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_INT
,
sizeof
(
int32_t
),
pInfo
->
pExpr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_INT
,
sizeof
(
int32_t
),
-
1000
,
sizeof
(
int32_t
),
false
);
rowLen
+=
sizeof
(
int32_t
);
...
...
@@ -191,7 +191,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy
(
f
.
name
,
"Note"
,
sizeof
(
f
.
name
));
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
p
Sql
Expr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
(
int16_t
)(
noteColLength
+
VARSTR_HEADER_SIZE
),
pInfo
->
pExpr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
(
int16_t
)(
noteColLength
+
VARSTR_HEADER_SIZE
),
-
1000
,
noteColLength
,
false
);
rowLen
+=
noteColLength
+
VARSTR_HEADER_SIZE
;
...
...
@@ -199,7 +199,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
}
static
int32_t
tscProcessDescribeTable
(
SSqlObj
*
pSql
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
assert
(
tscGetMetaInfo
(
pQueryInfo
,
0
)
->
pTableMeta
!=
NULL
);
...
...
@@ -389,7 +389,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
SColumnIndex
index
=
{
0
};
pSql
->
cmd
.
numOfCols
=
2
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
TAOS_FIELD
f
;
...
...
@@ -404,7 +404,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
}
SInternalField
*
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
p
Sql
Expr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
f
.
bytes
,
-
1000
,
f
.
bytes
-
VARSTR_HEADER_SIZE
,
false
);
pInfo
->
pExpr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
f
.
bytes
,
-
1000
,
f
.
bytes
-
VARSTR_HEADER_SIZE
,
false
);
rowLen
+=
f
.
bytes
;
...
...
@@ -417,7 +417,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
}
pInfo
=
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
pInfo
->
p
SqlExpr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
pInfo
->
p
Expr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
TSDB_DATA_TYPE_BINARY
,
(
int16_t
)(
ddlLen
+
VARSTR_HEADER_SIZE
),
-
1000
,
ddlLen
,
false
);
rowLen
+=
ddlLen
+
VARSTR_HEADER_SIZE
;
...
...
@@ -427,7 +427,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
static
int32_t
tscSCreateSetValueToResObj
(
SSqlObj
*
pSql
,
int32_t
rowLen
,
const
char
*
tableName
,
const
char
*
ddl
)
{
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
int32_t
numOfRows
=
1
;
if
(
strlen
(
ddl
)
==
0
)
{
...
...
@@ -444,7 +444,7 @@ static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const c
return
0
;
}
static
int32_t
tscSCreateBuildResult
(
SSqlObj
*
pSql
,
BuildType
type
,
const
char
*
str
,
const
char
*
result
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
int32_t
rowLen
=
tscSCreateBuildResultFields
(
pSql
,
type
,
result
);
tscFieldInfoUpdateOffset
(
pQueryInfo
);
...
...
@@ -552,7 +552,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tscRebuildDDLForSubTable
(
SSqlObj
*
pSql
,
const
char
*
tableName
,
char
*
ddl
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableMeta
*
pMeta
=
pTableMetaInfo
->
pTableMeta
;
...
...
@@ -606,7 +606,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
}
static
int32_t
tscRebuildDDLForNormalTable
(
SSqlObj
*
pSql
,
const
char
*
tableName
,
char
*
ddl
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableMeta
*
pMeta
=
pTableMetaInfo
->
pTableMeta
;
...
...
@@ -633,7 +633,7 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
}
static
int32_t
tscRebuildDDLForSuperTable
(
SSqlObj
*
pSql
,
const
char
*
tableName
,
char
*
ddl
)
{
char
*
result
=
ddl
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableMeta
*
pMeta
=
pTableMetaInfo
->
pTableMeta
;
...
...
@@ -674,7 +674,7 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
}
static
int32_t
tscProcessShowCreateTable
(
SSqlObj
*
pSql
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
assert
(
pTableMetaInfo
->
pTableMeta
!=
NULL
);
...
...
@@ -700,7 +700,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
}
static
int32_t
tscProcessShowCreateDatabase
(
SSqlObj
*
pSql
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -727,7 +727,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
return
TSDB_CODE_TSC_ACTION_IN_PROGRESS
;
}
static
int32_t
tscProcessCurrentUser
(
SSqlObj
*
pSql
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
pExpr
->
resBytes
=
TSDB_USER_LEN
+
TSDB_DATA_TYPE_BINARY
;
...
...
@@ -754,7 +754,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
extractDBName
(
pSql
->
pTscObj
->
db
,
db
);
pthread_mutex_unlock
(
&
pSql
->
pTscObj
->
mutex
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
pExpr
->
resType
=
TSDB_DATA_TYPE_BINARY
;
...
...
@@ -781,7 +781,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
static
int32_t
tscProcessServerVer
(
SSqlObj
*
pSql
)
{
const
char
*
v
=
pSql
->
pTscObj
->
sversion
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
pExpr
->
resType
=
TSDB_DATA_TYPE_BINARY
;
...
...
@@ -804,7 +804,7 @@ static int32_t tscProcessServerVer(SSqlObj *pSql) {
}
static
int32_t
tscProcessClientVer
(
SSqlObj
*
pSql
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
pExpr
->
resType
=
TSDB_DATA_TYPE_BINARY
;
...
...
@@ -856,7 +856,7 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) {
return
pSql
->
res
.
code
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SSqlExpr
*
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
int32_t
val
=
1
;
...
...
@@ -870,7 +870,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
pCmd
->
numOfCols
=
1
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
pQueryInfo
->
order
.
order
=
TSDB_ORDER_ASC
;
tscFieldInfoClear
(
&
pQueryInfo
->
fieldsInfo
);
...
...
@@ -882,7 +882,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
tscInitResObjForLocalQuery
(
pSql
,
1
,
(
int32_t
)
valueLength
);
SInternalField
*
pInfo
=
tscFieldInfoGetInternalField
(
&
pQueryInfo
->
fieldsInfo
,
0
);
pInfo
->
p
Sql
Expr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
pInfo
->
pExpr
=
taosArrayGetP
(
pQueryInfo
->
exprList
,
0
);
memcpy
(
pRes
->
data
,
val
,
pInfo
->
field
.
bytes
);
}
...
...
src/client/src/tscLocalMerge.c
浏览文件 @
b6aa9da9
此差异已折叠。
点击以展开。
src/client/src/tscParseInsert.c
浏览文件 @
b6aa9da9
...
...
@@ -759,7 +759,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
const
int32_t
STABLE_INDEX
=
1
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
char
*
sql
=
*
sqlstr
;
...
...
@@ -1055,7 +1055,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
int32_t
totalNum
=
0
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
assert
(
pQueryInfo
!=
NULL
);
STableMetaInfo
*
pTableMetaInfo
=
(
pQueryInfo
->
numOfTables
==
0
)
?
tscAddEmptyMetaInfo
(
pQueryInfo
)
:
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -1313,7 +1313,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
pCmd
->
count
=
0
;
pCmd
->
command
=
TSDB_SQL_INSERT
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
pCmd
->
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
pCmd
->
clauseIndex
);
TSDB_QUERY_SET_TYPE
(
pQueryInfo
->
type
,
TSDB_QUERY_TYPE_INSERT
|
pCmd
->
insertType
);
...
...
src/client/src/tscSQLParser.c
浏览文件 @
b6aa9da9
此差异已折叠。
点击以展开。
src/client/src/tscServer.c
浏览文件 @
b6aa9da9
...
...
@@ -302,7 +302,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
if
(
pQueryInfo
!=
NULL
&&
pQueryInfo
->
type
==
TSDB_QUERY_TYPE_FREE_RESOURCE
)
{
tscDebug
(
"%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p"
,
pSql
,
pCmd
->
command
,
pQueryInfo
->
type
,
pObj
,
pObj
->
signature
);
...
...
@@ -469,7 +469,7 @@ int doProcessSql(SSqlObj *pSql) {
return
TSDB_CODE_SUCCESS
;
}
int
tscProcessSql
(
SSqlObj
*
pSql
,
SQueryInfo
*
pQueryInfo
)
{
int
tscProcessSql
(
SSqlObj
*
pSql
,
SQuery
Node
Info
*
pQueryInfo
)
{
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -509,7 +509,7 @@ int tscProcessSql(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
int
tscBuildFetchMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SRetrieveTableMsg
*
pRetrieveMsg
=
(
SRetrieveTableMsg
*
)
pSql
->
cmd
.
payload
;
SQueryInfo
*
pQueryInfo
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
pRetrieveMsg
->
free
=
htons
(
pQueryInfo
->
type
);
pRetrieveMsg
->
qid
=
htobe64
(
pSql
->
res
.
qid
);
...
...
@@ -549,7 +549,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int
tscBuildSubmitMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMeta
*
pTableMeta
=
tscGetMetaInfo
(
pQueryInfo
,
0
)
->
pTableMeta
;
char
*
pMsg
=
pSql
->
cmd
.
payload
;
...
...
@@ -588,7 +588,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
const
static
int32_t
MIN_QUERY_MSG_PKT_SIZE
=
TSDB_MAX_BYTES_PER_ROW
*
5
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
int32_t
srcColListSize
=
(
int32_t
)(
taosArrayGetSize
(
pQueryInfo
->
colList
)
*
sizeof
(
SColumnInfo
));
...
...
@@ -618,7 +618,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
}
static
char
*
doSerializeTableInfo
(
SQueryTableMsg
*
pQueryMsg
,
SSqlObj
*
pSql
,
char
*
pMsg
)
{
SQueryInfo
*
pQueryInfo
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetActiveQueryInfo
(
&
pSql
->
cmd
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
TSKEY
dfltKey
=
htobe64
(
pQueryMsg
->
window
.
skey
);
...
...
@@ -704,7 +704,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return
TSDB_CODE_TSC_INVALID_SQL
;
// todo add test for this
}
SQueryInfo
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableMeta
*
pTableMeta
=
pTableMetaInfo
->
pTableMeta
;
...
...
@@ -779,21 +779,20 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char
n
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
tNameExtractFullName
(
&
pTableMetaInfo
->
name
,
n
);
tscError
(
"%p tid:%d uid:%"
PRIu64
" id:%s, column index out of range, numOfColumns:%d, index:%d, column name:%s"
,
pSql
,
pTableMeta
->
id
.
tid
,
pTableMeta
->
id
.
uid
,
n
,
tscGetNumOfColumns
(
pTableMeta
),
pCol
->
colIndex
.
columnIndex
,
pColSchema
->
name
);
return
TSDB_CODE_TSC_INVALID_SQL
;
}
pQueryMsg
->
colList
[
i
].
colId
=
htons
(
pCol
Schema
->
colId
);
pQueryMsg
->
colList
[
i
].
bytes
=
htons
(
pCol
Schema
->
bytes
);
pQueryMsg
->
colList
[
i
].
type
=
htons
(
pCol
Schema
->
type
);
pQueryMsg
->
colList
[
i
].
numOfFilters
=
htons
(
pCol
->
numOfFilters
);
pQueryMsg
->
colList
[
i
].
colId
=
htons
(
pCol
->
info
.
colId
);
pQueryMsg
->
colList
[
i
].
bytes
=
htons
(
pCol
->
info
.
bytes
);
pQueryMsg
->
colList
[
i
].
type
=
htons
(
pCol
->
info
.
type
);
pQueryMsg
->
colList
[
i
].
numOfFilters
=
htons
(
pCol
->
info
.
numOfFilters
);
// append the filter information after the basic column information
for
(
int32_t
f
=
0
;
f
<
pCol
->
numOfFilters
;
++
f
)
{
SColumnFilterInfo
*
pColFilter
=
&
pCol
->
filterInfo
[
f
];
for
(
int32_t
f
=
0
;
f
<
pCol
->
info
.
numOfFilters
;
++
f
)
{
SColumnFilterInfo
*
pColFilter
=
&
pCol
->
info
.
filterInfo
[
f
];
SColumnFilterInfo
*
pFilterMsg
=
(
SColumnFilterInfo
*
)
pMsg
;
pFilterMsg
->
filterstr
=
htons
(
pColFilter
->
filterstr
);
...
...
@@ -822,7 +821,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlExpr
*
pSqlFuncExpr
=
(
SSqlExpr
*
)
pMsg
;
for
(
int32_t
i
=
0
;
i
<
tscSqlExprNumOfExprs
(
pQueryInfo
);
++
i
)
{
S
SqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
S
ExprInfo
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
...
...
@@ -831,48 +830,48 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return
TSDB_CODE_TSC_INVALID_TABLE_NAME
;
}
if
(
!
tscValidateColumnId
(
pTableMetaInfo
,
pExpr
->
colInfo
.
colId
,
pExpr
->
numOfParams
))
{
if
(
!
tscValidateColumnId
(
pTableMetaInfo
,
pExpr
->
base
.
colInfo
.
colId
,
pExpr
->
base
.
numOfParams
))
{
tscError
(
"%p table schema is not matched with parsed sql"
,
pSql
);
return
TSDB_CODE_TSC_INVALID_SQL
;
}
assert
(
pExpr
->
resColId
<
0
);
assert
(
pExpr
->
base
.
resColId
<
0
);
pSqlFuncExpr
->
colInfo
.
colId
=
htons
(
pExpr
->
colInfo
.
colId
);
pSqlFuncExpr
->
colInfo
.
colIndex
=
htons
(
pExpr
->
colInfo
.
colIndex
);
pSqlFuncExpr
->
colInfo
.
flag
=
htons
(
pExpr
->
colInfo
.
flag
);
pSqlFuncExpr
->
colInfo
.
colId
=
htons
(
pExpr
->
base
.
colInfo
.
colId
);
pSqlFuncExpr
->
colInfo
.
colIndex
=
htons
(
pExpr
->
base
.
colInfo
.
colIndex
);
pSqlFuncExpr
->
colInfo
.
flag
=
htons
(
pExpr
->
base
.
colInfo
.
flag
);
pSqlFuncExpr
->
colType
=
htons
(
pExpr
->
colType
);
pSqlFuncExpr
->
colBytes
=
htons
(
pExpr
->
colBytes
);
pSqlFuncExpr
->
colType
=
htons
(
pExpr
->
base
.
colType
);
pSqlFuncExpr
->
colBytes
=
htons
(
pExpr
->
base
.
colBytes
);
if
(
TSDB_COL_IS_UD_COL
(
pExpr
->
colInfo
.
flag
)
||
pExpr
->
colInfo
.
colId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
pSqlFuncExpr
->
resType
=
htons
(
pExpr
->
resType
);
pSqlFuncExpr
->
resBytes
=
htons
(
pExpr
->
resBytes
);
}
else
if
(
pExpr
->
colInfo
.
colId
==
TSDB_BLOCK_DIST_COLUMN_INDEX
)
{
if
(
TSDB_COL_IS_UD_COL
(
pExpr
->
base
.
colInfo
.
flag
)
||
pExpr
->
base
.
colInfo
.
colId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
pSqlFuncExpr
->
resType
=
htons
(
pExpr
->
base
.
resType
);
pSqlFuncExpr
->
resBytes
=
htons
(
pExpr
->
base
.
resBytes
);
}
else
if
(
pExpr
->
base
.
colInfo
.
colId
==
TSDB_BLOCK_DIST_COLUMN_INDEX
)
{
SSchema
s
=
tGetBlockDistColumnSchema
();
pSqlFuncExpr
->
resType
=
htons
(
s
.
type
);
pSqlFuncExpr
->
resBytes
=
htons
(
s
.
bytes
);
}
else
{
SSchema
*
s
=
tscGetColumnSchemaById
(
pTableMeta
,
pExpr
->
colInfo
.
colId
);
SSchema
*
s
=
tscGetColumnSchemaById
(
pTableMeta
,
pExpr
->
base
.
colInfo
.
colId
);
pSqlFuncExpr
->
resType
=
htons
(
s
->
type
);
pSqlFuncExpr
->
resBytes
=
htons
(
s
->
bytes
);
}
pSqlFuncExpr
->
functionId
=
htons
(
pExpr
->
functionId
);
pSqlFuncExpr
->
numOfParams
=
htons
(
pExpr
->
numOfParams
);
pSqlFuncExpr
->
resColId
=
htons
(
pExpr
->
resColId
);
pSqlFuncExpr
->
functionId
=
htons
(
pExpr
->
base
.
functionId
);
pSqlFuncExpr
->
numOfParams
=
htons
(
pExpr
->
base
.
numOfParams
);
pSqlFuncExpr
->
resColId
=
htons
(
pExpr
->
base
.
resColId
);
pMsg
+=
sizeof
(
SSqlExpr
);
for
(
int32_t
j
=
0
;
j
<
pExpr
->
numOfParams
;
++
j
)
{
// todo add log
pSqlFuncExpr
->
param
[
j
].
nType
=
htons
((
uint16_t
)
pExpr
->
param
[
j
].
nType
);
pSqlFuncExpr
->
param
[
j
].
nLen
=
htons
(
pExpr
->
param
[
j
].
nLen
);
for
(
int32_t
j
=
0
;
j
<
pExpr
->
base
.
numOfParams
;
++
j
)
{
// todo add log
pSqlFuncExpr
->
param
[
j
].
nType
=
htons
((
uint16_t
)
pExpr
->
base
.
param
[
j
].
nType
);
pSqlFuncExpr
->
param
[
j
].
nLen
=
htons
(
pExpr
->
base
.
param
[
j
].
nLen
);
if
(
pExpr
->
param
[
j
].
nType
==
TSDB_DATA_TYPE_BINARY
)
{
memcpy
(
pMsg
,
pExpr
->
param
[
j
].
pz
,
pExpr
->
param
[
j
].
nLen
);
pMsg
+=
pExpr
->
param
[
j
].
nLen
;
if
(
pExpr
->
base
.
param
[
j
].
nType
==
TSDB_DATA_TYPE_BINARY
)
{
memcpy
(
pMsg
,
pExpr
->
base
.
param
[
j
].
pz
,
pExpr
->
base
.
param
[
j
].
nLen
);
pMsg
+=
pExpr
->
base
.
param
[
j
].
nLen
;
}
else
{
pSqlFuncExpr
->
param
[
j
].
i64
=
htobe64
(
pExpr
->
param
[
j
].
i64
);
pSqlFuncExpr
->
param
[
j
].
i64
=
htobe64
(
pExpr
->
base
.
param
[
j
].
i64
);
}
}
...
...
@@ -888,10 +887,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
for
(
int32_t
i
=
0
;
i
<
output
;
++
i
)
{
SInternalField
*
pField
=
tscFieldInfoGetInternalField
(
&
pQueryInfo
->
fieldsInfo
,
i
);
S
SqlExpr
*
pExpr
=
pField
->
pSql
Expr
;
S
ExprInfo
*
pExpr
=
pField
->
p
Expr
;
// this should be switched to projection query
if
(
pExpr
!
=
NULL
)
{
if
(
pExpr
->
pExpr
=
=
NULL
)
{
// the queried table has been removed and a new table with the same name has already been created already
// return error msg
if
(
pExpr
->
uid
!=
pTableMeta
->
id
.
uid
)
{
...
...
@@ -899,25 +898,25 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return
TSDB_CODE_TSC_INVALID_TABLE_NAME
;
}
if
(
!
tscValidateColumnId
(
pTableMetaInfo
,
pExpr
->
colInfo
.
colId
,
pExpr
->
numOfParams
))
{
if
(
!
tscValidateColumnId
(
pTableMetaInfo
,
pExpr
->
base
.
colInfo
.
colId
,
pExpr
->
base
.
numOfParams
))
{
tscError
(
"%p table schema is not matched with parsed sql"
,
pSql
);
return
TSDB_CODE_TSC_INVALID_SQL
;
}
pExpr1
->
numOfParams
=
0
;
// no params for projection query
pExpr1
->
functionId
=
htons
(
TSDB_FUNC_PRJ
);
pExpr1
->
colInfo
.
colId
=
htons
(
pExpr
->
resColId
);
pExpr1
->
colInfo
.
colId
=
htons
(
pExpr
->
base
.
resColId
);
pExpr1
->
colInfo
.
flag
=
htons
(
TSDB_COL_NORMAL
);
pExpr1
->
colType
=
htons
(
pExpr
->
resType
);
pExpr1
->
colBytes
=
htons
(
pExpr
->
resBytes
);
pExpr1
->
colType
=
htons
(
pExpr
->
base
.
resType
);
pExpr1
->
colBytes
=
htons
(
pExpr
->
base
.
resBytes
);
bool
assign
=
false
;
for
(
int32_t
f
=
0
;
f
<
tscSqlExprNumOfExprs
(
pQueryInfo
);
++
f
)
{
S
SqlExpr
*
pe
=
tscSqlExprGet
(
pQueryInfo
,
f
);
S
ExprInfo
*
pe
=
tscSqlExprGet
(
pQueryInfo
,
f
);
if
(
pe
==
pExpr
)
{
pExpr1
->
colInfo
.
colIndex
=
htons
(
f
);
pExpr1
->
resType
=
htons
(
pe
->
resType
);
pExpr1
->
resBytes
=
htons
(
pe
->
resBytes
);
pExpr1
->
resType
=
htons
(
pe
->
base
.
resType
);
pExpr1
->
resBytes
=
htons
(
pe
->
base
.
resBytes
);
assign
=
true
;
break
;
}
...
...
@@ -927,8 +926,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg
+=
sizeof
(
SSqlExpr
);
pExpr1
=
(
SSqlExpr
*
)
pMsg
;
}
else
{
assert
(
pField
->
p
ArithExprInfo
!=
NULL
);
SExprInfo
*
pExprInfo
=
pField
->
p
ArithExprInfo
;
assert
(
pField
->
p
Expr
!=
NULL
);
SExprInfo
*
pExprInfo
=
pField
->
p
Expr
;
pExpr1
->
colInfo
.
colId
=
htons
(
pExprInfo
->
base
.
colInfo
.
colId
);
pExpr1
->
colType
=
htons
(
pExprInfo
->
base
.
colType
);
...
...
@@ -1050,21 +1049,21 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
// compressed ts block
pQueryMsg
->
tsOffset
=
htonl
((
int32_t
)(
pMsg
-
pCmd
->
payload
));
pQueryMsg
->
ts
Buf
.
ts
Offset
=
htonl
((
int32_t
)(
pMsg
-
pCmd
->
payload
));
if
(
pQueryInfo
->
tsBuf
!=
NULL
)
{
// note: here used the index instead of actual vnode id.
int32_t
vnodeIndex
=
pTableMetaInfo
->
vgroupIndex
;
int32_t
code
=
dumpFileBlockByGroupId
(
pQueryInfo
->
tsBuf
,
vnodeIndex
,
pMsg
,
&
pQueryMsg
->
ts
Len
,
&
pQueryMsg
->
tsNumOfBlocks
);
int32_t
code
=
dumpFileBlockByGroupId
(
pQueryInfo
->
tsBuf
,
vnodeIndex
,
pMsg
,
&
pQueryMsg
->
ts
Buf
.
tsLen
,
&
pQueryMsg
->
tsBuf
.
tsNumOfBlocks
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
pMsg
+=
pQueryMsg
->
tsLen
;
pMsg
+=
pQueryMsg
->
ts
Buf
.
ts
Len
;
pQueryMsg
->
tsOrder
=
htonl
(
pQueryInfo
->
tsBuf
->
tsOrder
);
pQueryMsg
->
ts
Len
=
htonl
(
pQueryMsg
->
tsLen
);
pQueryMsg
->
ts
NumOfBlocks
=
htonl
(
pQueryMsg
->
tsNumOfBlocks
);
pQueryMsg
->
ts
Buf
.
ts
Order
=
htonl
(
pQueryInfo
->
tsBuf
->
tsOrder
);
pQueryMsg
->
ts
Buf
.
tsLen
=
htonl
(
pQueryMsg
->
tsBuf
.
tsLen
);
pQueryMsg
->
ts
Buf
.
tsNumOfBlocks
=
htonl
(
pQueryMsg
->
tsBuf
.
tsNumOfBlocks
);
}
memcpy
(
pMsg
,
pSql
->
sqlstr
,
sqlLen
);
...
...
@@ -1383,7 +1382,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSchema
*
pSchema
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
// Reallocate the payload size
...
...
@@ -1472,7 +1471,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int
tscEstimateAlterTableMsgLength
(
SSqlCmd
*
pCmd
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
return
minMsgSize
()
+
sizeof
(
SAlterTableMsg
)
+
sizeof
(
SSchema
)
*
tscNumOfFields
(
pQueryInfo
)
+
TSDB_EXTRA_PAYLOAD_SIZE
;
}
...
...
@@ -1481,7 +1480,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int
msgLen
=
0
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -1530,7 +1529,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SUpdateTableTagValMsg
*
pUpdateMsg
=
(
SUpdateTableTagValMsg
*
)
pCmd
->
payload
;
pCmd
->
payloadLen
=
htonl
(
pUpdateMsg
->
head
.
contLen
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMeta
*
pTableMeta
=
tscGetMetaInfo
(
pQueryInfo
,
0
)
->
pTableMeta
;
SNewVgroupInfo
vgroupInfo
=
{.
vgId
=
-
1
};
...
...
@@ -1566,7 +1565,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SRetrieveTableMsg
*
pRetrieveMsg
=
(
SRetrieveTableMsg
*
)
pCmd
->
payload
;
pRetrieveMsg
->
qid
=
htobe64
(
pSql
->
res
.
qid
);
pRetrieveMsg
->
free
=
htons
(
pQueryInfo
->
type
);
...
...
@@ -1590,7 +1589,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
pRes
->
row
=
0
;
pRes
->
rspType
=
1
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
tscCreateResPointerInfo
(
pRes
,
pQueryInfo
)
!=
TSDB_CODE_SUCCESS
)
{
return
pRes
->
code
;
}
...
...
@@ -1641,7 +1640,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
pRes
->
code
=
tscDoLocalMerge
(
pSql
);
if
(
pRes
->
code
==
TSDB_CODE_SUCCESS
&&
pRes
->
numOfRows
>
0
)
{
SQueryInfo
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
tscCreateResPointerInfo
(
pRes
,
pQueryInfo
);
tscSetResRawPtr
(
pRes
,
pQueryInfo
);
}
...
...
@@ -1695,7 +1694,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int
tscBuildTableMetaMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableInfoMsg
*
pInfoMsg
=
(
STableInfoMsg
*
)
pCmd
->
payload
;
...
...
@@ -1765,7 +1764,7 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
char
*
pMsg
=
pCmd
->
payload
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SSTableVgroupMsg
*
pStableVgroupMsg
=
(
SSTableVgroupMsg
*
)
pMsg
;
pStableVgroupMsg
->
numOfTables
=
htonl
(
pQueryInfo
->
numOfTables
);
...
...
@@ -2108,7 +2107,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
@@ -2143,12 +2142,12 @@ int tscProcessShowRsp(SSqlObj *pSql) {
for
(
int16_t
i
=
0
;
i
<
pMetaMsg
->
numOfColumns
;
++
i
,
++
pSchema
)
{
index
.
columnIndex
=
i
;
tscColumnListInsert
(
pQueryInfo
->
colList
,
&
index
);
tscColumnListInsert
(
pQueryInfo
->
colList
,
&
index
,
&
pSchema
[
i
]
);
TAOS_FIELD
f
=
tscCreateField
(
pSchema
->
type
,
pSchema
->
name
,
pSchema
->
bytes
);
SInternalField
*
pInfo
=
tscFieldInfoAppend
(
pFieldInfo
,
&
f
);
pInfo
->
p
Sql
Expr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
pInfo
->
pExpr
=
tscSqlExprAppend
(
pQueryInfo
,
TSDB_FUNC_TS_DUMMY
,
&
index
,
pTableSchema
[
i
].
type
,
pTableSchema
[
i
].
bytes
,
getNewResColId
(
pQueryInfo
),
pTableSchema
[
i
].
bytes
,
false
);
}
...
...
@@ -2167,7 +2166,7 @@ static void createHbObj(STscObj* pObj) {
pSql
->
fp
=
tscProcessHeartBeatRsp
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfoS
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
==
NULL
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
tfree
(
pSql
);
...
...
@@ -2324,7 +2323,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
pRes
->
completed
=
(
pRetrieve
->
completed
==
1
);
pRes
->
data
=
pRetrieve
->
data
;
SQueryInfo
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetActiveQueryInfo
(
pCmd
);
if
(
tscCreateResPointerInfo
(
pRes
,
pQueryInfo
)
!=
TSDB_CODE_SUCCESS
)
{
return
pRes
->
code
;
}
...
...
@@ -2381,7 +2380,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
tscAddQueryInfo
(
&
pNew
->
cmd
);
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
,
0
);
SQuery
Node
Info
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
,
0
);
pNew
->
cmd
.
autoCreated
=
pSql
->
cmd
.
autoCreated
;
// create table if not exists
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
&
pNew
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
+
pSql
->
cmd
.
payloadLen
))
{
...
...
@@ -2470,7 +2469,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
int
tscRenewTableMeta
(
SSqlObj
*
pSql
,
int32_t
tableIndex
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
tableIndex
);
char
name
[
TSDB_TABLE_FNAME_LEN
]
=
{
0
};
...
...
@@ -2494,7 +2493,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
}
static
bool
allVgroupInfoRetrieved
(
SSqlCmd
*
pCmd
,
int32_t
clauseIndex
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
numOfTables
;
++
i
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
i
);
if
(
pTableMetaInfo
->
vgroupList
==
NULL
)
{
...
...
@@ -2521,13 +2520,13 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
pNew
->
cmd
.
command
=
TSDB_SQL_STABLEVGROUP
;
// TODO TEST IT
SQueryInfo
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
,
0
);
SQuery
Node
Info
*
pNewQueryInfo
=
tscGetQueryInfoS
(
&
pNew
->
cmd
,
0
);
if
(
pNewQueryInfo
==
NULL
)
{
tscFreeSqlObj
(
pNew
);
return
code
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
clauseIndex
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
numOfTables
;
++
i
)
{
STableMetaInfo
*
pMInfo
=
tscGetMetaInfo
(
pQueryInfo
,
i
);
STableMeta
*
pTableMeta
=
tscTableMetaDup
(
pMInfo
->
pTableMeta
);
...
...
src/client/src/tscSql.c
浏览文件 @
b6aa9da9
...
...
@@ -373,7 +373,7 @@ int taos_num_fields(TAOS_RES *res) {
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
return
0
;
int32_t
num
=
0
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
==
NULL
)
{
return
num
;
}
...
...
@@ -407,7 +407,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
return
0
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
==
NULL
)
{
return
NULL
;
}
...
...
@@ -558,7 +558,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
return
true
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
if
((
pQueryInfo
==
NULL
)
||
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
))
{
return
true
;
...
...
@@ -671,7 +671,7 @@ char *taos_get_client_info() { return version; }
static
void
tscKillSTableQuery
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
!
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
))
{
return
;
...
...
@@ -722,7 +722,7 @@ void taos_stop_query(TAOS_RES *res) {
// set the error code for master pSqlObj firstly
pSql
->
res
.
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
tscIsTwoStageSTableQuery
(
pQueryInfo
,
0
))
{
assert
(
pSql
->
rpcRid
<=
0
);
...
...
@@ -752,7 +752,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
return
true
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
if
(
pQueryInfo
==
NULL
)
{
return
true
;
}
...
...
@@ -932,7 +932,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
int
code
=
TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH
;
char
*
str
=
(
char
*
)
tblNameList
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
pCmd
->
clauseIndex
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfoS
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
pQueryInfo
==
NULL
)
{
pSql
->
res
.
code
=
terrno
;
return
terrno
;
...
...
src/client/src/tscStream.c
浏览文件 @
b6aa9da9
...
...
@@ -35,10 +35,10 @@ static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t l
return
taosGetTimestamp
(
pStream
->
precision
)
+
launchDelay
-
pStream
->
stime
-
1
;
}
static
bool
isProjectStream
(
SQueryInfo
*
pQueryInfo
)
{
static
bool
isProjectStream
(
SQuery
Node
Info
*
pQueryInfo
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
S
SqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
if
(
pExpr
->
functionId
!=
TSDB_FUNC_PRJ
)
{
S
ExprInfo
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
i
);
if
(
pExpr
->
base
.
functionId
!=
TSDB_FUNC_PRJ
)
{
return
false
;
}
}
...
...
@@ -89,7 +89,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
return
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
code
=
tscGetTableMeta
(
pSql
,
pTableMetaInfo
);
...
...
@@ -130,7 +130,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream
->
numOfRes
=
0
;
// reset the numOfRes.
SSqlObj
*
pSql
=
pStream
->
pSql
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
tscDebug
(
"%p add into timer"
,
pSql
);
if
(
pStream
->
isProject
)
{
...
...
@@ -208,7 +208,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static
void
tscStreamFillTimeGap
(
SSqlStream
*
pStream
,
TSKEY
ts
)
{
#if 0
SSqlObj * pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQuery
Node
Info* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) {
return;
...
...
@@ -421,7 +421,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t
minIntervalTime
=
(
pStream
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
?
tsMinIntervalTime
*
1000L
:
tsMinIntervalTime
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
if
(
!
pStream
->
isProject
&&
pQueryInfo
->
interval
.
interval
==
0
)
{
sprintf
(
pSql
->
cmd
.
payload
,
"the interval value is 0"
);
...
...
@@ -471,7 +471,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
}
static
int64_t
tscGetStreamStartTimestamp
(
SSqlObj
*
pSql
,
SSqlStream
*
pStream
,
int64_t
stime
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
&
pSql
->
cmd
,
0
);
if
(
pStream
->
isProject
)
{
// no data in table, flush all data till now to destination meter, 10sec delay
...
...
@@ -530,7 +530,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
...
...
src/client/src/tscSub.c
浏览文件 @
b6aa9da9
...
...
@@ -284,7 +284,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
size_t
numOfTables
=
taosArrayGetSize
(
tables
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SArray
*
progress
=
taosArrayInit
(
numOfTables
,
sizeof
(
SSubscriptionProgress
));
for
(
size_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STidTags
*
tt
=
taosArrayGet
(
tables
,
i
);
...
...
@@ -502,7 +502,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
SQuery
Node
Info
*
pQueryInfo
=
tscGetQueryInfo
(
pCmd
,
0
);
if
(
taosArrayGetSize
(
pSub
->
progress
)
>
0
)
{
// fix crash in single table subscription
size_t
size
=
taosArrayGetSize
(
pSub
->
progress
);
...
...
src/client/src/tscSubquery.c
浏览文件 @
b6aa9da9
此差异已折叠。
点击以展开。
src/client/src/tscUtil.c
浏览文件 @
b6aa9da9
此差异已折叠。
点击以展开。
src/inc/taosmsg.h
浏览文件 @
b6aa9da9
...
...
@@ -451,7 +451,7 @@ typedef struct SColumnInfo {
int16_t
numOfFilters
;
union
{
int64_t
placeholder
;
SColumnFilterInfo
*
filter
s
;
SColumnFilterInfo
*
filter
Info
;
};
}
SColumnInfo
;
...
...
@@ -466,6 +466,13 @@ typedef struct STimeWindow {
TSKEY
ekey
;
}
STimeWindow
;
typedef
struct
{
int32_t
tsOffset
;
// offset value in current msg body, NOTE: ts list is compressed
int32_t
tsLen
;
// total length of ts comp block
int32_t
tsNumOfBlocks
;
// ts comp block numbers
int32_t
tsOrder
;
// ts comp block order
}
STsBufInfo
;
typedef
struct
{
SMsgHead
head
;
char
version
[
TSDB_VERSION_LEN
];
...
...
@@ -492,10 +499,7 @@ typedef struct {
int16_t
fillType
;
// interpolate type
uint64_t
fillVal
;
// default value array list
int32_t
secondStageOutput
;
int32_t
tsOffset
;
// offset value in current msg body, NOTE: ts list is compressed
int32_t
tsLen
;
// total length of ts comp block
int32_t
tsNumOfBlocks
;
// ts comp block numbers
int32_t
tsOrder
;
// ts comp block order
STsBufInfo
tsBuf
;
// tsBuf info
int32_t
numOfTags
;
// number of tags columns involved
int32_t
sqlstrLen
;
// sql query string
int32_t
prevResultLen
;
// previous result length
...
...
src/query/inc/qExecutor.h
浏览文件 @
b6aa9da9
...
...
@@ -177,6 +177,8 @@ typedef struct SSDataBlock {
SDataBlockInfo
info
;
}
SSDataBlock
;
// The basic query information extracted from the SQueryNodeInfo tree to support the
// execution of query in a data node.
typedef
struct
SQuery
{
SLimitVal
limit
;
...
...
@@ -187,6 +189,10 @@ typedef struct SQuery {
bool
timeWindowInterpo
;
// if the time window start/end required interpolation
bool
queryBlockDist
;
// if query data block distribution
bool
stabledev
;
// super table stddev query
bool
tsCompQuery
;
// is tscomp query
bool
simpleAgg
;
bool
pointInterpQuery
;
// point interpolation query
bool
needReverseScan
;
// need reverse scan
int32_t
interBufSize
;
// intermediate buffer sizse
SOrderVal
order
;
...
...
@@ -199,7 +205,6 @@ typedef struct SQuery {
int16_t
precision
;
int16_t
numOfOutput
;
int16_t
fillType
;
int16_t
checkResultBuf
;
// check if the buffer is full during scan each block
int32_t
srcRowSize
;
// todo extract struct
int32_t
resultRowSize
;
...
...
@@ -438,9 +443,13 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu
SSqlGroupbyExpr
*
createGroupbyExprFromMsg
(
SQueryTableMsg
*
pQueryMsg
,
SColIndex
*
pColIndex
,
int32_t
*
code
);
SQInfo
*
createQInfoImpl
(
SQueryTableMsg
*
pQueryMsg
,
SSqlGroupbyExpr
*
pGroupbyExpr
,
SExprInfo
*
pExprs
,
SExprInfo
*
pSecExprs
,
STableGroupInfo
*
pTableGroupInfo
,
SColumnInfo
*
pTagCols
,
bool
stableQuery
,
char
*
sql
,
uint64_t
*
qId
);
int32_t
initQInfo
(
SQueryTableMsg
*
pQueryMsg
,
void
*
tsdb
,
int32_t
vgId
,
SQInfo
*
pQInfo
,
SQueryParam
*
param
,
bool
isSTable
);
int32_t
initQInfo
(
STsBufInfo
*
pTsBufInfo
,
void
*
tsdb
,
int32_t
vgId
,
SQInfo
*
pQInfo
,
SQueryParam
*
param
,
char
*
start
,
int32_t
prevResultLen
,
bool
isSTable
);
void
freeColumnFilterInfo
(
SColumnFilterInfo
*
pFilter
,
int32_t
numOfFilters
);
STableQueryInfo
*
createTableQueryInfo
(
SQuery
*
pQuery
,
void
*
pTable
,
bool
groupbyColumn
,
STimeWindow
win
,
void
*
buf
);
bool
isQueryKilled
(
SQInfo
*
pQInfo
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
bool
doBuildResCheck
(
SQInfo
*
pQInfo
);
...
...
src/query/src/qExecutor.c
浏览文件 @
b6aa9da9
...
...
@@ -163,7 +163,7 @@ static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
static
void
setTableScanFilterOperatorInfo
(
STableScanInfo
*
pTableScanInfo
,
SOperatorInfo
*
pDownstream
);
static
int32_t
getNumOfScanTimes
(
SQuery
*
pQuery
);
static
bool
isFixedOutputQuery
(
SQuery
*
pQuery
);
//
static bool isFixedOutputQuery(SQuery* pQuery);
static
SOperatorInfo
*
createDataBlocksOptScanInfo
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
repeatTime
,
int32_t
reverseTime
);
static
SOperatorInfo
*
createTableScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
repeatTime
);
...
...
@@ -1790,8 +1790,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
if
(
pQuery
->
pExpr2
!=
NULL
)
{
pRuntimeEnv
->
proot
=
createArithOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQuery
->
pExpr2
,
pQuery
->
numOfExpr2
);
}
}
else
if
(
isFixedOutputQuery
(
pQuery
)
)
{
if
(
pQuery
->
stableQuery
&&
!
isTsCompQuery
(
pQuery
)
)
{
}
else
if
(
pQuery
->
simpleAgg
)
{
if
(
pQuery
->
stableQuery
&&
!
pQuery
->
tsCompQuery
)
{
pRuntimeEnv
->
proot
=
createMultiTableAggOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
pTableScanner
,
pQuery
->
pExpr1
,
pQuery
->
numOfOutput
);
}
else
{
...
...
@@ -1805,7 +1805,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv
->
proot
=
createArithOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
proot
,
pQuery
->
pExpr2
,
pQuery
->
numOfExpr2
);
}
}
else
{
// diff/add/multiply/subtract/division
assert
(
pQuery
->
checkResultBuf
==
1
);
if
(
!
onlyQueryTags
(
pQuery
))
{
pRuntimeEnv
->
proot
=
createArithOperatorInfo
(
pRuntimeEnv
,
pRuntimeEnv
->
pTableScanner
,
pQuery
->
pExpr1
,
pQuery
->
numOfOutput
);
...
...
@@ -1905,36 +1904,30 @@ bool isQueryKilled(SQInfo *pQInfo) {
void
setQueryKilled
(
SQInfo
*
pQInfo
)
{
pQInfo
->
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;}
static
bool
isFixedOutputQuery
(
SQuery
*
pQuery
)
{
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
{
return
false
;
}
// Note:top/bottom query is fixed output query
if
(
pQuery
->
topBotQuery
||
pQuery
->
groupbyColumn
||
isTsCompQuery
(
pQuery
))
{
return
true
;
}
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SSqlExpr
*
pExpr
=
&
pQuery
->
pExpr1
[
i
].
base
;
// ignore the ts_comp function
if
(
i
==
0
&&
pExpr
->
functionId
==
TSDB_FUNC_PRJ
&&
pExpr
->
numOfParams
==
1
&&
pExpr
->
colInfo
.
colIndex
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
)
{
continue
;
}
if
(
pExpr
->
functionId
==
TSDB_FUNC_TS
||
pExpr
->
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
!
IS_MULTIOUTPUT
(
aAggs
[
pExpr
->
functionId
].
status
))
{
return
true
;
}
}
return
false
;
}
//static bool isFixedOutputQuery(SQuery* pQuery) {
// if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
// return false;
// }
//
// // Note:top/bottom query is fixed output query
// if (pQuery->topBotQuery || pQuery->groupbyColumn || pQuery->tsCompQuery) {
// return true;
// }
//
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// SSqlExpr *pExpr = &pQuery->pExpr1[i].base;
//
// if (pExpr->functionId == TSDB_FUNC_TS || pExpr->functionId == TSDB_FUNC_TS_DUMMY) {
// continue;
// }
//
// if (!IS_MULTIOUTPUT(aAggs[pExpr->functionId].status)) {
// return true;
// }
// }
//
// return false;
//}
// todo refactor with isLastRowQuery
bool
isPointInterpoQuery
(
SQuery
*
pQuery
)
{
...
...
@@ -2040,29 +2033,6 @@ void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int6
}
}
static
void
setScanLimitationByResultBuffer
(
SQuery
*
pQuery
)
{
if
(
isTopBottomQuery
(
pQuery
))
{
pQuery
->
checkResultBuf
=
0
;
}
else
if
(
isGroupbyColumn
(
pQuery
->
pGroupbyExpr
))
{
pQuery
->
checkResultBuf
=
0
;
}
else
{
bool
hasMultioutput
=
false
;
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SSqlExpr
*
pExpr
=
&
pQuery
->
pExpr1
[
i
].
base
;
if
(
pExpr
->
functionId
==
TSDB_FUNC_TS
||
pExpr
->
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
hasMultioutput
=
IS_MULTIOUTPUT
(
aAggs
[
pExpr
->
functionId
].
status
);
if
(
!
hasMultioutput
)
{
break
;
}
}
pQuery
->
checkResultBuf
=
hasMultioutput
?
1
:
0
;
}
}
/*
* todo add more parameters to check soon..
*/
...
...
@@ -3175,7 +3145,7 @@ static bool hasMainOutput(SQuery *pQuery) {
return
false
;
}
static
STableQueryInfo
*
createTableQueryInfo
(
SQuery
*
pQuery
,
void
*
pTable
,
bool
groupbyColumn
,
STimeWindow
win
,
void
*
buf
)
{
STableQueryInfo
*
createTableQueryInfo
(
SQuery
*
pQuery
,
void
*
pTable
,
bool
groupbyColumn
,
STimeWindow
win
,
void
*
buf
)
{
STableQueryInfo
*
pTableQueryInfo
=
buf
;
pTableQueryInfo
->
win
=
win
;
...
...
@@ -3893,8 +3863,8 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
&&
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
==
1
)
&&
(
cond
.
order
==
TSDB_ORDER_ASC
)
&&
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
&&
(
!
isGroupbyColumn
(
pQuery
->
pGroupbyExpr
)
)
&&
(
!
isFixedOutputQuery
(
pQuery
)
)
&&
(
!
pQuery
->
groupbyColumn
)
&&
(
!
pQuery
->
simpleAgg
)
)
{
SArray
*
pa
=
GET_TABLEGROUP
(
pRuntimeEnv
,
0
);
STableQueryInfo
*
pCheckInfo
=
taosArrayGetP
(
pa
,
0
);
...
...
@@ -3971,11 +3941,11 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv
->
prevResult
=
prevResult
;
pRuntimeEnv
->
qinfo
=
pQInfo
;
setScanLimitationByResultBuffer
(
pQuery
);
int32_t
code
=
setupQueryHandle
(
tsdb
,
pQInfo
,
isSTableQuery
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
if
(
tsdb
!=
NULL
)
{
int32_t
code
=
setupQueryHandle
(
tsdb
,
pQInfo
,
isSTableQuery
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
pQuery
->
tsdb
=
tsdb
;
...
...
@@ -4013,7 +3983,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
getIntermediateBufInfo
(
pRuntimeEnv
,
&
ps
,
&
pQuery
->
intermediateResultRowSize
);
int32_t
TENMB
=
1024
*
1024
*
10
;
code
=
createDiskbasedResultBuffer
(
&
pRuntimeEnv
->
pResultBuf
,
ps
,
TENMB
,
pQInfo
);
int32_t
code
=
createDiskbasedResultBuffer
(
&
pRuntimeEnv
->
pResultBuf
,
ps
,
TENMB
,
pQInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -4265,6 +4235,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"TableScanOperator"
;
pOperator
->
operatorType
=
OP_TableScan
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
...
...
@@ -5498,10 +5469,10 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pQueryMsg
->
numOfOutput
=
htons
(
pQueryMsg
->
numOfOutput
);
pQueryMsg
->
numOfGroupCols
=
htons
(
pQueryMsg
->
numOfGroupCols
);
pQueryMsg
->
tagCondLen
=
htons
(
pQueryMsg
->
tagCondLen
);
pQueryMsg
->
ts
Offset
=
htonl
(
pQueryMsg
->
tsOffset
);
pQueryMsg
->
ts
Len
=
htonl
(
pQueryMsg
->
tsLen
);
pQueryMsg
->
ts
NumOfBlocks
=
htonl
(
pQueryMsg
->
tsNumOfBlocks
);
pQueryMsg
->
ts
Order
=
htonl
(
pQueryMsg
->
tsOrder
);
pQueryMsg
->
ts
Buf
.
tsOffset
=
htonl
(
pQueryMsg
->
tsBuf
.
tsOffset
);
pQueryMsg
->
ts
Buf
.
tsLen
=
htonl
(
pQueryMsg
->
tsBuf
.
tsLen
);
pQueryMsg
->
ts
Buf
.
tsNumOfBlocks
=
htonl
(
pQueryMsg
->
tsBuf
.
tsNumOfBlocks
);
pQueryMsg
->
ts
Buf
.
tsOrder
=
htonl
(
pQueryMsg
->
tsBuf
.
tsOrder
);
pQueryMsg
->
numOfTags
=
htonl
(
pQueryMsg
->
numOfTags
);
pQueryMsg
->
tbnameCondLen
=
htonl
(
pQueryMsg
->
tbnameCondLen
);
pQueryMsg
->
secondStageOutput
=
htonl
(
pQueryMsg
->
secondStageOutput
);
...
...
@@ -5533,8 +5504,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
int32_t
numOfFilters
=
pColInfo
->
numOfFilters
;
if
(
numOfFilters
>
0
)
{
pColInfo
->
filter
s
=
calloc
(
numOfFilters
,
sizeof
(
SColumnFilterInfo
));
if
(
pColInfo
->
filter
s
==
NULL
)
{
pColInfo
->
filter
Info
=
calloc
(
numOfFilters
,
sizeof
(
SColumnFilterInfo
));
if
(
pColInfo
->
filter
Info
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_cleanup
;
}
...
...
@@ -5543,7 +5514,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
for
(
int32_t
f
=
0
;
f
<
numOfFilters
;
++
f
)
{
SColumnFilterInfo
*
pFilterMsg
=
(
SColumnFilterInfo
*
)
pMsg
;
SColumnFilterInfo
*
pColFilter
=
&
pColInfo
->
filter
s
[
f
];
SColumnFilterInfo
*
pColFilter
=
&
pColInfo
->
filter
Info
[
f
];
pColFilter
->
filterstr
=
htons
(
pFilterMsg
->
filterstr
);
pMsg
+=
sizeof
(
SColumnFilterInfo
);
...
...
@@ -5756,8 +5727,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
}
//skip ts buf
if
((
pQueryMsg
->
ts
Offset
+
pQueryMsg
->
tsLen
)
>
0
)
{
pMsg
=
(
char
*
)
pQueryMsg
+
pQueryMsg
->
ts
Offset
+
pQueryMsg
->
tsLen
;
if
((
pQueryMsg
->
ts
Buf
.
tsOffset
+
pQueryMsg
->
tsBuf
.
tsLen
)
>
0
)
{
pMsg
=
(
char
*
)
pQueryMsg
+
pQueryMsg
->
ts
Buf
.
tsOffset
+
pQueryMsg
->
tsBuf
.
tsLen
;
}
param
->
sql
=
strndup
(
pMsg
,
pQueryMsg
->
sqlstrLen
);
...
...
@@ -5772,7 +5743,7 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
"outputCols:%d, numOfCols:%d, interval:%"
PRId64
", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%"
PRId64
", offset:%"
PRId64
,
pQueryMsg
,
pQueryMsg
->
numOfTables
,
pQueryMsg
->
queryType
,
pQueryMsg
->
window
.
skey
,
pQueryMsg
->
window
.
ekey
,
pQueryMsg
->
numOfGroupCols
,
pQueryMsg
->
order
,
pQueryMsg
->
numOfOutput
,
pQueryMsg
->
numOfCols
,
pQueryMsg
->
interval
.
interval
,
pQueryMsg
->
fillType
,
pQueryMsg
->
ts
Len
,
pQueryMsg
->
tsNumOfBlocks
,
pQueryMsg
->
limit
,
pQueryMsg
->
offset
);
pQueryMsg
->
fillType
,
pQueryMsg
->
ts
Buf
.
tsLen
,
pQueryMsg
->
tsBuf
.
tsNumOfBlocks
,
pQueryMsg
->
limit
,
pQueryMsg
->
offset
);
qDebug
(
"qmsg:%p, sql:%s"
,
pQueryMsg
,
param
->
sql
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -6030,7 +6001,7 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) {
for
(
int32_t
f
=
0
;
f
<
pFilterInfo
->
numOfFilters
;
++
f
)
{
SColumnFilterElem
*
pSingleColFilter
=
&
pFilterInfo
->
pFilters
[
f
];
pSingleColFilter
->
filterInfo
=
pQuery
->
colList
[
i
].
filter
s
[
f
];
pSingleColFilter
->
filterInfo
=
pQuery
->
colList
[
i
].
filter
Info
[
f
];
int32_t
lower
=
pSingleColFilter
->
filterInfo
.
lowerRelOptr
;
int32_t
upper
=
pSingleColFilter
->
filterInfo
.
upperRelOptr
;
...
...
@@ -6166,7 +6137,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
pQuery
->
maxSrcColumnSize
=
0
;
for
(
int16_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
pQuery
->
colList
[
i
]
=
pQueryMsg
->
colList
[
i
];
pQuery
->
colList
[
i
].
filter
s
=
tFilterInfoDup
(
pQueryMsg
->
colList
[
i
].
filters
,
pQuery
->
colList
[
i
].
numOfFilters
);
pQuery
->
colList
[
i
].
filter
Info
=
tFilterInfoDup
(
pQueryMsg
->
colList
[
i
].
filterInfo
,
pQuery
->
colList
[
i
].
numOfFilters
);
pQuery
->
srcRowSize
+=
pQuery
->
colList
[
i
].
bytes
;
if
(
pQuery
->
maxSrcColumnSize
<
pQuery
->
colList
[
i
].
bytes
)
{
...
...
@@ -6309,29 +6280,31 @@ bool isValidQInfo(void *param) {
return
(
sig
==
(
uint64_t
)
pQInfo
);
}
int32_t
initQInfo
(
SQueryTableMsg
*
pQueryMsg
,
void
*
tsdb
,
int32_t
vgId
,
SQInfo
*
pQInfo
,
SQueryParam
*
param
,
bool
isSTable
)
{
int32_t
initQInfo
(
STsBufInfo
*
pTsBufInfo
,
void
*
tsdb
,
int32_t
vgId
,
SQInfo
*
pQInfo
,
SQueryParam
*
param
,
char
*
start
,
int32_t
prevResultLen
,
bool
isSTable
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
STSBuf
*
pTsBuf
=
NULL
;
if
(
p
QueryMsg
->
tsLen
>
0
)
{
// open new file to save the result
char
*
tsBlock
=
(
char
*
)
pQueryMsg
+
pQueryMsg
->
tsOffset
;
pTsBuf
=
tsBufCreateFromCompBlocks
(
tsBlock
,
p
QueryMsg
->
tsNumOfBlocks
,
pQueryMsg
->
tsLen
,
pQueryMsg
->
tsOrder
,
vgId
);
if
(
p
TsBufInfo
->
tsLen
>
0
)
{
// open new file to save the result
char
*
tsBlock
=
start
+
pTsBufInfo
->
tsOffset
;
pTsBuf
=
tsBufCreateFromCompBlocks
(
tsBlock
,
p
TsBufInfo
->
tsNumOfBlocks
,
pTsBufInfo
->
tsLen
,
pTsBufInfo
->
tsOrder
,
vgId
);
tsBufResetPos
(
pTsBuf
);
bool
ret
=
tsBufNextPos
(
pTsBuf
);
UNUSED
(
ret
);
}
SArray
*
prevResult
=
NULL
;
if
(
p
QueryMsg
->
p
revResultLen
>
0
)
{
prevResult
=
interResFromBinary
(
param
->
prevResult
,
p
QueryMsg
->
p
revResultLen
);
if
(
prevResultLen
>
0
)
{
prevResult
=
interResFromBinary
(
param
->
prevResult
,
prevResultLen
);
}
pQuery
->
precision
=
tsdbGetCfg
(
tsdb
)
->
precision
;
if
(
tsdb
!=
NULL
)
{
pQuery
->
precision
=
tsdbGetCfg
(
tsdb
)
->
precision
;
}
if
((
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
(
pQuery
->
window
.
skey
>
pQuery
->
window
.
ekey
))
||
(
!
QUERY_IS_ASC_QUERY
(
pQuery
)
&&
(
pQuery
->
window
.
ekey
>
pQuery
->
window
.
skey
)))
{
...
...
@@ -6449,7 +6422,7 @@ void freeQInfo(SQInfo *pQInfo) {
if
(
pQuery
->
colList
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfCols
;
i
++
)
{
SColumnInfo
*
column
=
pQuery
->
colList
+
i
;
freeColumnFilterInfo
(
column
->
filter
s
,
column
->
numOfFilters
);
freeColumnFilterInfo
(
column
->
filter
Info
,
column
->
numOfFilters
);
}
tfree
(
pQuery
->
colList
);
}
...
...
src/query/src/qPlan.c
浏览文件 @
b6aa9da9
#include "os.h"
#include "tsclient.h"
#include "qUtil.h"
#include "texpr.h"
UNUSED_FUNC
SArray
*
createTableScanPlan
(
SQuery
*
pQuery
)
{
SArray
*
plan
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
int32_t
op
=
0
;
if
(
onlyQueryTags
(
pQuery
))
{
// op = OP_TagScan;
}
else
if
(
pQuery
->
queryBlockDist
)
{
op
=
OP_TableBlockInfoScan
;
}
else
if
(
pQuery
->
tsCompQuery
||
pQuery
->
pointInterpQuery
)
{
op
=
OP_TableSeqScan
;
}
else
if
(
pQuery
->
needReverseScan
)
{
op
=
OP_DataBlocksOptScan
;
}
else
{
op
=
OP_TableScan
;
}
taosArrayPush
(
plan
,
&
op
);
return
plan
;
}
UNUSED_FUNC
SArray
*
createExecOperatorPlan
(
SQuery
*
pQuery
)
{
SArray
*
plan
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
int32_t
op
=
0
;
if
(
onlyQueryTags
(
pQuery
))
{
// do nothing for tags query
op
=
OP_TagScan
;
taosArrayPush
(
plan
,
&
op
);
}
else
if
(
pQuery
->
interval
.
interval
>
0
)
{
if
(
pQuery
->
stableQuery
)
{
op
=
OP_MultiTableTimeInterval
;
taosArrayPush
(
plan
,
&
op
);
}
else
{
op
=
OP_TimeWindow
;
taosArrayPush
(
plan
,
&
op
);
if
(
pQuery
->
pExpr2
!=
NULL
)
{
op
=
OP_Arithmetic
;
taosArrayPush
(
plan
,
&
op
);
}
if
(
pQuery
->
fillType
!=
TSDB_FILL_NONE
&&
(
!
pQuery
->
pointInterpQuery
))
{
op
=
OP_Fill
;
taosArrayPush
(
plan
,
&
op
);
}
}
}
else
if
(
pQuery
->
groupbyColumn
)
{
op
=
OP_Groupby
;
taosArrayPush
(
plan
,
&
op
);
if
(
pQuery
->
pExpr2
!=
NULL
)
{
op
=
OP_Arithmetic
;
taosArrayPush
(
plan
,
&
op
);
}
}
else
if
(
pQuery
->
sw
.
gap
>
0
)
{
op
=
OP_SessionWindow
;
taosArrayPush
(
plan
,
&
op
);
if
(
pQuery
->
pExpr2
!=
NULL
)
{
op
=
OP_Arithmetic
;
taosArrayPush
(
plan
,
&
op
);
}
}
else
if
(
pQuery
->
simpleAgg
)
{
if
(
pQuery
->
stableQuery
&&
!
pQuery
->
tsCompQuery
)
{
op
=
OP_MultiTableAggregate
;
}
else
{
op
=
OP_Aggregate
;
}
taosArrayPush
(
plan
,
&
op
);
if
(
pQuery
->
pExpr2
!=
NULL
&&
!
pQuery
->
stableQuery
)
{
op
=
OP_Arithmetic
;
taosArrayPush
(
plan
,
&
op
);
}
}
else
{
// diff/add/multiply/subtract/division
op
=
OP_Arithmetic
;
taosArrayPush
(
plan
,
&
op
);
}
if
(
pQuery
->
limit
.
offset
>
0
)
{
op
=
OP_Offset
;
taosArrayPush
(
plan
,
&
op
);
}
if
(
pQuery
->
limit
.
limit
>
0
)
{
op
=
OP_Limit
;
taosArrayPush
(
plan
,
&
op
);
}
return
plan
;
}
src/query/src/queryMain.c
浏览文件 @
b6aa9da9
...
...
@@ -170,7 +170,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto
_over
;
}
code
=
initQInfo
(
pQueryMsg
,
tsdb
,
vgId
,
*
pQInfo
,
&
param
,
isSTableQuery
);
code
=
initQInfo
(
&
pQueryMsg
->
tsBuf
,
tsdb
,
vgId
,
*
pQInfo
,
&
param
,
(
char
*
)
pQueryMsg
,
pQueryMsg
->
prevResultLen
,
isSTableQuery
);
_over:
if
(
param
.
pGroupbyExpr
!=
NULL
)
{
...
...
@@ -184,7 +184,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
for
(
int32_t
i
=
0
;
i
<
pQueryMsg
->
numOfCols
;
i
++
)
{
SColumnInfo
*
column
=
pQueryMsg
->
colList
+
i
;
freeColumnFilterInfo
(
column
->
filter
s
,
column
->
numOfFilters
);
freeColumnFilterInfo
(
column
->
filter
Info
,
column
->
numOfFilters
);
}
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录