diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h
index 19230e34f1f8769229d0f386a8276a6c2395b279..c073f40546c6a5ceaeabeafd3d731fdf211402df 100644
--- a/src/client/inc/tscLocalMerge.h
+++ b/src/client/inc/tscLocalMerge.h
@@ -13,8 +13,8 @@
* along with this program. If not, see .
*/
-#ifndef TDENGINE_TSCSECONARYMERGE_H
-#define TDENGINE_TSCSECONARYMERGE_H
+#ifndef TDENGINE_TSCLOCALMERGE_H
+#define TDENGINE_TSCLOCALMERGE_H
#ifdef __cplusplus
extern "C" {
@@ -27,14 +27,7 @@ extern "C" {
#include "tsclient.h"
#define MAX_NUM_OF_SUBQUERY_RETRY 3
-
-/*
- * @version 0.1
- * @date 2018/01/05
- * @author liaohj
- * management of client-side reducer for metric query
- */
-
+
struct SQLFunctionCtx;
typedef struct SLocalDataSource {
@@ -60,7 +53,6 @@ typedef struct SLocalReducer {
char * prevRowOfInput;
tFilePage * pResultBuf;
int32_t nResultBufSize;
-// char * pBufForInterpo; // intermediate buffer for interpolation
tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result.
@@ -81,13 +73,8 @@ typedef struct SLocalReducer {
} SLocalReducer;
typedef struct SSubqueryState {
- /*
- * the number of completed retrieval subquery, once this value equals to numOfVnodes,
- * all retrieval are completed.Local merge is launched.
- */
- int32_t numOfCompleted;
- int32_t numOfTotal; // number of total sub-queries
- int32_t code; // code from subqueries
+ int32_t numOfRemain; // the number of remain unfinished subquery
+ int32_t numOfTotal; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState;
@@ -128,4 +115,4 @@ int32_t tscDoLocalMerge(SSqlObj *pSql);
}
#endif
-#endif // TDENGINE_TSCSECONARYMERGE_H
+#endif // TDENGINE_TSCLOCALMERGE_H
diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h
index 368fe2250a37e3ef226174b80a39e63e0c990fda..82d490376aa8de18814c6cf9a1f3fbfb5be6ce75 100644
--- a/src/client/inc/tscSubquery.h
+++ b/src/client/inc/tscSubquery.h
@@ -26,11 +26,9 @@ extern "C" {
void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
-int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
-void tscDestroyJoinSupporter(SJoinSupporter* pSupporter);
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql);
diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h
index 581ce00a62ed4e450d1c4a546e227c0e10580549..d6562f008de5ce8a42969633d53936d19fcc07ef 100644
--- a/src/client/inc/tscUtil.h
+++ b/src/client/inc/tscUtil.h
@@ -64,7 +64,8 @@ typedef struct SJoinSupporter {
SSubqueryState* pState;
SSqlObj* pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query
- int64_t interval; // interval time
+ int64_t intervalTime; // interval time
+ int64_t slidingTime; // sliding time
SLimitVal limit; // limit info
uint64_t uid; // query meter uid
SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution
diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c
index 4e02325be605a954c9695bb808e063e3b2fc9f68..d25afcf00fd951e5f60d91226e106057114e159a 100644
--- a/src/client/src/tscSubquery.c
+++ b/src/client/src/tscSubquery.c
@@ -12,12 +12,12 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+#include "os.h"
#include "tscSubquery.h"
-#include
-#include
-#include
-#include "os.h"
+#include "qast.h"
+#include "tcompare.h"
+#include "tschemautil.h"
#include "qtsbuf.h"
#include "tscLog.h"
#include "tsclient.h"
@@ -169,7 +169,8 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
pSupporter->subqueryIndex = index;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
- pSupporter->interval = pQueryInfo->intervalTime;
+ pSupporter->intervalTime = pQueryInfo->intervalTime;
+ pSupporter->slidingTime = pQueryInfo->slidingTime;
pSupporter->limit = pQueryInfo->limit;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
@@ -186,7 +187,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
return pSupporter;
}
-void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
+static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
if (pSupporter == NULL) {
return;
}
@@ -217,7 +218,7 @@ void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
* primary timestamp column , the secondary query is not necessary
*
*/
-bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
+static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
for (int32_t i = 0; i < numOfCols; ++i) {
@@ -233,14 +234,11 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
/*
* launch secondary stage query to fetch the result that contains timestamp in set
*/
-int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
+static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0;
SJoinSupporter* pSupporter = NULL;
- /*
- * If the columns are not involved in the final select clause,
- * the corresponding query will not be issued.
- */
+ //If the columns are not involved in the final select clause, the corresponding query will not be issued.
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
pSupporter = pSql->pSubs[i]->param;
if (taosArrayGetSize(pSupporter->exprList) > 0) {
@@ -251,13 +249,12 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
assert(numOfSub > 0);
// scan all subquery, if one sub query has only ts, ignore it
- tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query, others are not retrieve in "
- "select clause", pSql, pSql->numOfSubs, numOfSub);
+ tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed.
SSubqueryState* pState = pSupporter->pState;
pState->numOfTotal = pSql->numOfSubs;
- pState->numOfCompleted = (pSql->numOfSubs - numOfSub);
+ pState->numOfRemain = numOfSub;
bool success = true;
@@ -301,7 +298,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
// set the second stage sub query for join process
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
- pQueryInfo->intervalTime = pSupporter->interval;
+ pQueryInfo->intervalTime = pSupporter->intervalTime;
+ pQueryInfo->slidingTime = pSupporter->slidingTime;
pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
@@ -375,6 +373,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
if (pSql->pSubs[i] == NULL) {
continue;
}
+
tscDoQuery(pSql->pSubs[i]);
}
@@ -405,11 +404,9 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
}
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
- int32_t numOfTotal = pSupporter->pState->numOfTotal;
- int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
-
- pSqlObj->res.code = pSupporter->pState->code;
- if (finished >= numOfTotal) {
+ assert(pSupporter->pState->numOfRemain > 0);
+
+ if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) <= 0) {
tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code);
freeJoinSubqueryObj(pSqlObj);
}
@@ -421,7 +418,7 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
pQueryInfo->window = *win;
}
-static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
+static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
SSqlObj* pParentSql = pSupporter->pObj;
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
@@ -443,22 +440,7 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj*
// return;
// }
// }
-
- int32_t numOfTotal = pSupporter->pState->numOfTotal;
- int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
-
- if (finished >= numOfTotal) {
- assert(finished == numOfTotal);
-
- if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
- tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, pSql,
- pSupporter->subqueryIndex);
- freeJoinSubqueryObj(pParentSql);
- return;
- }
-
- tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql);
-
+
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
@@ -471,7 +453,6 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj*
updateQueryTimeRange(pParentQueryInfo, &win);
tscLaunchSecondPhaseSubqueries(pParentSql);
}
- }
}
int32_t tscCompareTidTags(const void* p1, const void* p2) {
@@ -545,11 +526,13 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function
- SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
- int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
- pExpr->param->i64Key = tagColId;
- pExpr->numOfParams = 1;
-
+ if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
+ SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
+ int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
+ pExpr->param->i64Key = tagColId;
+ pExpr->numOfParams = 1;
+ }
+
// add the filter tag column
if (pSupporter->colList != NULL) {
size_t s = taosArrayGetSize(pSupporter->colList);
@@ -575,7 +558,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
tscProcessSql(pSql);
}
-static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, void* pSql) {
+static bool checkForDuplicateTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed
@@ -587,8 +570,8 @@ static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1,
STidTags* p = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
if (doCompare(prev->tag, p->tag, pColSchema->type, pColSchema->bytes) == 0) {
- tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pSql);
- p1->pState->code = TSDB_CODE_QRY_DUP_JOIN_KEY;
+ tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pPSqlObj);
+ pPSqlObj->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
return false;
}
}
@@ -596,8 +579,8 @@ static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1,
return true;
}
-static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
- tscTrace("%p all subqueries retrieve tags complete, do tags match", pParentSql);
+static void getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
+ tscTrace("%p all subqueries retrieve complete, do tags match", pParentSql);
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
@@ -613,7 +596,7 @@ static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql,
*s1 = taosArrayInit(p1->num, p1->tagSize);
*s2 = taosArrayInit(p2->num, p2->tagSize);
- if (!(checkForIdenticalTagVal(pQueryInfo, p1, pParentSql) && checkForIdenticalTagVal(pQueryInfo, p2, pParentSql))) {
+ if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) {
freeJoinSubqueryObj(pParentSql);
pParentSql->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
tscQueueAsyncRes(pParentSql);
@@ -642,7 +625,7 @@ static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql,
}
}
-static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
+static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
@@ -652,239 +635,287 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
+ assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));
- // response of tag retrieve
- if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
- // todo handle error
+ // check for the error code firstly
+ if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
+ // todo retry if other subqueries are not failed
- if (numOfRows == 0 || pRes->completed) {
- if (numOfRows > 0) {
- size_t validLen = pSupporter->tagSize * pRes->numOfRows;
+ assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
+ tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
- size_t length = pSupporter->totalLen + validLen;
- char* tmp = realloc(pSupporter->pIdTagList, length);
- assert(tmp != NULL);
- pSupporter->pIdTagList = tmp;
+ pParentSql->res.code = numOfRows;
+ quitAllSubquery(pParentSql, pSupporter);
- memcpy(pSupporter->pIdTagList + pSupporter->totalLen,pRes->data, validLen);
- pSupporter->totalLen += validLen;
- pSupporter->num += pRes->numOfRows;
- }
+ tscQueueAsyncRes(pParentSql);
+ return;
+ }
- // tuples have been retrieved to client, try tuples from the next vnode
- if (hasMoreVnodesToTry(pSql)) {
- STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
+ // keep the results in memory
+ if (numOfRows > 0) {
+ size_t validLen = pSupporter->tagSize * pRes->numOfRows;
+ size_t length = pSupporter->totalLen + validLen;
- int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
- pTableMetaInfo->vgroupIndex += 1;
- assert(pTableMetaInfo->vgroupIndex < totalVgroups);
+ // todo handle memory error
+ char* tmp = realloc(pSupporter->pIdTagList, length);
+ if (tmp == NULL) {
+ tscError("%p failed to malloc memory", pSql);
- tscTrace("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
- pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
- pSupporter->num);
+ pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
+ quitAllSubquery(pParentSql, pSupporter);
- pCmd->command = TSDB_SQL_SELECT;
- tscResetForNextRetrieve(&pSql->res);
+ tscQueueAsyncRes(pParentSql);
+ return;
+ }
- // set the callback function
- pSql->fp = tscJoinQueryCallback;
- tscProcessSql(pSql);
- return;
- }
+ pSupporter->pIdTagList = tmp;
- int32_t numOfTotal = pSupporter->pState->numOfTotal;
- int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
- if (finished < numOfTotal) {
- return;
- }
+ memcpy(pSupporter->pIdTagList + pSupporter->totalLen, pRes->data, validLen);
+ pSupporter->totalLen += validLen;
+ pSupporter->num += pRes->numOfRows;
- // all subquery are returned, start to compare the tags
- assert(finished == numOfTotal);
+ // query not completed, continue to retrieve tid + tag tuples
+ if (!pRes->completed) {
+ taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
+ return;
+ }
+ }
- SArray *s1 = NULL, *s2 = NULL;
- getIntersectionOfTagVal(pQueryInfo, pParentSql, &s1, &s2);
+ // data in current vnode has all returned to client, try next vnode if exits
+ // tuples have been retrieved to client, try tuples from the next vnode
+ if (hasMoreVnodesToTry(pSql)) {
+ STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
- if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
- tscTrace("%p free all sub SqlObj and quit", pParentSql);
- freeJoinSubqueryObj(pParentSql);
- return;
- } else {
- SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
- SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;
+ int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
+ pTableMetaInfo->vgroupIndex += 1;
+ assert(pTableMetaInfo->vgroupIndex < totalVgroups);
- SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
- STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
- tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1);
+ tscTrace("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
+ pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pSupporter->num);
- SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
- STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
- tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
+ pCmd->command = TSDB_SQL_SELECT;
+ tscResetForNextRetrieve(&pSql->res);
- pSupporter->pState->numOfCompleted = 0;
- pSupporter->pState->code = 0;
- pSupporter->pState->numOfTotal = 2;
+ // set the callback function
+ pSql->fp = tscJoinQueryCallback;
+ tscProcessSql(pSql);
+ return;
+ }
- for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) {
- SSqlObj* psub = pParentSql->pSubs[m];
- issueTSCompQuery(psub, psub->param, pParentSql);
- }
- }
+ // no data exists in next vnode, mark the query completed
+ // only when there is no subquery exits any more, proceeds to get the intersect of the tuple sets.
+ if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) {
+ return;
+ }
- } else {
- if (numOfRows < 0) { // error
- pSupporter->pState->code = numOfRows;
- quitAllSubquery(pParentSql, pSupporter);
+ SArray *s1 = NULL, *s2 = NULL;
+ getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
+ if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
+ tscTrace("%p free all sub SqlObj and quit", pParentSql);
+ freeJoinSubqueryObj(pParentSql);
+ return;
+ }
- pParentSql->res.code = numOfRows;
- tscQueueAsyncRes(pParentSql);
- return;
- }
+ // proceed to for ts_comp query
+ SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
+ SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;
- size_t length = pSupporter->totalLen + pRes->rspLen;
- assert(length > 0);
+ SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
+ STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
+ tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1);
- char* tmp = realloc(pSupporter->pIdTagList, length);
- assert(tmp != NULL);
+ SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
+ STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
+ tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
- pSupporter->pIdTagList = tmp;
+ pSupporter->pState->numOfTotal = 2;
+ pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal;
- memcpy(pSupporter->pIdTagList, pRes->data, pRes->rspLen);
- pSupporter->totalLen += pRes->rspLen;
- pSupporter->num += pRes->numOfRows;
+ for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) {
+ SSqlObj* sub = pParentSql->pSubs[m];
+ issueTSCompQuery(sub, sub->param, pParentSql);
+ }
+}
- // continue retrieve data from vnode
- taos_fetch_rows_a(tres, joinRetrieveCallback, param);
- }
+static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
+ SJoinSupporter* pSupporter = (SJoinSupporter*)param;
+ SSqlObj* pParentSql = pSupporter->pObj;
+
+ SSqlObj* pSql = (SSqlObj*)tres;
+ SSqlCmd* pCmd = &pSql->cmd;
+ SSqlRes* pRes = &pSql->res;
+
+ SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
+ assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));
+
+ // check for the error code firstly
+ if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
+ // todo retry if other subqueries are not failed yet
+ assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
+ tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
+
+ pParentSql->res.code = numOfRows;
+ quitAllSubquery(pParentSql, pSupporter);
+
+ tscQueueAsyncRes(pParentSql);
return;
}
- if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
- if (numOfRows < 0) {
- tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
- pSupporter->pState->code = numOfRows;
- quitAllSubquery(pParentSql, pSupporter);
+ if (numOfRows > 0) { // write the compressed timestamp to disk file
+ fwrite(pRes->data, pRes->numOfRows, 1, pSupporter->f);
+ fclose(pSupporter->f);
+ pSupporter->f = NULL;
+
+ STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
+ if (pBuf == NULL) { // in error process, close the fd
+ tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
+
+ pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
+ tscQueueAsyncRes(pParentSql);
+
return;
}
- if (numOfRows > 0) { // write the compressed timestamp to disk file
- fwrite(pRes->data, pRes->numOfRows, 1, pSupporter->f);
- fclose(pSupporter->f);
- pSupporter->f = NULL;
-
- STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
- if (pBuf == NULL) { // in error process, close the fd
- tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
+ if (pSupporter->pTSBuf == NULL) {
+ tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows);
+ pSupporter->pTSBuf = pBuf;
+ } else {
+ assert(pQueryInfo->numOfTables == 1); // for subquery, only one
+ STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
- pSupporter->pState->code = TSDB_CODE_TSC_APP_ERROR; // todo set the informative code
- quitAllSubquery(pParentSql, pSupporter);
- return;
- }
+ tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
+ tsBufDestory(pBuf);
+ }
- if (pSupporter->pTSBuf == NULL) {
- tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows);
- pSupporter->pTSBuf = pBuf;
- } else {
- assert(pQueryInfo->numOfTables == 1); // for subquery, only one
- STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
+ // continue to retrieve ts-comp data from vnode
+ if (!pRes->completed) {
+ getTmpfilePath("ts-join", pSupporter->path);
+ pSupporter->f = fopen(pSupporter->path, "w");
+ pRes->row = pRes->numOfRows;
- tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
- tsBufDestory(pBuf);
- }
+ taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
+ return;
}
+ }
- if (pRes->completed) {
- if (hasMoreVnodesToTry(pSql)) {
- STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
+ if (hasMoreVnodesToTry(pSql)) {
+ STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
- int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
- pTableMetaInfo->vgroupIndex += 1;
- assert(pTableMetaInfo->vgroupIndex < totalVgroups);
+ int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
+ pTableMetaInfo->vgroupIndex += 1;
+ assert(pTableMetaInfo->vgroupIndex < totalVgroups);
- tscTrace("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
- pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
- pRes->numOfClauseTotal);
+ tscTrace("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
+ pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
+ pRes->numOfClauseTotal);
- pCmd->command = TSDB_SQL_SELECT;
- tscResetForNextRetrieve(&pSql->res);
+ pCmd->command = TSDB_SQL_SELECT;
+ tscResetForNextRetrieve(&pSql->res);
- assert(pSupporter->f == NULL);
- getTmpfilePath("ts-join", pSupporter->path);
- pSupporter->f = fopen(pSupporter->path, "w");
- pRes->row = pRes->numOfRows;
+ assert(pSupporter->f == NULL);
+ getTmpfilePath("ts-join", pSupporter->path);
+
+ // TODO check for failure
+ pSupporter->f = fopen(pSupporter->path, "w");
+ pRes->row = pRes->numOfRows;
- // set the callback function
- pSql->fp = tscJoinQueryCallback;
- tscProcessSql(pSql);
- return;
- } else {
- tSIntersectionAndLaunchSecQuery(pSupporter, pSql);
- }
+ // set the callback function
+ pSql->fp = tscJoinQueryCallback;
+ tscProcessSql(pSql);
+ return;
+ }
- } else { // open a new file to save the incoming result
- getTmpfilePath("ts-join", pSupporter->path);
- pSupporter->f = fopen(pSupporter->path, "w");
- pRes->row = pRes->numOfRows;
+ if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) {
+ return;
+ }
- taos_fetch_rows_a(tres, joinRetrieveCallback, param);
- }
- } else { // secondary stage retrieve, driven by taos_fetch_row or other functions
- if (numOfRows < 0) {
- pSupporter->pState->code = numOfRows;
- tscError("%p retrieve failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
- }
+ tscTrace("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);
- if (numOfRows >= 0) {
- pRes->numOfTotal += pRes->numOfRows;
- }
+ // proceeds to launched secondary query to retrieve final data
+ SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
+ SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
- if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
- STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
- assert(pQueryInfo->numOfTables == 1);
+ STimeWindow win = TSWINDOW_INITIALIZER;
+ int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
+ if (num <= 0) { // no result during ts intersect
+ tscTrace("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
+ freeJoinSubqueryObj(pParentSql);
+
+ return;
+ }
+
+ // launch the query the retrieve actual results from vnode along with the filtered timestamp
+ SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
+ updateQueryTimeRange(pPQueryInfo, &win);
+ tscLaunchSecondPhaseSubqueries(pParentSql);
+}
- // for projection query, need to try next vnode if current vnode is exhausted
- if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) {
- pSupporter->pState->numOfCompleted = 0;
- pSupporter->pState->numOfTotal = 1;
+static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
+ SJoinSupporter* pSupporter = (SJoinSupporter*)param;
- pSql->cmd.command = TSDB_SQL_SELECT;
- pSql->fp = tscJoinQueryCallback;
- tscProcessSql(pSql);
+ SSqlObj* pParentSql = pSupporter->pObj;
+ SSubqueryState* pState = pSupporter->pState;
- return;
- }
- }
+ SSqlObj* pSql = (SSqlObj*)tres;
+ SSqlCmd* pCmd = &pSql->cmd;
+ SSqlRes* pRes = &pSql->res;
- int32_t numOfTotal = pSupporter->pState->numOfTotal;
- int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
+ SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
- if (finished >= numOfTotal) {
- assert(finished == numOfTotal);
- tscTrace("%p all %d secondary subquery retrieves completed, global code:%d", tres, numOfTotal,
- pParentSql->res.code);
+ // TODO put to async res?
+ if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
+ assert(numOfRows == taos_errno(pSql));
- if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
- pParentSql->res.code = pSupporter->pState->code;
- freeJoinSubqueryObj(pParentSql);
- pParentSql->res.completed = true;
- }
+ pParentSql->res.code = numOfRows;
+ tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows));
+ }
- // update the records for each subquery in parent sql object.
- for (int32_t i = 0; i < pParentSql->numOfSubs; ++i) {
- if (pParentSql->pSubs[i] == NULL) {
- continue;
- }
+ if (numOfRows >= 0) {
+ pRes->numOfTotal += pRes->numOfRows;
+ }
- SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;
- pRes1->numOfClauseTotal += pRes1->numOfRows;
- }
+ if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
+ STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
+ assert(pQueryInfo->numOfTables == 1);
- // data has retrieved to client, build the join results
- tscBuildResFromSubqueries(pParentSql);
- } else {
- tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal);
+ // for projection query, need to try next vnode if current vnode is exhausted
+ if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) {
+ pState->numOfRemain = 1;
+ pState->numOfTotal = 1;
+
+ pSql->cmd.command = TSDB_SQL_SELECT;
+ pSql->fp = tscJoinQueryCallback;
+ tscProcessSql(pSql);
+
+ return;
+ }
+ }
+
+ if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
+ tscTrace("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfTotal);
+ return;
+ }
+
+ tscTrace("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfTotal, pParentSql->res.code);
+
+ if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
+ freeJoinSubqueryObj(pParentSql);
+ pParentSql->res.completed = true;
+ }
+
+ // update the records for each subquery in parent sql object.
+ for (int32_t i = 0; i < pParentSql->numOfSubs; ++i) {
+ if (pParentSql->pSubs[i] == NULL) {
+ continue;
}
+
+ SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;
+ pRes1->numOfClauseTotal += pRes1->numOfRows;
}
+
+ // data has retrieved to client, build the join results
+ tscBuildResFromSubqueries(pParentSql);
}
static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) {
@@ -902,7 +933,7 @@ static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch
}
pState->numOfTotal = pSql->numOfSubs;
- pState->numOfCompleted = pSql->numOfSubs - numOfFetch;
+ pState->numOfRemain = numOfFetch;
return pSupporter;
}
@@ -990,7 +1021,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex);
tscResetForNextRetrieve(pRes1);
- pSql1->fp = joinRetrieveCallback;
+ pSql1->fp = joinRetrieveFinalResCallback;
if (pCmd1->command < TSDB_SQL_LOCAL) {
pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
@@ -1008,8 +1039,9 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
tscTrace("%p all subquery response, retrieve data", pSql);
+ // the column transfer support struct has been built
if (pRes->pColumnIndex != NULL) {
- return; // the column transfer support struct has been built
+ return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
@@ -1045,68 +1077,76 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres;
-
+
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
-
+ SSqlObj* pParentSql = pSupporter->pObj;
+
// There is only one subquery and table for each subquery.
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1);
-
- if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
- if (code != TSDB_CODE_SUCCESS) { // direct call joinRetrieveCallback and set the error code
- joinRetrieveCallback(param, pSql, code);
- } else { // first stage query, continue to retrieve compressed time stamp data
- pSql->fp = joinRetrieveCallback;
- pSql->cmd.command = TSDB_SQL_FETCH;
- tscProcessSql(pSql);
- }
- } else { // second stage join subquery
- SSqlObj* pParentSql = pSupporter->pObj;
+ // retrieve actual query results from vnode during the second stage join subquery
+ if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
+ tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
+ quitAllSubquery(pParentSql, pSupporter);
+ tscQueueAsyncRes(pParentSql);
- if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
- tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code,
- pSupporter->pState->code);
- quitAllSubquery(pParentSql, pSupporter);
+ return;
+ }
- return;
- }
+ // TODO here retry is required, not directly returns to client
+ if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
+ assert(taos_errno(pSql) == code);
- if (code != TSDB_CODE_SUCCESS) {
- tscError("%p sub query failed, code:%s, set global code:%s, index:%d", pSql, tstrerror(code), tstrerror(code),
- pSupporter->subqueryIndex);
-
- pSupporter->pState->code = code;
- quitAllSubquery(pParentSql, pSupporter);
+ tscError("%p abort query, code:%d, global code:%d", pSql, code, pParentSql->res.code);
+ pParentSql->res.code = code;
+
+ quitAllSubquery(pParentSql, pSupporter);
+ tscQueueAsyncRes(pParentSql);
+
+ return;
+ }
+
+ // retrieve tuples from vnode
+ if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
+ pSql->fp = tidTagRetrieveCallback;
+ pSql->cmd.command = TSDB_SQL_FETCH;
+ tscProcessSql(pSql);
+ return;
+ }
+
+ // retrieve ts_comp info from vnode
+ if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
+ pSql->fp = tsCompRetrieveCallback;
+ pSql->cmd.command = TSDB_SQL_FETCH;
+ tscProcessSql(pSql);
+ return;
+ }
+
+ // wait for the other subqueries response from vnode
+ if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) {
+ return;
+ }
+
+ tscSetupOutputColumnIndex(pParentSql);
+ STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
+
+ /**
+ * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
+ * data instead of returning to its invoker
+ */
+ if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
+ pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal; // reset the record value
+
+ pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
+ pSql->cmd.command = TSDB_SQL_FETCH;
+ tscProcessSql(pSql);
+ } else { // first retrieve from vnode during the secondary stage sub-query
+ // set the command flag must be after the semaphore been correctly set.
+ if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
+ (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
- int32_t numOfTotal = pSupporter->pState->numOfTotal;
- int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
-
- if (finished >= numOfTotal) {
- assert(finished == numOfTotal);
-
- tscSetupOutputColumnIndex(pParentSql);
- STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
-
- /**
- * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
- * data instead of returning to its invoker
- */
- if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
- pSupporter->pState->numOfCompleted = 0; // reset the record value
-
- pSql->fp = joinRetrieveCallback; // continue retrieve data
- pSql->cmd.command = TSDB_SQL_FETCH;
- tscProcessSql(pSql);
- } else { // first retrieve from vnode during the secondary stage sub-query
- // set the command flag must be after the semaphore been correctly set.
- if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
- (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
- } else {
- tscQueueAsyncRes(pParentSql);
- }
- }
- }
+ tscQueueAsyncRes(pParentSql);
}
}
}
@@ -1257,14 +1297,15 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pQueryInfo->numOfTables;
-
+ pState->numOfRemain = pState->numOfTotal;
+
tscTrace("%p start launch subquery, total:%d", pSql, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
- pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
+ pState->numOfRemain = i;
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pSql->res.code;
@@ -1342,6 +1383,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscTrace("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pSql->numOfSubs;
+ pState->numOfRemain = pSql->numOfSubs;
+
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
@@ -1436,7 +1479,7 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows);
-static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
+static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t code) {
// set no disk space error info
#ifdef WINDOWS
LPVOID lpMsgBuf;
@@ -1448,44 +1491,44 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
#else
tscError("sub:%p failed to flush data to disk:reason:%s", tres, strerror(errno));
#endif
-
- trsupport->pState->code = -errCode;
+
+ SSqlObj* pParentSql = trsupport->pParentSqlObj;
+
+ pParentSql->res.code = code;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
pthread_mutex_unlock(&trsupport->queryMutex);
- tscHandleSubqueryError(trsupport, tres, trsupport->pState->code);
+ tscHandleSubqueryError(trsupport, tres, pParentSql->res.code);
}
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
- SSqlObj *pPObj = trsupport->pParentSqlObj;
+ SSqlObj *pParentSql = trsupport->pParentSqlObj;
int32_t subqueryIndex = trsupport->subqueryIndex;
assert(pSql != NULL);
SSubqueryState* pState = trsupport->pState;
- assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
- pPObj->numOfSubs == pState->numOfTotal);
+ assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
// retrieved in subquery failed. OR query cancelled in retrieve phase.
- if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
- pState->code = pPObj->res.code;
-
+ if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
+
/*
* kill current sub-query connection, which may retrieve data from vnodes;
* Here we get: pPObj->res.code == TSDB_CODE_TSC_QUERY_CANCELLED
*/
pSql->res.numOfRows = 0;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts
- tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql,
- subqueryIndex, pState->code);
+ tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", pParentSql, pSql,
+ subqueryIndex, pParentSql->res.code);
}
if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query.
- tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex);
- tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql,
- subqueryIndex, pState->code);
+ tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParentSql, pSql, numOfRows, subqueryIndex);
+ tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pParentSql, pSql,
+ subqueryIndex, pParentSql->res.code);
} else {
- if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) {
+ if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
/*
* current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query
@@ -1503,8 +1546,8 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery sqlObj due to out of memory, abort retry",
trsupport->pParentSqlObj, pSql);
-
- pState->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
+
+ pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
return;
}
@@ -1512,24 +1555,24 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscProcessSql(pNew);
return;
} else { // reach the maximum retry count, abort
- atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows);
- tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pPObj, pSql,
- tstrerror(numOfRows), subqueryIndex, tstrerror(pState->code));
+ atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);
+ tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql,
+ tstrerror(numOfRows), subqueryIndex, tstrerror(pParentSql->res.code));
}
}
-
- int32_t numOfTotal = pState->numOfTotal;
-
- int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
- if (finished < numOfTotal) {
- tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
+
+ int32_t remain = -1;
+ if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
+ tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
+ pState->numOfTotal - remain);
+
return tscFreeSubSqlObj(trsupport, pSql);
}
// all subqueries are failed
- tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pPObj, pState->numOfTotal, tstrerror(pState->code));
- pPObj->res.code = pState->code;
-
+ tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfTotal,
+ tstrerror(pParentSql->res.code));
+
// release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
pState->numOfTotal);
@@ -1538,13 +1581,13 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscFreeSubSqlObj(trsupport, pSql);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
- SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
+ SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
- (*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
+ (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
} else { // regular super table query
- if (pPObj->res.code != TSDB_CODE_SUCCESS) {
- tscQueueAsyncRes(pPObj);
+ if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
+ tscQueueAsyncRes(pParentSql);
}
}
}
@@ -1590,14 +1633,11 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
}
- // keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion
- // increases the finished value up to pState->numOfTotal value, which means all subqueries are completed.
- // In this case, the comparsion between finished value and released pState->numOfTotal is not safe.
- int32_t numOfTotal = pState->numOfTotal;
-
- int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
- if (finished < numOfTotal) {
- tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
+ int32_t remain = -1;
+ if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
+ tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex,
+ pState->numOfTotal - remain);
+
return tscFreeSubSqlObj(trsupport, pSql);
}
@@ -1632,10 +1672,10 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
- int32_t idx = trsupport->subqueryIndex;
- SSqlObj * pPObj = trsupport->pParentSqlObj;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
-
+ int32_t idx = trsupport->subqueryIndex;
+ SSqlObj * pPObj = trsupport->pParentSqlObj;
+
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) { // sql object has been released in error process, return immediately
tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
@@ -1643,13 +1683,12 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
}
SSubqueryState* pState = trsupport->pState;
- assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
- pPObj->numOfSubs == pState->numOfTotal);
+ assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pPObj->numOfSubs == pState->numOfTotal);
// query process and cancel query process may execute at the same time
pthread_mutex_lock(&trsupport->queryMutex);
- if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
+ if (numOfRows < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
return tscHandleSubqueryError(trsupport, pSql, numOfRows);
}
@@ -1738,18 +1777,16 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
SSubqueryState* pState = trsupport->pState;
- assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
- pParentSql->numOfSubs == pState->numOfTotal);
-
- if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
+ assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
+
+ // todo set error code
+ if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
// stable query is killed, abort further retry
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
code = pParentSql->res.code;
- } else {
- code = pState->code;
}
tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%s", pParentSql, pSql,
@@ -1766,7 +1803,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (code != TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
- atomic_val_compare_exchange_32(&pState->code, 0, code);
+ atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);
} else { // does not reach the maximum retry time, go on
tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
@@ -1775,8 +1812,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pVgroup->vgId, trsupport->subqueryIndex);
-
- pState->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
+
+ pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
@@ -1789,11 +1826,11 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
}
}
- if (pState->code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query
+ if (pParentSql->res.code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query
tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
- pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pState->code);
+ pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pParentSql->res.code);
- tscHandleSubqueryError(param, tres, pState->code);
+ tscHandleSubqueryError(param, tres, pParentSql->res.code);
} else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
@@ -1813,8 +1850,7 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
SSqlCmd* pParentCmd = &pParentObj->cmd;
SSubqueryState* pState = pSupporter->pState;
- int32_t total = pState->numOfTotal;
-
+
// increase the total inserted rows
if (numOfRows > 0) {
pParentObj->res.numOfRows += numOfRows;
@@ -1826,8 +1862,7 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
}
taos_free_result(tres);
- int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1);
- if (completed < total) {
+ if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
return;
}
@@ -2157,5 +2192,3 @@ static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
return hasData;
}
-
-
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index bb6eaa68e5a0159c6e1cdeeec848af839e7d1bc2..eb62d6c61a25a41d227fb330eead6fe54eef002b 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -1216,7 +1216,6 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
// interval query with limit applied
int32_t numOfRes = 0;
-
if (isIntervalQuery(pQuery)) {
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
} else {
diff --git a/src/util/src/terror.c b/src/util/src/terror.c
index c040a11362a01175a126a7d954af4a945285061d..aa6925464e12a1863b8dc510d7389ed97267d142 100644
--- a/src/util/src/terror.c
+++ b/src/util/src/terror.c
@@ -15,7 +15,6 @@
#include
#include
-#include
#include
#include