Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
7358e65d
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7358e65d
编写于
1月 06, 2020
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix bugs during union all processing
上级
dd41425f
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
229 addition
and
247 deletion
+229
-247
src/client/inc/tscSecondaryMerge.h
src/client/inc/tscSecondaryMerge.h
+1
-1
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+2
-2
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+5
-3
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+5
-6
src/client/src/tscJoinProcess.c
src/client/src/tscJoinProcess.c
+6
-3
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+2
-0
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+1
-5
src/client/src/tscSecondaryMerge.c
src/client/src/tscSecondaryMerge.c
+34
-38
src/client/src/tscServer.c
src/client/src/tscServer.c
+12
-12
src/client/src/tscSql.c
src/client/src/tscSql.c
+144
-160
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+17
-17
未找到文件。
src/client/inc/tscSecondaryMerge.h
浏览文件 @
7358e65d
...
...
@@ -120,7 +120,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
void
tscDestroyLocalReducer
(
SSqlObj
*
pSql
);
int32_t
tsc
LocalDoR
educe
(
SSqlObj
*
pSql
);
int32_t
tsc
DoLocalr
educe
(
SSqlObj
*
pSql
);
#ifdef __cplusplus
}
...
...
src/client/inc/tscUtil.h
浏览文件 @
7358e65d
...
...
@@ -187,7 +187,7 @@ SMeterMetaInfo* tscGetMeterMetaInfoFromQueryInfo(SQueryInfo *pQueryInfo, int32_t
SQueryInfo
*
tscGetQueryInfoDetail
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
);
int32_t
tscGetQueryInfoDetailSafely
(
SSqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
SQueryInfo
**
pQueryInfo
);
SMeterMetaInfo
*
tscGetMeterMetaInfoByUid
(
SQueryInfo
*
pQueryInfo
,
int32_t
subClauseIndex
,
uint64_t
uid
,
int32_t
*
index
);
SMeterMetaInfo
*
tscGetMeterMetaInfoByUid
(
SQueryInfo
*
pQueryInfo
,
uint64_t
uid
,
int32_t
*
index
);
void
tscClearMeterMetaInfo
(
SMeterMetaInfo
*
pMeterMetaInfo
,
bool
removeFromCache
);
SMeterMetaInfo
*
tscAddMeterMetaInfo
(
SQueryInfo
*
pQueryInfo
,
const
char
*
name
,
SMeterMeta
*
pMeterMeta
,
SMetricMeta
*
pMetricMeta
,
...
...
@@ -197,7 +197,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void
tscFreeSubqueryInfo
(
SSqlCmd
*
pCmd
);
void
tscClearSubqueryInfo
(
SSqlCmd
*
pCmd
);
void
tscGetMetricMetaCacheKey
(
S
SqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
char
*
keyStr
,
uint64_t
uid
);
void
tscGetMetricMetaCacheKey
(
S
QueryInfo
*
pQueryInfo
,
char
*
keyStr
,
uint64_t
uid
);
int
tscGetMetricMeta
(
SSqlObj
*
pSql
,
int32_t
clauseIndex
);
int
tscGetMeterMeta
(
SSqlObj
*
pSql
,
SMeterMetaInfo
*
pMeterMetaInfo
);
int
tscGetMeterMetaEx
(
SSqlObj
*
pSql
,
SMeterMetaInfo
*
pMeterMetaInfo
,
bool
createIfNotExists
);
...
...
src/client/inc/tsclient.h
浏览文件 @
7358e65d
...
...
@@ -273,8 +273,10 @@ struct STSBuf;
typedef
struct
{
uint8_t
code
;
int
numOfRows
;
// num of results in current retrieved
int
numOfTotal
;
// num of total results
int64_t
numOfRows
;
// num of results in current retrieved
int64_t
numOfTotal
;
// num of total results
int64_t
numOfTotalInCurrentClause
;
// num of total result in current subclause
char
*
pRsp
;
int
rspType
;
int
rspLen
;
...
...
@@ -431,7 +433,7 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void
tscKillMetricQuery
(
SSqlObj
*
pSql
);
void
tscInitResObjForLocalQuery
(
SSqlObj
*
pObj
,
int32_t
numOfRes
,
int32_t
rowLen
);
bool
tscIsUpdateQuery
(
STscObj
*
pObj
);
bool
tscHasReachLimitation
(
S
SqlObj
*
pSql
);
bool
tscHasReachLimitation
(
S
QueryInfo
*
pQueryInfo
,
SSqlRes
*
pRes
);
char
*
tscGetErrorMsgPayload
(
SSqlCmd
*
pCmd
);
...
...
src/client/src/tscAsync.c
浏览文件 @
7358e65d
...
...
@@ -118,7 +118,7 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
// sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx
if
(
numOfRows
==
0
&&
tscProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
...
...
@@ -285,7 +285,7 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
}
/* update the limit value according to current retrieval results */
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
pQueryInfo
->
limit
.
limit
=
pCmd
->
globalLimit
-
pRes
->
numOfTotal
;
if
((
++
pMeterMetaInfo
->
vnodeIndex
)
<=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
...
...
@@ -312,9 +312,8 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SSqlObj
*
pSql
=
(
SSqlObj
*
)
pMsg
->
ahandle
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
assert
(
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
assert
(
pCmd
->
numOfCols
==
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
);
for
(
int
i
=
0
;
i
<
pCmd
->
numOfCols
;
++
i
)
...
...
@@ -497,7 +496,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if
(
pSql
->
pStream
==
NULL
)
{
// check if it is a sub-query of metric query first, if true, enter another routine
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
if
((
pQueryInfo
->
type
&
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
==
TSDB_QUERY_TYPE_STABLE_SUBQUERY
)
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
...
...
@@ -552,7 +551,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
* transfer the sql function for metric query before get meter/metric meta,
* since in callback functions, only tscProcessSql(pStream->pSql) is executed!
*/
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
tscTansformSQLFunctionForSTableQuery
(
pQueryInfo
);
tscIncStreamExecutionCount
(
pSql
->
pStream
);
...
...
src/client/src/tscJoinProcess.c
浏览文件 @
7358e65d
...
...
@@ -441,7 +441,10 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
taos_fetch_rows_a
(
tres
,
joinRetrieveCallback
,
param
);
}
else
if
(
numOfRows
==
0
)
{
// no data from this vnode anymore
if
(
tscProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
SQueryInfo
*
pParentQueryInfo
=
tscGetQueryInfoDetail
(
&
pParentSql
->
cmd
,
pParentSql
->
cmd
.
clauseIndex
);
//todo refactor
if
(
tscProjectionQueryOnSTable
(
pParentQueryInfo
,
0
))
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
assert
(
pQueryInfo
->
numOfTables
==
1
);
...
...
@@ -548,11 +551,11 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
if
(
tscProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
if
(
pRes
->
row
>=
pRes
->
numOfRows
&&
pMeterMetaInfo
->
vnodeIndex
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
&&
(
!
tscHasReachLimitation
(
p
Sql
->
pSubs
[
i
]
)))
{
(
!
tscHasReachLimitation
(
p
QueryInfo
,
pRes
)))
{
numOfFetch
++
;
}
}
else
{
if
(
pRes
->
row
>=
pRes
->
numOfRows
&&
(
!
tscHasReachLimitation
(
p
Sql
->
pSubs
[
i
]
)))
{
if
(
pRes
->
row
>=
pRes
->
numOfRows
&&
(
!
tscHasReachLimitation
(
p
QueryInfo
,
pRes
)))
{
numOfFetch
++
;
}
}
...
...
src/client/src/tscPrepare.c
浏览文件 @
7358e65d
...
...
@@ -448,6 +448,8 @@ static int insertStmtExecute(STscStmt* stmt) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
pRes
->
numOfRows
=
0
;
pRes
->
numOfTotal
=
0
;
pRes
->
numOfTotalInCurrentClause
=
0
;
pRes
->
qhandle
=
0
;
pSql
->
thandle
=
NULL
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
7358e65d
...
...
@@ -520,7 +520,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
}
// set the command/globallimit parameters from the first subclause to the sqlcmd object
// set the command/global
limit parameters from the first subclause to the sqlcmd object
SQueryInfo
*
pQueryInfo1
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
pCmd
->
command
=
pQueryInfo1
->
command
;
...
...
@@ -5576,10 +5576,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
}
}
// handle the limit offset value, validate the limit
pQueryInfo
->
limit
=
pQuerySql
->
limit
;
// temporarily save the original limitation value
if
((
code
=
parseLimitClause
(
pQueryInfo
,
index
,
pQuerySql
,
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
src/client/src/tscSecondaryMerge.c
浏览文件 @
7358e65d
...
...
@@ -58,7 +58,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
* the fields and offset attributes in pCmd and pModel may be different due to
* merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
*/
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
i
)
{
SQLFunctionCtx
*
pCtx
=
&
pReducer
->
pCtx
[
i
];
...
...
@@ -215,7 +215,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
#ifdef _DEBUG_VIEW
printf
(
"load data page into mem for build loser tree: %ld rows
\n
"
,
pDS
->
filePage
.
numOfElems
);
SSrcColumnInfo
colInfo
[
256
]
=
{
0
};
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
tscGetSrcColumnInfo
(
colInfo
,
pQueryInfo
);
...
...
@@ -241,7 +241,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
param
->
pLocalData
=
pReducer
->
pLocalDataSrc
;
param
->
pDesc
=
pReducer
->
pDesc
;
param
->
numOfElems
=
pReducer
->
pLocalDataSrc
[
0
]
->
pMemBuffer
->
numOfElemsPerPage
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
param
->
groupOrderType
=
pQueryInfo
->
groupbyExpr
.
orderType
;
...
...
@@ -434,7 +434,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
}
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
// there is no more result, so we release all allocated resource
SLocalReducer
*
pLocalReducer
=
(
SLocalReducer
*
)
atomic_exchange_ptr
(
&
pRes
->
pLocalReducer
,
NULL
);
...
...
@@ -500,7 +500,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
static
int32_t
createOrderDescriptor
(
tOrderDescriptor
**
pOrderDesc
,
SSqlCmd
*
pCmd
,
tColModel
*
pModel
)
{
int32_t
numOfGroupByCols
=
0
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
>
0
)
{
numOfGroupByCols
=
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
...
...
@@ -541,7 +541,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
bool
isSameGroup
(
SSqlCmd
*
pCmd
,
SLocalReducer
*
pReducer
,
char
*
pPrev
,
tFilePage
*
tmpBuffer
)
{
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
int16_t
functionId
=
tscSqlExprGet
(
pQueryInfo
,
0
)
->
functionId
;
...
...
@@ -787,7 +787,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
tFilePage
*
pFinalDataPage
=
pLocalReducer
->
pResultBuf
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
pRes
->
pLocalReducer
!=
pLocalReducer
)
{
/*
...
...
@@ -802,7 +802,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
// no interval query, no interpolation
pRes
->
data
=
pLocalReducer
->
pFinalRes
;
pRes
->
numOfRows
=
pFinalDataPage
->
numOfElems
;
pRes
->
numOfTotal
+=
pRes
->
numOfRows
;
pRes
->
numOfTotal
InCurrentClause
+=
pRes
->
numOfRows
;
if
(
pQueryInfo
->
limit
.
offset
>
0
)
{
if
(
pQueryInfo
->
limit
.
offset
<
pRes
->
numOfRows
)
{
...
...
@@ -813,23 +813,23 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
tColModelCompact
(
pLocalReducer
->
resColModel
,
pFinalDataPage
,
prevSize
);
pRes
->
numOfRows
-=
pQueryInfo
->
limit
.
offset
;
pRes
->
numOfTotal
-=
pQueryInfo
->
limit
.
offset
;
pRes
->
numOfTotal
InCurrentClause
-=
pQueryInfo
->
limit
.
offset
;
pQueryInfo
->
limit
.
offset
=
0
;
}
else
{
pQueryInfo
->
limit
.
offset
-=
pRes
->
numOfRows
;
pRes
->
numOfRows
=
0
;
pRes
->
numOfTotal
=
0
;
pRes
->
numOfTotal
InCurrentClause
=
0
;
}
}
if
(
pQueryInfo
->
limit
.
limit
>=
0
&&
pRes
->
numOfTotal
>
pQueryInfo
->
limit
.
limit
)
{
if
(
pQueryInfo
->
limit
.
limit
>=
0
&&
pRes
->
numOfTotal
InCurrentClause
>
pQueryInfo
->
limit
.
limit
)
{
/* impose the limitation of output rows on the final result */
int32_t
prevSize
=
pFinalDataPage
->
numOfElems
;
int32_t
overFlow
=
pRes
->
numOfTotal
-
pQueryInfo
->
limit
.
limit
;
int32_t
overFlow
=
pRes
->
numOfTotal
InCurrentClause
-
pQueryInfo
->
limit
.
limit
;
assert
(
overFlow
<
pRes
->
numOfRows
);
pRes
->
numOfTotal
=
pQueryInfo
->
limit
.
limit
;
pRes
->
numOfTotal
InCurrentClause
=
pQueryInfo
->
limit
.
limit
;
pRes
->
numOfRows
-=
overFlow
;
pFinalDataPage
->
numOfElems
-=
overFlow
;
...
...
@@ -898,7 +898,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
pRes
->
data
=
pLocalReducer
->
pFinalRes
;
pRes
->
numOfRows
=
newRows
;
pRes
->
numOfTotal
+=
newRows
;
pRes
->
numOfTotal
InCurrentClause
+=
newRows
;
pQueryInfo
->
limit
.
offset
=
0
;
break
;
...
...
@@ -924,13 +924,13 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
}
if
(
pRes
->
numOfRows
>
0
)
{
if
(
pQueryInfo
->
limit
.
limit
>=
0
&&
pRes
->
numOfTotal
>
pQueryInfo
->
limit
.
limit
)
{
int32_t
overFlow
=
pRes
->
numOfTotal
-
pQueryInfo
->
limit
.
limit
;
if
(
pQueryInfo
->
limit
.
limit
>=
0
&&
pRes
->
numOfTotal
InCurrentClause
>
pQueryInfo
->
limit
.
limit
)
{
int32_t
overFlow
=
pRes
->
numOfTotal
InCurrentClause
-
pQueryInfo
->
limit
.
limit
;
pRes
->
numOfRows
-=
overFlow
;
assert
(
pRes
->
numOfRows
>=
0
);
pRes
->
numOfTotal
=
pQueryInfo
->
limit
.
limit
;
pRes
->
numOfTotal
InCurrentClause
=
pQueryInfo
->
limit
.
limit
;
pFinalDataPage
->
numOfElems
-=
overFlow
;
/* set remain data to be discarded, and reset the interpolation information */
...
...
@@ -974,7 +974,7 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer)
static
void
doExecuteSecondaryMerge
(
SSqlCmd
*
pCmd
,
SLocalReducer
*
pLocalReducer
,
bool
needInit
)
{
// the tag columns need to be set before all functions execution
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
for
(
int32_t
j
=
0
;
j
<
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
j
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
j
);
...
...
@@ -1129,7 +1129,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
pRes
->
numOfGroups
+=
1
;
// the output group is limited by the slimit clause
...
...
@@ -1139,7 +1139,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
// pRes->pGroupRec = realloc(pRes->pGroupRec, pRes->numOfGroups*sizeof(SResRec));
// pRes->pGroupRec[pRes->numOfGroups-1].numOfRows = pRes->numOfRows;
// pRes->pGroupRec[pRes->numOfGroups-1].numOfTotal
= pRes->numOfTotal
;
// pRes->pGroupRec[pRes->numOfGroups-1].numOfTotal
InCurrentClause = pRes->numOfTotalInCurrentClause
;
return
false
;
}
...
...
@@ -1155,7 +1155,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
tFilePage
*
pResBuf
=
pLocalReducer
->
pResultBuf
;
tColModel
*
pModel
=
pLocalReducer
->
resColModel
;
...
...
@@ -1207,8 +1207,9 @@ void resetOutputBuf(SQueryInfo* pQueryInfo, SLocalReducer *pLocalReducer) { //
static
void
resetEnvForNewResultset
(
SSqlRes
*
pRes
,
SSqlCmd
*
pCmd
,
SLocalReducer
*
pLocalReducer
)
{
// In handling data in other groups, we need to reset the interpolation information for a new group data
pRes
->
numOfRows
=
0
;
pRes
->
numOfTotal
=
0
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
pRes
->
numOfTotalInCurrentClause
=
0
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
pQueryInfo
->
limit
.
offset
=
pLocalReducer
->
offset
;
...
...
@@ -1233,7 +1234,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
SLocalReducer
*
pLocalReducer
=
pRes
->
pLocalReducer
;
SInterpolationInfo
*
pInterpoInfo
=
&
pLocalReducer
->
interpolationInfo
;
...
...
@@ -1269,7 +1270,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
bool
prevGroupCompleted
=
(
!
pLocalReducer
->
discard
)
&&
pLocalReducer
->
hasUnprocessedRow
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
int8_t
precision
=
pMeterMetaInfo
->
pMeterMeta
->
precision
;
...
...
@@ -1313,7 +1314,7 @@ static void doMergeWithPrevRows(SSqlObj *pSql, int32_t numOfRes) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SLocalReducer
*
pLocalReducer
=
pRes
->
pLocalReducer
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
for
(
int32_t
k
=
0
;
k
<
pQueryInfo
->
fieldsInfo
.
numOfOutputCols
;
++
k
)
{
SSqlExpr
*
pExpr
=
tscSqlExprGet
(
pQueryInfo
,
k
);
...
...
@@ -1330,26 +1331,21 @@ static void doMergeWithPrevRows(SSqlObj *pSql, int32_t numOfRes) {
doExecuteSecondaryMerge
(
pCmd
,
pLocalReducer
,
true
);
}
int32_t
tsc
LocalDoR
educe
(
SSqlObj
*
pSql
)
{
int32_t
tsc
DoLocalr
educe
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
tscResetForNextRetrieve
(
pRes
);
if
(
pSql
->
signature
!=
pSql
||
pRes
==
NULL
||
pRes
->
pLocalReducer
==
NULL
)
{
// all data has been processed
tscTrace
(
"%s call the drop local reducer"
,
__FUNCTION__
);
tscDestroyLocalReducer
(
pSql
);
if
(
pRes
)
{
pRes
->
numOfRows
=
0
;
pRes
->
row
=
0
;
}
return
0
;
}
pRes
->
row
=
0
;
pRes
->
numOfRows
=
0
;
SLocalReducer
*
pLocalReducer
=
pRes
->
pLocalReducer
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
// set the data merge in progress
int32_t
prevStatus
=
...
...
@@ -1397,7 +1393,7 @@ int32_t tscLocalDoReduce(SSqlObj *pSql) {
#if defined(_DEBUG_VIEW)
printf
(
"chosen row:
\t
"
);
SSrcColumnInfo
colInfo
[
256
]
=
{
0
};
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
tscGetSrcColumnInfo
(
colInfo
,
pQueryInfo
);
...
...
src/client/src/tscServer.c
浏览文件 @
7358e65d
...
...
@@ -652,18 +652,18 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
}
static
SSqlObj
*
tscCreateSqlObjForSubquery
(
SSqlObj
*
pSql
,
SRetrieveSupport
*
trsupport
,
SSqlObj
*
prevSqlObj
);
static
int
tscLaunch
MetricSubQ
ueries
(
SSqlObj
*
pSql
);
static
int
tscLaunch
STableSubq
ueries
(
SSqlObj
*
pSql
);
// todo merge with callback
int32_t
tscLaunchJoinSubquery
(
SSqlObj
*
pSql
,
int16_t
tableIndex
,
SJoinSubquerySupporter
*
pSupporter
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
pSql
->
res
.
qhandle
=
0x1
;
pSql
->
res
.
numOfRows
=
0
;
if
(
pSql
->
pSubs
==
NULL
)
{
pSql
->
pSubs
=
malloc
(
POINTER_BYTES
*
pSupporter
->
pState
->
numOfTotal
);
pSql
->
pSubs
=
calloc
(
pSupporter
->
pState
->
numOfTotal
,
POINTER_BYTES
);
if
(
pSql
->
pSubs
==
NULL
)
{
return
TSDB_CODE_CLI_OUT_OF_MEMORY
;
}
...
...
@@ -874,7 +874,7 @@ int tscProcessSql(SSqlObj *pSql) {
if
(
tscIsTwoStageMergeMetricQuery
(
pQueryInfo
,
0
))
{
/*
* (ref. line: 964)
* Before this function returns from tscLaunch
MetricSubQ
ueries and continues, pSql may have been released at user
* Before this function returns from tscLaunch
STableSubq
ueries and continues, pSql may have been released at user
* program context after retrieving all data from vnodes. User function is called at tscRetrieveFromVnodeCallBack.
*
* when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL,
...
...
@@ -882,7 +882,7 @@ int tscProcessSql(SSqlObj *pSql) {
*/
void
*
fp
=
pSql
->
fp
;
if
(
tscLaunch
MetricSubQ
ueries
(
pSql
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tscLaunch
STableSubq
ueries
(
pSql
)
!=
TSDB_CODE_SUCCESS
)
{
return
pRes
->
code
;
}
...
...
@@ -923,7 +923,7 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState
free
(
pState
);
}
int
tscLaunch
MetricSubQ
ueries
(
SSqlObj
*
pSql
)
{
int
tscLaunch
STableSubq
ueries
(
SSqlObj
*
pSql
)
{
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -1217,7 +1217,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
#ifdef _DEBUG_VIEW
printf
(
"received data from vnode: %d rows
\n
"
,
pRes
->
numOfRows
);
SSrcColumnInfo
colInfo
[
256
]
=
{
0
};
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
tscGetSrcColumnInfo
(
colInfo
,
pQueryInfo
);
tColModelDisplayEx
(
pDesc
->
pSchema
,
pRes
->
data
,
pRes
->
numOfRows
,
pRes
->
numOfRows
,
colInfo
);
...
...
@@ -2575,8 +2575,8 @@ int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pRes
->
code
=
tsc
LocalDoR
educe
(
pSql
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
pRes
->
code
=
tsc
DoLocalr
educe
(
pSql
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
if
(
pRes
->
code
==
TSDB_CODE_SUCCESS
&&
pRes
->
numOfRows
>
0
)
{
tscSetResultPointer
(
pQueryInfo
,
pRes
);
...
...
@@ -3223,7 +3223,7 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
char
name
[
TSDB_MAX_TAGS_LEN
+
1
]
=
{
0
};
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
i
);
tscGetMetricMetaCacheKey
(
&
pSql
->
cmd
,
0
,
name
,
pMeterMetaInfo
->
pMeterMeta
->
uid
);
tscGetMetricMetaCacheKey
(
pQueryInfo
,
name
,
pMeterMetaInfo
->
pMeterMeta
->
uid
);
#ifdef _DEBUG_VIEW
printf
(
"generate the metric key:%s, index:%d
\n
"
,
name
,
i
);
...
...
@@ -3646,7 +3646,7 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
char
tagstr
[
TSDB_MAX_TAGS_LEN
+
1
]
=
{
0
};
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
i
);
tscGetMetricMetaCacheKey
(
p
Cmd
,
clauseIndex
,
tagstr
,
pMeterMetaInfo
->
pMeterMeta
->
uid
);
tscGetMetricMetaCacheKey
(
p
QueryInfo
,
tagstr
,
pMeterMetaInfo
->
pMeterMeta
->
uid
);
taosRemoveDataFromCache
(
tscCacheHandle
,
(
void
**
)
&
(
pMeterMetaInfo
->
pMetricMeta
),
false
);
...
...
@@ -3712,7 +3712,7 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
char
tagstr
[
TSDB_MAX_TAGS_LEN
]
=
{
0
};
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
i
);
tscGetMetricMetaCacheKey
(
p
Cmd
,
0
,
tagstr
,
pMeterMetaInfo
->
pMeterMeta
->
uid
);
tscGetMetricMetaCacheKey
(
p
QueryInfo
,
tagstr
,
pMeterMetaInfo
->
pMeterMeta
->
uid
);
#ifdef _DEBUG_VIEW
printf
(
"create metric key:%s, index:%d
\n
"
,
tagstr
,
i
);
...
...
src/client/src/tscSql.c
浏览文件 @
7358e65d
...
...
@@ -200,6 +200,8 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
pRes
->
numOfRows
=
1
;
pRes
->
numOfTotal
=
0
;
pRes
->
numOfTotalInCurrentClause
=
0
;
pSql
->
asyncTblPos
=
NULL
;
if
(
NULL
!=
pSql
->
pTableHashList
)
{
taosCleanUpHashTable
(
pSql
->
pTableHashList
);
...
...
@@ -367,7 +369,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
// secondary merge has handle this situation
if
(
pCmd
->
command
!=
TSDB_SQL_RETRIEVE_METRIC
)
{
pRes
->
numOfTotal
+=
pRes
->
numOfRows
;
pRes
->
numOfTotal
InCurrentClause
+=
pRes
->
numOfRows
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
...
...
@@ -457,7 +459,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
* available, go on
*/
if
(
pMetaInfo
->
vnodeIndex
<
pMetaInfo
->
pMetricMeta
->
numOfVnodes
&&
pRes1
->
row
<
pRes1
->
numOfRows
&&
(
!
tscHasReachLimitation
(
p
Sql
->
pSubs
[
i
]
)))
{
(
!
tscHasReachLimitation
(
p
QueryInfo1
,
pRes1
)))
{
allSubqueryExhausted
=
false
;
break
;
}
...
...
@@ -469,7 +471,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SSqlRes
*
pRes1
=
&
pSql
->
pSubs
[
i
]
->
res
;
SQueryInfo
*
pQueryInfo1
=
tscGetQueryInfoDetail
(
&
pSql
->
pSubs
[
i
]
->
cmd
,
0
);
if
((
pRes1
->
row
>=
pRes1
->
numOfRows
&&
tscHasReachLimitation
(
p
Sql
->
pSubs
[
i
]
)
&&
if
((
pRes1
->
row
>=
pRes1
->
numOfRows
&&
tscHasReachLimitation
(
p
QueryInfo1
,
pRes1
)
&&
tscProjectionQueryOnTable
(
pQueryInfo1
))
||
(
pRes1
->
numOfRows
==
0
))
{
hasData
=
false
;
...
...
@@ -552,6 +554,94 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
return
pRes
->
tsrow
;
}
/**
* If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists,
* in case of multi-vnode super table projection query and the result does not reach the limitation.
*/
static
bool
hasMoreVnodesToTry
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
return
pRes
->
numOfRows
==
0
&&
tscProjectionQueryOnSTable
(
pQueryInfo
,
0
)
&&
!
tscHasReachLimitation
(
pQueryInfo
,
pRes
);
}
static
void
tscTryQueryNextVnode
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
/*
* no result returned from the current virtual node anymore, try the next vnode if exists
* if case of: multi-vnode super table projection query
*/
assert
(
pRes
->
numOfRows
==
0
&&
tscProjectionQueryOnSTable
(
pQueryInfo
,
0
)
&&
!
tscHasReachLimitation
(
pQueryInfo
,
pRes
));
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
int32_t
totalVnode
=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
;
while
(
++
pMeterMetaInfo
->
vnodeIndex
<
totalVnode
)
{
tscTrace
(
"%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d"
,
pSql
,
pMeterMetaInfo
->
vnodeIndex
-
1
,
pMeterMetaInfo
->
vnodeIndex
,
totalVnode
,
pRes
->
numOfTotalInCurrentClause
);
/*
* update the limit and offset value for the query on the next vnode,
* according to current retrieval results
*
* NOTE:
* if the pRes->offset is larger than 0, the start returned position has not reached yet.
* Therefore, the pRes->numOfRows, as well as pRes->numOfTotalInCurrentClause, must be 0.
* The pRes->offset value will be updated by virtual node, during query execution.
*/
if
(
pQueryInfo
->
clauseLimit
>=
0
)
{
pQueryInfo
->
limit
.
limit
=
pQueryInfo
->
clauseLimit
-
pRes
->
numOfTotalInCurrentClause
;
}
pQueryInfo
->
limit
.
offset
=
pRes
->
offset
;
assert
((
pRes
->
offset
>=
0
&&
pRes
->
numOfRows
==
0
)
||
(
pRes
->
offset
==
0
&&
pRes
->
numOfRows
>=
0
));
tscTrace
(
"%p new query to next vnode, vnode index:%d, limit:%"
PRId64
", offset:%"
PRId64
", glimit:%"
PRId64
,
pSql
,
pMeterMetaInfo
->
vnodeIndex
,
pQueryInfo
->
limit
.
limit
,
pQueryInfo
->
limit
.
offset
,
pQueryInfo
->
clauseLimit
);
/*
* For project query with super table join, the numOfSub is equalled to the number of all subqueries.
* Therefore, we need to reset the value of numOfSubs to be 0.
*
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/
pSql
->
numOfSubs
=
0
;
pCmd
->
command
=
TSDB_SQL_SELECT
;
assert
(
pSql
->
fp
==
NULL
);
int32_t
ret
=
tscProcessSql
(
pSql
);
// todo check for failure
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pSql
->
res
.
code
=
ret
;
return
;
}
// retrieve data
assert
(
pCmd
->
command
==
TSDB_SQL_SELECT
);
pCmd
->
command
=
TSDB_SQL_FETCH
;
if
((
ret
=
tscProcessSql
(
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
pSql
->
res
.
code
=
ret
;
return
;
}
// if the result from current virtual node are empty, try next if exists. otherwise, return the results.
if
(
pRes
->
numOfRows
>
0
)
{
break
;
}
}
if
(
pRes
->
numOfRows
==
0
)
{
tscTrace
(
"%p all vnodes exhausted, prj query completed. total res:%d"
,
pSql
,
totalVnode
,
pRes
->
numOfTotal
);
}
}
TAOS_ROW
taos_fetch_row_impl
(
TAOS_RES
*
res
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -572,7 +662,13 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
return
NULL
;
}
}
else
if
(
pRes
->
row
>=
pRes
->
numOfRows
)
{
// not a join query
}
else
if
(
pRes
->
row
>=
pRes
->
numOfRows
)
{
/**
* NOT a join query
*
* If the data block of current result set have been consumed already, try fetch next result
* data block from virtual node.
*/
tscResetForNextRetrieve
(
pRes
);
if
(
pCmd
->
command
<
TSDB_SQL_LOCAL
)
{
...
...
@@ -580,89 +676,17 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
}
tscProcessSql
(
pSql
);
// retrieve data from virtual node
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
/*
* no result returned from the current virtual node anymore, try the next vnode if exists
* if case of: multi-vnode super table projection query
*/
if
(
pRes
->
numOfRows
==
0
&&
tscProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
int32_t
totalVnode
=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
;
while
(
++
pMeterMetaInfo
->
vnodeIndex
<
totalVnode
)
{
tscTrace
(
"%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d"
,
pSql
,
pMeterMetaInfo
->
vnodeIndex
-
1
,
pMeterMetaInfo
->
vnodeIndex
,
totalVnode
,
pRes
->
numOfTotal
);
// reach the maximum number of output rows, abort
if
(
tscHasReachLimitation
(
pSql
))
{
return
NULL
;
}
/*
* update the limit and offset value for the query on the next vnode,
* according to current retrieval results
*
* NOTE:
* if the pRes->offset is larger than 0, the start returned position has not reached yet.
* Therefore, the pRes->numOfRows, as well as pRes->numOfTotal, must be 0.
* The pRes->offset value will be updated by virtual node, during query execution.
*/
if
(
pQueryInfo
->
clauseLimit
>=
0
)
{
pQueryInfo
->
limit
.
limit
=
pQueryInfo
->
clauseLimit
-
pRes
->
numOfTotal
;
}
pQueryInfo
->
limit
.
offset
=
pRes
->
offset
;
assert
((
pRes
->
offset
>=
0
&&
pRes
->
numOfRows
==
0
)
||
(
pRes
->
offset
==
0
&&
pRes
->
numOfRows
>=
0
));
tscTrace
(
"%p new query to next vnode, vnode index:%d, limit:%"
PRId64
", offset:%"
PRId64
", glimit:%"
PRId64
,
pSql
,
pMeterMetaInfo
->
vnodeIndex
,
pQueryInfo
->
limit
.
limit
,
pQueryInfo
->
limit
.
offset
,
pQueryInfo
->
clauseLimit
);
/*
* For project query with super table join, the numOfSub is equalled to the number of all subqueries.
* Therefore, we need to reset the value of numOfSubs to be 0.
*
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/
pSql
->
numOfSubs
=
0
;
pCmd
->
command
=
TSDB_SQL_SELECT
;
assert
(
pSql
->
fp
==
NULL
);
int32_t
ret
=
tscProcessSql
(
pSql
);
// todo check for failure
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
pSql
->
res
.
code
=
ret
;
return
NULL
;
}
// retrieve data
assert
(
pCmd
->
command
==
TSDB_SQL_SELECT
);
pCmd
->
command
=
TSDB_SQL_FETCH
;
if
((
ret
=
tscProcessSql
(
pSql
))
!=
TSDB_CODE_SUCCESS
)
{
pSql
->
res
.
code
=
ret
;
return
NULL
;
}
// if the result from current virtual node are empty, try next if exists. otherwise, return the results.
if
(
pRes
->
numOfRows
>
0
)
{
break
;
}
}
if
(
pRes
->
numOfRows
==
0
)
{
tscTrace
(
"%p all vnodes exhausted, prj query completed. total res:%d"
,
pSql
,
totalVnode
,
pRes
->
numOfTotal
);
}
if
(
hasMoreVnodesToTry
(
pSql
))
{
tscTryQueryNextVnode
(
pSql
);
}
/*
* local reducer has handle this case,
* so no need to add the pRes->numOfRows for
metric retrieve
* so no need to add the pRes->numOfRows for
super table query
*/
if
(
pCmd
->
command
!=
TSDB_SQL_RETRIEVE_METRIC
)
{
pRes
->
numOfTotal
+=
pRes
->
numOfRows
;
pRes
->
numOfTotal
InCurrentClause
+=
pRes
->
numOfRows
;
}
if
(
pRes
->
numOfRows
==
0
)
{
...
...
@@ -675,73 +699,43 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
TAOS_ROW
taos_fetch_row
(
TAOS_RES
*
res
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
if
(
pSql
==
NULL
||
pSql
->
signature
!=
pSql
)
{
globalCode
=
TSDB_CODE_DISCONNECTED
;
return
NULL
;
}
// projection query on metric, pipeline retrieve data from vnode list, instead of two-stage merge
/*
* projection query on super table, access each virtual node sequentially retrieve data from vnode list,
* instead of two-stage merge
*/
TAOS_ROW
rows
=
taos_fetch_row_impl
(
res
);
pRes
->
numOfTotal
+=
pRes
->
numOfTotalInCurrentClause
;
pRes
->
numOfTotalInCurrentClause
=
0
;
// current subclause is completed, try the next subclause
while
(
rows
==
NULL
&&
pCmd
->
clauseIndex
<
pCmd
->
numOfClause
-
1
)
{
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
pCmd
->
clauseIndex
++
;
assert
(
pSql
->
fp
==
NULL
);
tscTrace
(
"%p try data in the next subclause:%d, total subclause:%d"
,
pSql
,
pCmd
->
clauseIndex
,
pCmd
->
numOfClause
);
tscProcessSql
(
pSql
);
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// while (rows == NULL && tscProjectionQueryOnSTable(pQueryInfo, 0)) {
// SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
//
// // reach the maximum number of output rows, abort
// if (tscHasReachLimitation(pSql)) {
// return NULL;
// }
//
// /*
// * update the limit and offset value according to current retrieval results
// * Note: if pRes->offset > 0, pRes->numOfRows = 0, pRes->numOfTotal = 0;
// */
// pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal;
// pQueryInfo->limit.offset = pRes->offset;
//
// assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
//
// /*
// * For project query with super table join, the numOfSub is equalled to the number of all subqueries, so
// * we need to reset the value of numOfSubs to be 0.
// *
// * For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
// */
// pSql->numOfSubs = 0;
//
// if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
// pCmd->command = TSDB_SQL_SELECT;
// assert(pSql->fp == NULL);
// tscProcessSql(pSql);
// rows = taos_fetch_row_impl(res);
// }
//
// // check!!!
// if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
// break;
// }
// }
//
// // current subclause is completed, try the next subclause
// if (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
// pSql->cmd.command = TSDB_SQL_SELECT;
// pCmd->clauseIndex++;
//
// assert(pSql->fp == NULL);
//
// tscTrace("%p start next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
// tscProcessSql(pSql);
//
// rows = taos_fetch_row_impl(res);
// }
// if the rows is not NULL, return immediately
rows
=
taos_fetch_row_impl
(
res
);
}
return
rows
;
}
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
SSql
Res
*
pRes
=
&
pSql
->
res
;
SSql
Cmd
*
pCmd
=
&
pSql
->
cmd
;
int
nRows
=
0
;
...
...
@@ -755,32 +749,19 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
// instead of two-stage mergevnodeProcessMsgFromShell free qhandle
nRows
=
taos_fetch_block_impl
(
res
,
rows
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
&
pSql
->
cmd
,
pSql
->
cmd
.
clauseIndex
);
while
(
*
rows
==
NULL
&&
tscProjectionQueryOnSTable
(
pQueryInfo
,
0
))
{
/* reach the maximum number of output rows, abort */
if
(
tscHasReachLimitation
(
pSql
))
{
return
0
;
}
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
0
);
/* update the limit value according to current retrieval results */
pQueryInfo
->
limit
.
limit
=
pSql
->
cmd
.
globalLimit
-
pRes
->
numOfTotal
;
pQueryInfo
->
limit
.
offset
=
pRes
->
offset
;
if
((
++
pMeterMetaInfo
->
vnodeIndex
)
<
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
assert
(
pSql
->
fp
==
NULL
);
tscProcessSql
(
pSql
);
nRows
=
taos_fetch_block_impl
(
res
,
rows
);
}
// check!!!
if
(
*
rows
!=
NULL
||
pMeterMetaInfo
->
vnodeIndex
>=
pMeterMetaInfo
->
pMetricMeta
->
numOfVnodes
)
{
break
;
}
// current subclause is completed, try the next subclause
while
(
rows
==
NULL
&&
pCmd
->
clauseIndex
<
pCmd
->
numOfClause
-
1
)
{
pSql
->
cmd
.
command
=
TSDB_SQL_SELECT
;
pCmd
->
clauseIndex
++
;
assert
(
pSql
->
fp
==
NULL
);
tscTrace
(
"%p try data in the next subclause:%d, total subclause:%d"
,
pSql
,
pCmd
->
clauseIndex
,
pCmd
->
numOfClause
);
tscProcessSql
(
pSql
);
nRows
=
taos_fetch_block_impl
(
res
,
rows
);
}
return
nRows
;
}
...
...
@@ -1042,7 +1023,8 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pRes
->
numOfRows
=
1
;
pRes
->
numOfTotal
=
0
;
pRes
->
numOfTotalInCurrentClause
=
0
;
tscTrace
(
"%p Valid SQL: %s pObj:%p"
,
pSql
,
sql
,
pObj
);
int32_t
sqlLen
=
strlen
(
sql
);
...
...
@@ -1172,6 +1154,8 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
pRes
->
numOfTotal
=
0
;
// the number of getting table meta from server
pRes
->
numOfTotalInCurrentClause
=
0
;
pRes
->
code
=
0
;
assert
(
pSql
->
fp
==
NULL
);
...
...
src/client/src/tscUtil.c
浏览文件 @
7358e65d
...
...
@@ -37,12 +37,9 @@
* fullmetername + '.' + '(nil)' + '.' + '(nil)' + relation + '.' + [tagId1,
* tagId2,...] + '.' + group_orderType
*/
void
tscGetMetricMetaCacheKey
(
S
SqlCmd
*
pCmd
,
int32_t
subClauseIndex
,
char
*
str
,
uint64_t
uid
)
{
void
tscGetMetricMetaCacheKey
(
S
QueryInfo
*
pQueryInfo
,
char
*
str
,
uint64_t
uid
)
{
int32_t
index
=
-
1
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoByUid
(
pQueryInfo
,
subClauseIndex
,
uid
,
&
index
);
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfoByUid
(
pQueryInfo
,
uid
,
&
index
);
int32_t
len
=
0
;
char
tagIdBuf
[
128
]
=
{
0
};
...
...
@@ -395,7 +392,8 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
pRes
->
row
=
0
;
pRes
->
numOfRows
=
0
;
pRes
->
numOfTotal
=
0
;
pRes
->
numOfTotalInCurrentClause
=
0
;
pRes
->
numOfGroups
=
0
;
tfree
(
pRes
->
pGroupRec
);
...
...
@@ -1591,13 +1589,14 @@ SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
}
SMeterMetaInfo
*
tscGetMeterMetaInfoFromQueryInfo
(
SQueryInfo
*
pQueryInfo
,
int32_t
tableIndex
)
{
assert
(
pQueryInfo
!=
NULL
);
if
(
pQueryInfo
->
pMeterInfo
==
NULL
)
{
assert
(
pQueryInfo
->
numOfTables
==
0
);
return
NULL
;
}
assert
(
pQueryInfo
!=
NULL
&&
tableIndex
>=
0
&&
tableIndex
<=
pQueryInfo
->
numOfTables
&&
pQueryInfo
->
pMeterInfo
!=
NULL
);
assert
(
tableIndex
>=
0
&&
tableIndex
<=
pQueryInfo
->
numOfTables
&&
pQueryInfo
->
pMeterInfo
!=
NULL
);
return
pQueryInfo
->
pMeterInfo
[
tableIndex
];
}
...
...
@@ -1628,7 +1627,7 @@ int32_t tscGetQueryInfoDetailSafely(SSqlCmd* pCmd, int32_t subClauseIndex, SQuer
return
TSDB_CODE_SUCCESS
;
}
SMeterMetaInfo
*
tscGetMeterMetaInfoByUid
(
SQueryInfo
*
pQueryInfo
,
int32_t
subClauseIndex
,
uint64_t
uid
,
int32_t
*
index
)
{
SMeterMetaInfo
*
tscGetMeterMetaInfoByUid
(
SQueryInfo
*
pQueryInfo
,
uint64_t
uid
,
int32_t
*
index
)
{
int32_t
k
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
numOfTables
;
++
i
)
{
...
...
@@ -1642,6 +1641,7 @@ SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, int32_t subClau
*
index
=
k
;
}
assert
(
k
!=
-
1
);
return
tscGetMeterMetaInfoFromQueryInfo
(
pQueryInfo
,
k
);
}
...
...
@@ -1777,6 +1777,10 @@ void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache)
}
void
tscResetForNextRetrieve
(
SSqlRes
*
pRes
)
{
if
(
pRes
==
NULL
)
{
return
;
}
pRes
->
row
=
0
;
pRes
->
numOfRows
=
0
;
}
...
...
@@ -1877,7 +1881,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew
->
param
=
param
;
char
key
[
TSDB_MAX_TAGS_LEN
+
1
]
=
{
0
};
tscGetMetricMetaCacheKey
(
p
Cmd
,
pCmd
->
clauseIndex
,
key
,
uid
);
tscGetMetricMetaCacheKey
(
p
QueryInfo
,
key
,
uid
);
#ifdef _DEBUG_VIEW
printf
(
"the metricmeta key is:%s
\n
"
,
key
);
...
...
@@ -1980,13 +1984,9 @@ int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* s
return
TSDB_CODE_INVALID_SQL
;
}
bool
tscHasReachLimitation
(
SSqlObj
*
pSql
)
{
assert
(
pSql
!=
NULL
&&
pSql
->
cmd
.
globalLimit
!=
0
);
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
return
(
pCmd
->
globalLimit
>
0
&&
pRes
->
numOfTotal
>=
pCmd
->
globalLimit
);
bool
tscHasReachLimitation
(
SQueryInfo
*
pQueryInfo
,
SSqlRes
*
pRes
)
{
assert
(
pQueryInfo
!=
NULL
&&
pQueryInfo
->
clauseLimit
!=
0
);
return
(
pQueryInfo
->
clauseLimit
>
0
&&
pRes
->
numOfTotalInCurrentClause
>=
pQueryInfo
->
clauseLimit
);
}
char
*
tscGetErrorMsgPayload
(
SSqlCmd
*
pCmd
)
{
return
pCmd
->
payload
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录