提交 9216e543 编写于 作者: B Benguang Zhao

enh: 1. let doCleanupSubqueries be idempotent

     2. access pSql->subState.numOfSub in the same way
上级 a855a50b
......@@ -50,7 +50,7 @@ void tscUnlockByThread(int64_t *lockedBy);
int tsInsertInitialCheck(SSqlObj *pSql);
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs);
void doCleanupSubqueries(SSqlObj *pSql);
void tscFreeRetrieveSup(void **param);
......
......@@ -694,6 +694,10 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
void freeJoinSubqueryObj(SSqlObj* pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->subState.numOfSub == 0) {
goto _out;
}
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
......@@ -709,6 +713,8 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
tfree(pSql->pSubs);
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
_out:
pthread_mutex_unlock(&pSql->subState.mutex);
}
......@@ -2227,17 +2233,18 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
tscAsyncResultOnError(pSql);
}
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
assert(numOfSubs <= pSql->subState.numOfSub && numOfSubs >= 0);
for(int32_t i = 0; i < numOfSubs; ++i) {
void doCleanupSubqueries(SSqlObj *pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
assert(pSub != NULL);
if (!pSub) continue;
pSql->pSubs[i] = NULL;
tscFreeRetrieveSup(&pSub->param);
taos_free_result(pSub);
}
pthread_mutex_unlock(&pSql->subState.mutex);
}
void tscLockByThread(int64_t *lockedBy) {
......@@ -2732,8 +2739,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSubqueryState *pState = &pSql->subState;
int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups
: (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
int32_t ret = doReInitSubState(pSql, numOfSub);
......@@ -2751,11 +2756,11 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return ret;
}
tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub);
tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pSql->subState.numOfSub);
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
for (; i < pState->numOfSub; ++i) {
for (; i < pSql->subState.numOfSub; ++i) {
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
if (trs == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
......@@ -2795,24 +2800,19 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self,
trs->subqueryIndex);
}
if (i < pState->numOfSub) {
if (i < pSql->subState.numOfSub) {
tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pSql->subState.numOfSub);
doCleanupSubqueries(pSql);
return pRes->code;
}
doConcurrentlySendSubQueries(pSql);
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册