提交 e4558a1f 编写于 作者: B Benguang Zhao

enh: detect conflicts during iterating over a list of subqueries optimistically

上级 40ddf1f1
...@@ -361,6 +361,7 @@ typedef struct SSubqueryState { ...@@ -361,6 +361,7 @@ typedef struct SSubqueryState {
int8_t *states; int8_t *states;
int32_t numOfSub; // the number of total sub-queries int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query uint64_t numOfRetrievedRows; // total number of points in this query
uint32_t version;
} SSubqueryState; } SSubqueryState;
typedef struct SSqlObj { typedef struct SSqlObj {
...@@ -440,8 +441,10 @@ typedef struct SSqlStream { ...@@ -440,8 +441,10 @@ typedef struct SSqlStream {
} SSqlStream; } SSqlStream;
SSqlObj* tscAllocSqlObj(); SSqlObj* tscAllocSqlObj();
SSqlObj* tscAcquireRefOfSubobj(SSqlObj *pSql, int32_t idx); uint32_t tscGetVersionOfSubStateWithoutLock(SSqlObj *pSql);
SSqlObj* tscAcquireRefOfSubobj(SSqlObj *pSql, int32_t idx, uint32_t stateVersion);
void tscReleaseRefOfSubobj(SSqlObj *pSql); void tscReleaseRefOfSubobj(SSqlObj *pSql);
void tscResetAllSubStates(SSqlObj* pSql);
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
......
...@@ -538,6 +538,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -538,6 +538,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0; int32_t numOfSub = 0;
SJoinSupporter* pSupporter = NULL; SJoinSupporter* pSupporter = NULL;
bool success = true; bool success = true;
uint32_t stateVersion = 0;
{ pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
...@@ -688,6 +689,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -688,6 +689,8 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); } pthread_mutex_unlock(&pSql->subState.mutex); }
//prepare the subqueries object failed, abort //prepare the subqueries object failed, abort
...@@ -695,23 +698,29 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -695,23 +698,29 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("0x%"PRIx64" failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql->self, tscError("0x%"PRIx64" failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql->self,
pSql->subState.numOfSub, pSql->res.code); pSql->subState.numOfSub, pSql->res.code);
freeJoinSubqueryObj(pSql); goto _error;
return pSql->res.code;
} }
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
if (pSub == NULL) { if (pSub == NULL) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue; continue;
} }
pSql->res.code = TSDB_CODE_FAILED;
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
goto _error;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
executeQuery(pSub, pQueryInfo); executeQuery(pSub, pQueryInfo);
tscReleaseRefOfSubobj(pSub); // REL ref tscReleaseRefOfSubobj(pSub); // REL ref
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error:
freeJoinSubqueryObj(pSql);
return pSql->res.code;
} }
void freeJoinSubqueryObj(SSqlObj* pSql) { void freeJoinSubqueryObj(SSqlObj* pSql) {
...@@ -719,6 +728,7 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { ...@@ -719,6 +728,7 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
if (pSql->subState.numOfSub == 0) { if (pSql->subState.numOfSub == 0) {
goto _out; goto _out;
} }
pSql->subState.version ++;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
...@@ -1414,9 +1424,17 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1414,9 +1424,17 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
(*pParentSql->fp)(pParentSql->param, pParentSql, 0); (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else { } else {
uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pParentSql);
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) { for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
SSqlObj *psub = tscAcquireRefOfSubobj(pParentSql, m); // ACQ ref SSqlObj *psub = tscAcquireRefOfSubobj(pParentSql, m, stateVersion); // ACQ ref
if (!psub) continue; if (!psub) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pParentSql)) {
continue;
}
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
break;
}
// proceed to for ts_comp query // proceed to for ts_comp query
SSqlCmd* pSubCmd = &psub->cmd; SSqlCmd* pSubCmd = &psub->cmd;
...@@ -1428,7 +1446,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1428,7 +1446,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables); ((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables);
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub); tscResetAllSubStates(pParentSql);
tscDebug("0x%"PRIx64" reset all sub states to 0", pParentSql->self); tscDebug("0x%"PRIx64" reset all sub states to 0", pParentSql->self);
issueTsCompQuery(psub, psub->param, pParentSql); issueTsCompQuery(psub, psub->param, pParentSql);
...@@ -1792,8 +1810,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1792,8 +1810,8 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
if (numOfFetch <= 0) { if (numOfFetch <= 0) {
bool tryNextVnode = false; bool tryNextVnode = false;
bool orderedPrjQuery = false; bool orderedPrjQuery = false;
uint32_t stateVersion = 0;
{ pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
...@@ -1819,13 +1837,20 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1819,13 +1837,20 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
} }
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); } pthread_mutex_unlock(&pSql->subState.mutex); }
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
if (pSub == NULL) { if (pSub == NULL) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue; continue;
} }
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
pSql->res.code = TSDB_CODE_FAILED;
break;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSub->cmd);
...@@ -1879,7 +1904,10 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1879,7 +1904,10 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
tscDebug("0x%"PRIx64" retrieve data from %d subqueries", pSql->self, numOfFetch); tscDebug("0x%"PRIx64" retrieve data from %d subqueries", pSql->self, numOfFetch);
SJoinSupporter* pSupporter = NULL; SJoinSupporter* pSupporter = NULL;
uint32_t stateVersion = 0;
{ pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i]; SSqlObj* pSql1 = pSql->pSubs[i];
if (pSql1 == NULL) { if (pSql1 == NULL) {
...@@ -1891,13 +1919,20 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { ...@@ -1891,13 +1919,20 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
subquerySetStateWithoutLock(pSql1, &pSql->subState, i, 0); subquerySetStateWithoutLock(pSql1, &pSql->subState, i, 0);
} }
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); } pthread_mutex_unlock(&pSql->subState.mutex); }
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = tscAcquireRefOfSubobj(pSql, i); // ACQ ref SSqlObj* pSql1 = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
if (pSql1 == NULL) { if (pSql1 == NULL) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue; continue;
} }
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
break;
}
SSqlRes* pRes1 = &pSql1->res; SSqlRes* pRes1 = &pSql1->res;
SSqlCmd* pCmd1 = &pSql1->cmd; SSqlCmd* pCmd1 = &pSql1->cmd;
...@@ -2228,6 +2263,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2228,6 +2263,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
goto _error; goto _error;
} }
uint32_t stateVersion = 0;
int errflag = 0; int errflag = 0;
{ pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
...@@ -2256,8 +2292,12 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2256,8 +2292,12 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
} }
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); } pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) goto _error; if (errflag) {
goto _error;
}
if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { // at least one subquery is empty, do nothing and return if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { // at least one subquery is empty, do nothing and return
freeJoinSubqueryObj(pSql); freeJoinSubqueryObj(pSql);
...@@ -2265,8 +2305,14 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2265,8 +2305,14 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
} else { } else {
int fail = 0; int fail = 0;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
if (!pSub) continue; if (!pSub) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue;
}
code = TSDB_CODE_FAILED;
goto _error;
}
if (fail) { if (fail) {
(*pSub->fp)(pSub->param, pSub, 0); (*pSub->fp)(pSub->param, pSub, 0);
continue; continue;
...@@ -2289,7 +2335,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2289,7 +2335,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
return; return;
_error: _error:
pRes->code = code; pRes->code = code;
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
} }
...@@ -2727,14 +2773,14 @@ typedef struct SPair { ...@@ -2727,14 +2773,14 @@ typedef struct SPair {
static void doSendQueryReqs(SSchedMsg* pSchedMsg) { static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
SSqlObj* pSql = pSchedMsg->ahandle; SSqlObj* pSql = pSchedMsg->ahandle;
SPair* p = pSchedMsg->msg; SPair* p = pSchedMsg->msg;
uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
for (int32_t i = p->first; i < p->second; ++i) { for (int32_t i = p->first; i < p->second; ++i) {
if (i >= pSql->subState.numOfSub) { SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
tfree(p); if (!pSub) {
return; tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
break;
} }
SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref
if (!pSub) continue;
SRetrieveSupport* pSupport = pSub->param; SRetrieveSupport* pSupport = pSub->param;
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex);
...@@ -3568,10 +3614,12 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3568,10 +3614,12 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
return pRes->code; return pRes->code;
} }
uint32_t stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i); // ACQ ref SSqlObj* pSub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
if (!pSub) { if (!pSub) {
tscError("0x%"PRIx64" the %d'th one of (num:%d) sub queries is null.", pSql->self, i, pSql->subState.numOfSub); tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
pRes->code = TSDB_CODE_FAILED; pRes->code = TSDB_CODE_FAILED;
return pRes->code; return pRes->code;
} }
...@@ -3597,6 +3645,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3597,6 +3645,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
goto _error; goto _error;
} }
uint32_t stateVersion = 0;
int32_t numOfSub = 0; int32_t numOfSub = 0;
int errflag = 0; int errflag = 0;
...@@ -3649,15 +3698,25 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3649,15 +3698,25 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
errflag = 1; errflag = 1;
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); } pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) { goto _error; } if (errflag) {
goto _error;
}
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks); pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
// use the local variable // use the local variable
for (int32_t j = 0; j < pSql->subState.numOfSub; ++j) { for (int32_t j = 0; j < pSql->subState.numOfSub; ++j) {
SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, j); // ACQ ref SSqlObj *pSub = tscAcquireRefOfSubobj(pSql, j, stateVersion); // ACQ ref
if (!pSub) continue; if (!pSub) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue;
}
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
return TSDB_CODE_FAILED;
}
tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j); tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j);
tscBuildAndSendRequest(pSub, NULL); tscBuildAndSendRequest(pSub, NULL);
tscReleaseRefOfSubobj(pSub); // REL ref tscReleaseRefOfSubobj(pSub); // REL ref
......
...@@ -1682,6 +1682,7 @@ void tscFreeSubobj(SSqlObj* pSql) { ...@@ -1682,6 +1682,7 @@ void tscFreeSubobj(SSqlObj* pSql) {
if (pSql->subState.numOfSub == 0) { if (pSql->subState.numOfSub == 0) {
goto _out; goto _out;
} }
pSql->subState.version ++;
tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub); tscDebug("0x%"PRIx64" start to free sub SqlObj, numOfSub:%d", pSql->self, pSql->subState.numOfSub);
...@@ -1749,12 +1750,13 @@ SSqlObj* tscAllocSqlObj() { ...@@ -1749,12 +1750,13 @@ SSqlObj* tscAllocSqlObj() {
return pNew; return pNew;
} }
SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx) { SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx, uint32_t stateVersion) {
assert (pSql != NULL); assert (pSql != NULL);
SSqlObj *pSub = NULL; SSqlObj *pSub = NULL;
pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
if (idx < 0 || if (stateVersion != tscGetVersionOfSubStateWithoutLock(pSql) ||
idx < 0 ||
idx >= pSql->subState.numOfSub || idx >= pSql->subState.numOfSub ||
!pSql->pSubs[idx]) { !pSql->pSubs[idx]) {
goto _out; goto _out;
...@@ -1763,7 +1765,7 @@ SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx) { ...@@ -1763,7 +1765,7 @@ SSqlObj* tscAcquireRefOfSubobj(SSqlObj* pSql, int32_t idx) {
assert (pSql->pSubs[idx] == pSub && "Refcounted subquery obj mismatch"); assert (pSql->pSubs[idx] == pSub && "Refcounted subquery obj mismatch");
_out: _out:
pthread_mutex_unlock(&pSql->subState.mutex); pthread_mutex_unlock(&pSql->subState.mutex); }
return pSub; return pSub;
} }
...@@ -1772,6 +1774,16 @@ void tscReleaseRefOfSubobj(SSqlObj* pSub) { ...@@ -1772,6 +1774,16 @@ void tscReleaseRefOfSubobj(SSqlObj* pSub) {
taosReleaseRef(tscObjRef, pSub->self); taosReleaseRef(tscObjRef, pSub->self);
} }
uint32_t tscGetVersionOfSubStateWithoutLock(SSqlObj *pSql) {
return pSql->subState.version;
}
void tscResetAllSubStates(SSqlObj* pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
memset(pSql->subState.states, 0, sizeof(pSql->subState.states[0]) * pSql->subState.numOfSub);
pthread_mutex_unlock(&pSql->subState.mutex);
}
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
return; return;
...@@ -4240,6 +4252,7 @@ int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { ...@@ -4240,6 +4252,7 @@ int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pSql->subState.numOfSub = numOfSubqueries; pSql->subState.numOfSub = numOfSubqueries;
pSql->subState.version ++;
pthread_mutex_unlock(&pSql->subState.mutex); } pthread_mutex_unlock(&pSql->subState.mutex); }
return code; return code;
...@@ -4272,6 +4285,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4272,6 +4285,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
goto _error; goto _error;
} }
uint32_t stateVersion = 0;
int errflag = 0; int errflag = 0;
{ pthread_mutex_lock(&pSql->subState.mutex); { pthread_mutex_lock(&pSql->subState.mutex);
...@@ -4327,12 +4341,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4327,12 +4341,23 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
numOfInit++; numOfInit++;
} }
stateVersion = tscGetVersionOfSubStateWithoutLock(pSql);
pthread_mutex_unlock(&pSql->subState.mutex); } pthread_mutex_unlock(&pSql->subState.mutex); }
if (errflag) { goto _error; } if (errflag) {
goto _error;
}
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* psub = tscAcquireRefOfSubobj(pSql, i); // ACQ REF SSqlObj* psub = tscAcquireRefOfSubobj(pSql, i, stateVersion); // ACQ ref
if (!psub) continue; if (!psub) {
if (stateVersion == tscGetVersionOfSubStateWithoutLock(pSql)) {
continue;
}
tscError("0x%"PRIx64"subqueries objs reset unexpectedly. numOfSub:%d", pSql->self, pSql->subState.numOfSub);
code = TSDB_CODE_FAILED;
goto _error;
}
// create sub query to handle the sub query. // create sub query to handle the sub query.
SQueryInfo* pq = tscGetQueryInfo(&psub->cmd); SQueryInfo* pq = tscGetQueryInfo(&psub->cmd);
...@@ -4343,7 +4368,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4343,7 +4368,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
} }
executeQuery(psub, pq); executeQuery(psub, pq);
tscReleaseRefOfSubobj(psub); // REL REF tscReleaseRefOfSubobj(psub); // REL ref
} }
return; return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册