提交 23b677dc 编写于 作者: B Benguang Zhao

fix: refactor doInitSubState to doReInitSubState with old pSubs reset properly

上级 7546cd7c
...@@ -350,7 +350,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC ...@@ -350,7 +350,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlC
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid); int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId); int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries); int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries);
void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex); void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex);
......
...@@ -202,7 +202,7 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -202,7 +202,7 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
ret = doInitSubState(pSql, numOfSub); ret = doReInitSubState(pSql, numOfSub);
if (ret != 0) { if (ret != 0) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
return ret; return ret;
......
...@@ -2175,20 +2175,19 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -2175,20 +2175,19 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0); assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pSql->subState.numOfSub = pQueryInfo->numOfTables;
pthread_mutex_lock(&pSql->subState.mutex);
pSql->subState.numOfSub = pQueryInfo->numOfTables;
if (pSql->subState.states == NULL) { if (pSql->subState.states == NULL) {
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states));
if (pSql->subState.states == NULL) { if (pSql->subState.states == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pthread_mutex_init(&pSql->subState.mutex, NULL);
} }
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables); tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);
pthread_mutex_unlock(&pSql->subState.mutex);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i); SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);
...@@ -2760,8 +2759,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2760,8 +2759,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups
: (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); : (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
int32_t ret = doReInitSubState(pSql, numOfSub);
int32_t ret = doInitSubState(pSql, numOfSub);
if (ret != 0) { if (ret != 0) {
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
return ret; return ret;
...@@ -3611,7 +3609,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -3611,7 +3609,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j); tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j);
tscBuildAndSendRequest(pSub, NULL); tscBuildAndSendRequest(pSub, NULL);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
......
...@@ -4207,30 +4207,19 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -4207,30 +4207,19 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param); taos_fetch_rows_a(tres, tscSubqueryRetrieveCallback, param);
} }
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) { int32_t doReInitSubState(SSqlObj* pSql, int32_t numOfSubqueries) {
//bug fix. Above doInitSubState level, the loop invocation with the same SSqlObj will be fail. tscFreeSubobj(pSql);
//assert(pSql->subState.numOfSub == 0 && pSql->pSubs == NULL && pSql->subState.states == NULL);
if(pSql->pSubs) { pthread_mutex_lock(&pSql->subState.mutex);
free(pSql->pSubs); int32_t code = TSDB_CODE_SUCCESS;
pSql->pSubs = NULL;
}
if(pSql->subState.states) {
free(pSql->subState.states);
pSql->subState.states = NULL;
}
pSql->subState.numOfSub = numOfSubqueries; pSql->subState.numOfSub = numOfSubqueries;
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t)); pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
if (pSql->pSubs == NULL || pSql->subState.states == NULL) {
int32_t code = pthread_mutex_init(&pSql->subState.mutex, NULL); code = TSDB_CODE_TSC_OUT_OF_MEMORY;
if (pSql->pSubs == NULL || pSql->subState.states == NULL || code != 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pthread_mutex_unlock(&pSql->subState.mutex);
return TSDB_CODE_SUCCESS; return code;
} }
// do execute the query according to the query execution plan // do execute the query according to the query execution plan
...@@ -4255,7 +4244,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -4255,7 +4244,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
// upstream may be freed before retry // upstream may be freed before retry
if (pQueryInfo->pUpstream && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly if (pQueryInfo->pUpstream && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
code = doInitSubState(pSql, (int32_t) taosArrayGetSize(pQueryInfo->pUpstream)); code = doReInitSubState(pSql, (int32_t) taosArrayGetSize(pQueryInfo->pUpstream));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册