提交 4cbcf232 编写于 作者: B Benguang Zhao

enh: 1. let doCleanupSubqueries be idempotent

     2. access pSql->subState.numOfSub in the same way
上级 5e918eaa
......@@ -50,7 +50,7 @@ void tscUnlockByThread(int64_t *lockedBy);
int tsInsertInitialCheck(SSqlObj *pSql);
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs);
void doCleanupSubqueries(SSqlObj *pSql);
void tscFreeRetrieveSup(void **param);
......
......@@ -195,7 +195,6 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
SSubqueryState *pState = &pSql->subState;
int32_t numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
if(numOfSub == 0) {
tscInfo(":CDEL SQL:%p tablename=%s numOfVgroups is zero, maybe empty table.", pSql, pTableMetaInfo->name.tname);
......@@ -203,16 +202,16 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
}
ret = doReInitSubState(pSql, numOfSub);
if (ret != 0) {
if (ret != TSDB_CODE_SUCCESS) {
tscAsyncResultOnError(pSql);
return ret;
}
tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub);
tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pSql->subState.numOfSub);
pRes->code = TSDB_CODE_SUCCESS;
int32_t i;
for (i = 0; i < pState->numOfSub; ++i) {
for (i = 0; i < pSql->subState.numOfSub; ++i) {
// vgroup
SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i];
......@@ -234,23 +233,18 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
}
pSql->pSubs[i] = pNew;
}
if (i < pState->numOfSub) {
if (i < pSql->subState.numOfSub) {
tscError("0x%"PRIx64":CDEL failed to prepare subdelete structure and launch subqueries", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
doCleanupSubqueries(pSql, i);
if (pRes->code != TSDB_CODE_SUCCESS) {
doCleanupSubqueries(pSql);
return pRes->code;
}
// send sub sql
doConcurrentlySendSubQueries(pSql);
//return TSDB_CODE_TSC_QUERY_CANCELLED;
return TSDB_CODE_SUCCESS;
}
......@@ -694,6 +694,10 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
void freeJoinSubqueryObj(SSqlObj* pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->subState.numOfSub == 0) {
goto _out;
}
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
......@@ -709,6 +713,8 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
tfree(pSql->pSubs);
tfree(pSql->subState.states);
pSql->subState.numOfSub = 0;
_out:
pthread_mutex_unlock(&pSql->subState.mutex);
}
......@@ -2227,23 +2233,17 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
tscAsyncResultOnError(pSql);
}
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
void doCleanupSubqueries(SSqlObj *pSql) {
pthread_mutex_lock(&pSql->subState.mutex);
if (numOfSubs > pSql->subState.numOfSub || numOfSubs <= 0 || pSql->subState.numOfSub <= 0) {
goto _out;
}
for(int32_t i = 0; i < numOfSubs; ++i) {
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
pSql->pSubs[i] = NULL;
if (!pSub) continue;
pSql->pSubs[i] = NULL;
tscFreeRetrieveSup(&pSub->param);
taos_free_result(pSub);
}
_out:
pthread_mutex_unlock(&pSql->subState.mutex);
}
......@@ -2739,8 +2739,6 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSubqueryState *pState = &pSql->subState;
int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups
: (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
int32_t ret = doReInitSubState(pSql, numOfSub);
......@@ -2758,11 +2756,11 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return ret;
}
tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub);
tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pSql->subState.numOfSub);
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
for (; i < pState->numOfSub; ++i) {
for (; i < pSql->subState.numOfSub; ++i) {
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
if (trs == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
......@@ -2802,24 +2800,19 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self,
trs->subqueryIndex);
}
if (i < pState->numOfSub) {
if (i < pSql->subState.numOfSub) {
tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pSql->subState.numOfSub);
doCleanupSubqueries(pSql);
return pRes->code;
}
doConcurrentlySendSubQueries(pSql);
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册