diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 5808d0705cfc15b68bf1bb8b487797d501e5f863..7759ab9f4ad8fb81d1c95f5362abcbc70f7ab928 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3533,34 +3533,17 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } - pSql->subState.numOfSub = (uint16_t)taosArrayGetSize(pCmd->insertParam.pDataBlocks); - assert(pSql->subState.numOfSub > 0); - pRes->code = TSDB_CODE_SUCCESS; - // the number of already initialized subqueries - int32_t numOfSub = 0; - - if (pSql->subState.states == NULL) { - pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(*pSql->subState.states)); - if (pSql->subState.states == NULL) { - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; - } - - pthread_mutex_init(&pSql->subState.mutex, NULL); - } - - memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub); - tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self); - - pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES); - if (pSql->pSubs == NULL) { + int32_t code = doReInitSubState(pSql, (int32_t)taosArrayGetSize(pCmd->insertParam.pDataBlocks)); + if (code != TSDB_CODE_SUCCESS) { goto _error; } + assert(pSql->subState.numOfSub > 0); tscDebug("0x%"PRIx64" submit data to %d vnode(s)", pSql->self, pSql->subState.numOfSub); + int32_t numOfSub = 0; while(numOfSub < pSql->subState.numOfSub) { SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); if (pSupporter == NULL) { @@ -3611,7 +3594,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { } return TSDB_CODE_SUCCESS; - _error: +_error: return TSDB_CODE_TSC_OUT_OF_MEMORY; }