未验证 提交 20f27624 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4921 from taosdata/hotfix/TD-2719

[TD-2719]trace subquery states
...@@ -47,7 +47,6 @@ void tscLockByThread(int64_t *lockedBy); ...@@ -47,7 +47,6 @@ void tscLockByThread(int64_t *lockedBy);
void tscUnlockByThread(int64_t *lockedBy); void tscUnlockByThread(int64_t *lockedBy);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -317,7 +317,8 @@ typedef struct STscObj { ...@@ -317,7 +317,8 @@ typedef struct STscObj {
} STscObj; } STscObj;
typedef struct SSubqueryState { typedef struct SSubqueryState {
int32_t numOfRemain; // the number of remain unfinished subquery pthread_mutex_t mutex;
int8_t *states;
int32_t numOfSub; // the number of total sub-queries int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState; } SSubqueryState;
......
...@@ -1427,6 +1427,10 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1427,6 +1427,10 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed
if (pRes->code == TSDB_CODE_SUCCESS) {
return pRes->code;
}
tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code)); tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code));
return pRes->code; return pRes->code;
} }
......
...@@ -55,6 +55,58 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) { ...@@ -55,6 +55,58 @@ static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
} }
} }
static void subquerySetState(SSqlObj *pSql, SSubqueryState *subState, int idx, int8_t state) {
assert(idx < subState->numOfSub);
assert(subState->states);
pthread_mutex_lock(&subState->mutex);
tscDebug("subquery:%p,%d state set to %d", pSql, idx, state);
subState->states[idx] = state;
pthread_mutex_unlock(&subState->mutex);
}
static bool allSubqueryDone(SSqlObj *pParentSql) {
bool done = true;
SSubqueryState *subState = &pParentSql->subState;
//lock in caller
for (int i = 0; i < subState->numOfSub; i++) {
if (0 == subState->states[i]) {
tscDebug("%p subquery:%p,%d is NOT finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
done = false;
break;
} else {
tscDebug("%p subquery:%p,%d is finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub);
}
}
return done;
}
static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
SSubqueryState *subState = &pParentSql->subState;
assert(idx < subState->numOfSub);
pthread_mutex_lock(&subState->mutex);
tscDebug("%p subquery:%p,%d state set to 1", pParentSql, pSql, idx);
subState->states[idx] = 1;
bool done = allSubqueryDone(pParentSql);
pthread_mutex_unlock(&subState->mutex);
return done;
}
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) { static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
...@@ -367,10 +419,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -367,10 +419,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
// scan all subquery, if one sub query has only ts, ignore it // scan all subquery, if one sub query has only ts, ignore it
tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub); tscDebug("%p start to launch secondary subqueries, %d out of %d needs to query", pSql, numOfSub, pSql->subState.numOfSub);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed.
SSubqueryState* pState = &pSql->subState;
pState->numOfRemain = numOfSub;
bool success = true; bool success = true;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
...@@ -403,6 +451,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -403,6 +451,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
success = false; success = false;
break; break;
} }
tscClearSubqueryInfo(&pNew->cmd); tscClearSubqueryInfo(&pNew->cmd);
pSql->pSubs[i] = pNew; pSql->pSubs[i] = pNew;
...@@ -480,6 +529,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -480,6 +529,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
} }
} }
subquerySetState(pPrevSub, &pSql->subState, i, 0);
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList), pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList),
...@@ -517,20 +568,25 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { ...@@ -517,20 +568,25 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
SJoinSupporter* p = pSub->param; SJoinSupporter* p = pSub->param;
tscDestroyJoinSupporter(p); tscDestroyJoinSupporter(p);
if (pSub->res.code == TSDB_CODE_SUCCESS) { taos_free_result(pSub);
taos_free_result(pSub); pSql->pSubs[i] = NULL;
}
} }
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
}
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
} }
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
assert(pSqlObj->subState.numOfRemain > 0); if (subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex)) {
tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
if (atomic_sub_fetch_32(&pSqlObj->subState.numOfRemain, 1) <= 0) {
tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
freeJoinSubqueryObj(pSqlObj); freeJoinSubqueryObj(pSqlObj);
return;
} }
//tscDestroyJoinSupporter(pSupporter); //tscDestroyJoinSupporter(pSupporter);
...@@ -777,6 +833,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -777,6 +833,15 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)); assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql);
return;
}
// check for the error code firstly // check for the error code firstly
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
// todo retry if other subqueries are not failed // todo retry if other subqueries are not failed
...@@ -785,7 +850,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -785,7 +850,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pParentSql->res.code = numOfRows; pParentSql->res.code = numOfRows;
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -802,7 +867,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -802,7 +867,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p failed to malloc memory", pSql); tscError("%p failed to malloc memory", pSql);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -844,9 +909,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -844,9 +909,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// no data exists in next vnode, mark the <tid, tags> query completed // no data exists in next vnode, mark the <tid, tags> query completed
// only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets. // only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
tscDebug("%p tagRetrieve:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
return; return;
} }
SArray *s1 = NULL, *s2 = NULL; SArray *s1 = NULL, *s2 = NULL;
int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2); int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
...@@ -891,8 +957,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -891,8 +957,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables); ((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoClone(pTableMetaInfo2->pVgroupTables);
pParentSql->subState.numOfSub = 2; pParentSql->subState.numOfSub = 2;
pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub;
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
tscDebug("%p reset all sub states to 0", pParentSql);
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
SSqlObj* sub = pParentSql->pSubs[m]; SSqlObj* sub = pParentSql->pSubs[m];
issueTSCompQuery(sub, sub->param, pParentSql); issueTSCompQuery(sub, sub->param, pParentSql);
...@@ -915,6 +983,15 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -915,6 +983,15 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)); assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql);
return;
}
// check for the error code firstly // check for the error code firstly
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
// todo retry if other subqueries are not failed yet // todo retry if other subqueries are not failed yet
...@@ -922,7 +999,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -922,7 +999,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pParentSql->res.code = numOfRows; pParentSql->res.code = numOfRows;
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -937,7 +1014,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -937,7 +1014,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
...@@ -955,7 +1032,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -955,7 +1032,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
...@@ -1009,9 +1086,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1009,9 +1086,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return; return;
} }
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return; return;
} }
tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql); tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);
...@@ -1049,6 +1126,17 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1049,6 +1126,17 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql);
return;
}
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(numOfRows == taos_errno(pSql)); assert(numOfRows == taos_errno(pSql));
...@@ -1088,9 +1176,8 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1088,9 +1176,8 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
} }
} }
assert(pState->numOfRemain > 0); if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { tscDebug("%p sub:%p,%d completed, total:%d", pParentSql, tres, pSupporter->subqueryIndex, pState->numOfSub);
tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub);
return; return;
} }
...@@ -1205,15 +1292,16 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1205,15 +1292,16 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
} }
} }
// get the number of subquery that need to retrieve the next vnode.
if (orderedPrjQuery) { if (orderedPrjQuery) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
pSql->subState.numOfRemain++; subquerySetState(pSub, &pSql->subState, i, 0);
} }
} }
} }
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
...@@ -1270,7 +1358,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1270,7 +1358,19 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
// retrieve data from current vnode. // retrieve data from current vnode.
tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch);
SJoinSupporter* pSupporter = NULL; SJoinSupporter* pSupporter = NULL;
pSql->subState.numOfRemain = numOfFetch;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i];
if (pSql1 == NULL) {
continue;
}
SSqlRes* pRes1 = &pSql1->res;
if (pRes1->row >= pRes1->numOfRows) {
subquerySetState(pSql1, &pSql->subState, i, 0);
}
}
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i]; SSqlObj* pSql1 = pSql->pSubs[i];
...@@ -1372,7 +1472,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1372,7 +1472,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// retrieve actual query results from vnode during the second stage join subquery // retrieve actual query results from vnode during the second stage join subquery
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code); tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -1384,7 +1485,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1384,7 +1485,8 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code)); tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code));
pParentSql->res.code = code; pParentSql->res.code = code;
quitAllSubquery(pParentSql, pSupporter);
quitAllSubquery(pSql, pParentSql, pSupporter);
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -1408,9 +1510,9 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1408,9 +1510,9 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// In case of consequence query from other vnode, do not wait for other query response here. // In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) { if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
return; return;
} }
} }
tscSetupOutputColumnIndex(pParentSql); tscSetupOutputColumnIndex(pParentSql);
...@@ -1422,6 +1524,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1422,6 +1524,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH; pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql); tscProcessSql(pSql);
} else { // first retrieve from vnode during the secondary stage sub-query } else { // first retrieve from vnode during the secondary stage sub-query
// set the command flag must be after the semaphore been correctly set. // set the command flag must be after the semaphore been correctly set.
...@@ -1457,8 +1560,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1457,8 +1560,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pSql->pSubs[pSql->subState.numOfRemain++] = pNew; pSql->pSubs[tableIndex] = pNew;
assert(pSql->subState.numOfRemain <= pSql->subState.numOfSub);
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex); addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
...@@ -1590,6 +1692,19 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -1590,6 +1692,19 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pSql->subState.numOfSub = pQueryInfo->numOfTables; pSql->subState.numOfSub = pQueryInfo->numOfTables;
if (pSql->subState.states == NULL) {
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
if (pSql->subState.states == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
pthread_mutex_init(&pSql->subState.mutex, NULL);
}
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
tscDebug("%p reset all sub states to 0", pSql);
bool hasEmptySub = false; bool hasEmptySub = false;
tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables); tscDebug("%p start subquery, total:%d", pSql, pQueryInfo->numOfTables);
...@@ -1622,14 +1737,25 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -1622,14 +1737,25 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pSql->fp)(pSql->param, pSql, 0); (*pSql->fp)(pSql->param, pSql, 0);
} else { } else {
int fail = 0;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (fail) {
(*pSub->fp)(pSub->param, pSub, 0);
continue;
}
if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) { if ((code = tscProcessSql(pSub)) != TSDB_CODE_SUCCESS) {
pSql->subState.numOfRemain = i - 1; // the already sent request will continue and do not go to the error process routine pRes->code = code;
break; (*pSub->fp)(pSub->param, pSub, 0);
fail = 1;
} }
} }
if(fail) {
return;
}
pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE; pSql->cmd.command = TSDB_SQL_TABLE_JOIN_RETRIEVE;
} }
...@@ -1728,7 +1854,21 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1728,7 +1854,21 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return ret; return ret;
} }
pState->numOfRemain = pState->numOfSub; if (pState->states == NULL) {
pState->states = calloc(pState->numOfSub, sizeof(*pState->states));
if (pState->states == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscAsyncResultOnError(pSql);
tfree(pMemoryBuf);
return ret;
}
pthread_mutex_init(&pState->mutex, NULL);
}
memset(pState->states, 0, sizeof(*pState->states) * pState->numOfSub);
tscDebug("%p reset all sub states to 0", pSql);
pRes->code = TSDB_CODE_SUCCESS; pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0; int32_t i = 0;
...@@ -1877,7 +2017,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -1877,7 +2017,6 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
assert(pSql != NULL); assert(pSql != NULL);
SSubqueryState* pState = &pParentSql->subState; SSubqueryState* pState = &pParentSql->subState;
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
// retrieved in subquery failed. OR query cancelled in retrieve phase. // retrieved in subquery failed. OR query cancelled in retrieve phase.
if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
...@@ -1908,14 +2047,12 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -1908,14 +2047,12 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
} }
} }
int32_t remain = -1; if (!subAndCheckDone(pSql, pParentSql, subqueryIndex)) {
if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { tscDebug("%p sub:%p,%d freed, not finished, total:%d", pParentSql, pSql, trsupport->subqueryIndex, pState->numOfSub);
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
pState->numOfSub - remain);
tscFreeRetrieveSup(pSql); tscFreeRetrieveSup(pSql);
return; return;
} }
// all subqueries are failed // all subqueries are failed
tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub, tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfSub,
...@@ -1980,14 +2117,12 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -1980,14 +2117,12 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
return; return;
} }
int32_t remain = -1; if (!subAndCheckDone(pSql, pParentSql, idx)) {
if ((remain = atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1)) > 0) { tscDebug("%p sub:%p orderOfSub:%d freed, not finished", pParentSql, pSql, trsupport->subqueryIndex);
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
pState->numOfSub - remain);
tscFreeRetrieveSup(pSql); tscFreeRetrieveSup(pSql);
return; return;
} }
// all sub-queries are returned, start to local merge process // all sub-queries are returned, start to local merge process
pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
...@@ -2033,7 +2168,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -2033,7 +2168,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
SSqlObj * pParentSql = trsupport->pParentSql; SSqlObj * pParentSql = trsupport->pParentSql;
SSubqueryState* pState = &pParentSql->subState; SSubqueryState* pState = &pParentSql->subState;
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
...@@ -2254,7 +2388,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2254,7 +2388,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
} }
} }
if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) {
tscDebug("%p insert:%p,%d completed, total:%d", pParentObj, tres, pSupporter->index, pParentObj->subState.numOfSub);
return; return;
} }
...@@ -2288,6 +2423,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2288,6 +2423,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, pSql->cmd.clauseIndex, 0);
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
subquerySetState(pSql, &pParentObj->subState, i, 0);
tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql); tscDebug("%p, failed sub:%d, %p", pParentObj, i, pSql);
} }
} }
...@@ -2302,7 +2439,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2302,7 +2439,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
} }
pParentObj->cmd.parseFinished = false; pParentObj->cmd.parseFinished = false;
pParentObj->subState.numOfRemain = numOfFailed;
tscResetSqlCmdObj(&pParentObj->cmd); tscResetSqlCmdObj(&pParentObj->cmd);
...@@ -2378,7 +2514,19 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -2378,7 +2514,19 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
// the number of already initialized subqueries // the number of already initialized subqueries
int32_t numOfSub = 0; int32_t numOfSub = 0;
pSql->subState.numOfRemain = pSql->subState.numOfSub; if (pSql->subState.states == NULL) {
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
if (pSql->subState.states == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
pthread_mutex_init(&pSql->subState.mutex, NULL);
}
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
tscDebug("%p reset all sub states to 0", pSql);
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
if (pSql->pSubs == NULL) { if (pSql->pSubs == NULL) {
goto _error; goto _error;
......
...@@ -441,6 +441,12 @@ static void tscFreeSubobj(SSqlObj* pSql) { ...@@ -441,6 +441,12 @@ static void tscFreeSubobj(SSqlObj* pSql) {
pSql->pSubs[i] = NULL; pSql->pSubs[i] = NULL;
} }
if (pSql->subState.states) {
pthread_mutex_destroy(&pSql->subState.mutex);
}
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
} }
......
...@@ -15,6 +15,7 @@ print =============== step1 - login ...@@ -15,6 +15,7 @@ print =============== step1 - login
system_content curl 127.0.0.1:7111/rest/ system_content curl 127.0.0.1:7111/rest/
print 1-> $system_content print 1-> $system_content
if $system_content != @{"status":"error","code":4357,"desc":"no auth info input"}@ then if $system_content != @{"status":"error","code":4357,"desc":"no auth info input"}@ then
print $system_content
return -1 return -1
endi endi
......
...@@ -415,6 +415,7 @@ sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_m ...@@ -415,6 +415,7 @@ sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_m
$val = 100 $val = 100
if $rows != $val then if $rows != $val then
print $rows
return -1 return -1
endi endi
...@@ -514,4 +515,4 @@ sql drop table tm2; ...@@ -514,4 +515,4 @@ sql drop table tm2;
sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a; sql select count(*) from m1, m2 where m1.ts=m2.ts and m1.b=m2.a;
sql drop database ux1; sql drop database ux1;
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册