diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index d4f707311826bf161e4951a024f8ecd7c8bffbf1..59e942866737907e1d703320962220cb62e29cd9 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2180,43 +2180,56 @@ static void doFreeInsertSupporter(SSqlObj* pSqlObj) { tfree(pSql->param); } } +static bool doCheckAllSubObj(SSqlObj* pSqlObj, int *numOfRows, int8_t *buildSchema) { + assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0); + bool res = true; + for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) { + SSqlObj* pSql = pSqlObj->pSubs[i]; + if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { + res = false; + if (pSql->cmd.submitSchema) { + *buildSchema = 1; + } + } else { + *numOfRows += pSql->res.numOfRows; + } + } + return res; +} static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; + SSqlObj* pSubObj = (SSqlObj*) tres; // record the total inserted rows if (numOfRows > 0) { - pParentObj->res.numOfRows += numOfRows; - } - - if (taos_errno(tres) != TSDB_CODE_SUCCESS) { - SSqlObj* pSql = (SSqlObj*) tres; - assert(pSql != NULL && pSql->res.code == numOfRows); - - pParentObj->res.code = pSql->res.code; - - // set the flag in the parent sqlObj - if (pSql->cmd.submitSchema) { - pParentObj->cmd.submitSchema = 1; - } + pSubObj->res.numOfRows = numOfRows; } if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { return; } - + if (0 != atomic_load_32(&pParentObj->subState.numOfRemain)) { + tscError("%p impossible", pParentObj); + } + int totalRows = 0; + int8_t submitSchema = 0; + bool res = doCheckAllSubObj(pParentObj, &totalRows, &submitSchema); // restore user defined fp pParentObj->fp = pParentObj->fetchFp; + pParentObj->cmd.submitSchema = submitSchema; + pParentObj->res.numOfRows = totalRows; + int32_t numOfSub = pParentObj->subState.numOfSub; - - if (pParentObj->res.code == TSDB_CODE_SUCCESS) { + if (res == true) { tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); doFreeInsertSupporter(pParentObj); // todo remove this parameter in async callback function definition. // all data has been sent to vnode, call user function - int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; + //int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; + int32_t v = totalRows; (*pParentObj->fp)(pParentObj->param, pParentObj, v); } else { if (!needRetryInsert(pParentObj, numOfSub)) { @@ -2242,7 +2255,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj, - pParentObj->res.numOfRows, numOfFailed, numOfSub); + totalRows, numOfFailed, numOfSub); tscDebug("%p cleanup %d tableMeta in cache", pParentObj, pParentObj->cmd.numOfTables); for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) {