From d3e969d42894368654ff02d2016b9f2c4e216eed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 Jun 2020 11:23:26 +0800 Subject: [PATCH] [td-225] refactor insert codes --- src/client/inc/tsclient.h | 2 +- src/client/src/tscAsync.c | 29 +++------ src/client/src/tscParseInsert.c | 100 ++++++++++++++----------------- src/client/src/tscSql.c | 11 ++-- src/client/src/tscStream.c | 4 +- src/client/src/tscSubquery.c | 101 ++++++++++++++++++++++---------- 6 files changed, 130 insertions(+), 117 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index bd956aeb74..90364987bb 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -349,7 +349,7 @@ typedef struct SSqlStream { int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn); void tscInitMsgsFp(); -int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); +int tsParseSql(SSqlObj *pSql, bool initialParse); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet); int tscProcessSql(SSqlObj *pSql); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 3c54176d0a..2b99d23099 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -39,41 +39,26 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); -int doAsyncParseSql(SSqlObj* pSql) { - SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; - - int32_t code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); - if (code != TSDB_CODE_SUCCESS) { - tscError("failed to malloc payload"); - pSql->res.code = code; - - tscQueueAsyncRes(pSql); - return code; - } - - pRes->qhandle = 0; - pRes->numOfRows = 1; - - tscDump("%p SQL: %s", pSql, pSql->sqlstr); - return tsParseSql(pSql, true); -} - void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { pSql->signature = pSql; pSql->param = param; pSql->pTscObj = pObj; pSql->maxRetry = TSDB_MAX_REPLICA_NUM; pSql->fp = fp; + pSql->sqlstr = calloc(1, sqlLen + 1); if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); tscQueueAsyncError(pSql->fp, pSql->param, TSDB_CODE_TSC_OUT_OF_MEMORY); return; } + strtolower(pSql->sqlstr, sqlstr); - - int32_t code = doAsyncParseSql(pSql); + + tscDump("%p SQL: %s", pSql, pSql->sqlstr); + pSql->cmd.curSql = pSql->sqlstr; + + int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; if (code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 4b6178eb61..aa9204e62a 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1014,42 +1014,37 @@ static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) { * @param pSql * @return */ -int doParseInsertSql(SSqlObj *pSql, char *str) { +int tsParseInsertSql(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; + char* str = pCmd->curSql; int32_t totalNum = 0; int32_t code = TSDB_CODE_SUCCESS; - STableMetaInfo *pTableMetaInfo = NULL; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); assert(pQueryInfo != NULL); + STableMetaInfo *pTableMetaInfo = NULL; if (pQueryInfo->numOfTables == 0) { pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo); } else { pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); } - // TODO: 2048 is added because TSDB_MAX_TAGS_LEN now is 65536 - // but TSDB_PAYLOAD_SIZE is 65380 + // TODO: 2048 is added because TSDB_MAX_TAGS_LEN now is 65536, but TSDB_PAYLOAD_SIZE is 65380 if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE + 2048)) != TSDB_CODE_SUCCESS) { return code; } - assert(((NULL == pCmd->curSql) && (NULL == pCmd->pTableList)) - || ((NULL != pCmd->curSql) && (NULL != pCmd->pTableList))); - - if ((NULL == pCmd->curSql) && (NULL == pCmd->pTableList)) { + if (NULL == pCmd->pTableList) { pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); pSql->cmd.pDataBlocks = tscCreateBlockArrayList(); if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error_clean; + goto _error; } } else { - assert((NULL != pCmd->curSql) && (NULL != pCmd->pTableList)); str = pCmd->curSql; } @@ -1075,7 +1070,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { */ if (totalNum == 0) { code = TSDB_CODE_TSC_INVALID_SQL; - goto _error_clean; + goto _error; } else { break; } @@ -1086,11 +1081,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { // Check if the table name available or not if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) { code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z); - goto _error_clean; + goto _error; } if ((code = tscSetTableFullName(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } ptrdiff_t pos = pCmd->curSql - pSql->sqlstr; @@ -1103,20 +1098,19 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { * interrupted position. */ if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) { - tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 " , %s", pSql, + tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 ", %s", pSql, pos, pCmd->curSql); return code; } - // todo add to return - tscError("%p async insert parse error, code:%d, %s", pSql, code, tstrerror(code)); + tscError("%p async insert parse error, code:%d, reason:%s", pSql, code, tstrerror(code)); pCmd->curSql = NULL; - goto _error_clean; // TODO: should _clean or _error_clean to async flow ???? + goto _error; } if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscInvalidSQLErrMsg(pCmd->payload, "insert data into super table is not supported", NULL); - goto _error_clean; + goto _error; } index = 0; @@ -1125,7 +1119,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (sToken.n == 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE required", sToken.z); - goto _error_clean; + goto _error; } STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); @@ -1137,7 +1131,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns); if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } /* @@ -1146,11 +1140,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { */ code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } } else if (sToken.type == TK_FILE) { if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } index = 0; @@ -1158,7 +1152,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { str += index; if (sToken.n == 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "file path is required following keyword FILE", sToken.z); - goto _error_clean; + goto _error; } char fname[PATH_MAX] = {0}; @@ -1168,7 +1162,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { wordexp_t full_path; if (wordexp(fname, &full_path, 0) != 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z); - goto _error_clean; + goto _error; } strcpy(fname, full_path.we_wordv[0]); wordfree(&full_path); @@ -1179,7 +1173,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pDataBlock); if (ret != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock); @@ -1190,7 +1184,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { SSchema * pSchema = tscGetTableSchema(pTableMeta); if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } SParsedDataColInfo spd = {0}; @@ -1226,7 +1220,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (spd.hasVal[t] == true) { code = tscInvalidSQLErrMsg(pCmd->payload, "duplicated column name", sToken.z); - goto _error_clean; + goto _error; } spd.hasVal[t] = true; @@ -1237,13 +1231,13 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (!findColumnIndex) { code = tscInvalidSQLErrMsg(pCmd->payload, "invalid column name", sToken.z); - goto _error_clean; + goto _error; } } if (spd.numOfAssignedCols == 0 || spd.numOfAssignedCols > tinfo.numOfColumns) { code = tscInvalidSQLErrMsg(pCmd->payload, "column name expected", sToken.z); - goto _error_clean; + goto _error; } index = 0; @@ -1252,16 +1246,16 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (sToken.type != TK_VALUES) { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES is expected", sToken.z); - goto _error_clean; + goto _error; } code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } } else { code = tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES or FILE are required", sToken.z); - goto _error_clean; + goto _error; } } @@ -1272,7 +1266,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { - goto _error_clean; + goto _error; } } else { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); @@ -1281,7 +1275,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { code = TSDB_CODE_SUCCESS; goto _clean; -_error_clean: +_error: pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); _clean: @@ -1294,7 +1288,7 @@ _clean: return code; } -int tsParseInsertSql(SSqlObj *pSql) { +int tsInsertInitialCheck(SSqlObj *pSql) { if (!pSql->pTscObj->writeAuth) { return TSDB_CODE_TSC_NO_WRITE_AUTH; } @@ -1312,30 +1306,24 @@ int tsParseInsertSql(SSqlObj *pSql) { SQueryInfo *pQueryInfo = NULL; tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); - TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT); - TSDB_QUERY_SET_TYPE(pQueryInfo->type, pCmd->insertType); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType); sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL); if (sToken.type != TK_INTO) { return tscInvalidSQLErrMsg(pCmd->payload, "keyword INTO is expected", sToken.z); } - return doParseInsertSql(pSql, pSql->sqlstr + index); + pCmd->curSql = sToken.z + sToken.n; + return TSDB_CODE_SUCCESS; } int tsParseSql(SSqlObj *pSql, bool initialParse) { int32_t ret = TSDB_CODE_SUCCESS; - - if (initialParse) { - assert(!pSql->cmd.parseFinished); + SSqlCmd* pCmd = &pSql->cmd; - char* p = pSql->sqlstr; - pSql->sqlstr = NULL; - - tscPartiallyFreeSqlObj(pSql); - pSql->sqlstr = p; - } else if (!pSql->cmd.parseFinished) { - tscTrace("continue parse sql: %s", pSql->cmd.curSql); + tscTrace("------------------%p, initial:%d, sqlstr:%s", pSql, initialParse, pSql->sqlstr); + if (!pCmd->parseFinished) { + tscTrace("%p resume to parse sql: %s", pSql, pCmd->curSql); } if (tscIsInsertData(pSql->sqlstr)) { @@ -1347,7 +1335,11 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) { pSql->fetchFp = pSql->fp; pSql->fp = (void(*)())tscHandleMultivnodeInsert; } - + + if (initialParse && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) { + return ret; + } + ret = tsParseInsertSql(pSql); } else { ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE); @@ -1362,11 +1354,9 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) { /* * the pRes->code may be modified or released by another thread in tscTableMetaCallBack function, - * so do NOT use pRes->code to determine if the getTableMeta/getMetricMeta function - * invokes new threads to get data from mnode or simply retrieves data from cache. - * - * do NOT assign return code to pRes->code for the same reason since it may be released by another thread - * pRes->code = ret; + * so do NOT use pRes->code to determine if the getTableMeta function + * invokes new threads to get data from mgmt node or simply retrieves data from cache. + * do NOT assign return code to pRes->code for the same reason since it may be released by another thread already. */ return ret; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 0fa841bc7c..47a602a6fb 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -488,7 +488,7 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) { (pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - tscTrace("%p start to send msg to free qhandle in dnode, command:%s", pSql, sqlCmd[pCmd->command]); + tscTrace("%p send msg to dnode to free qhandle ASAP, command:%s", pSql, sqlCmd[pCmd->command]); pSql->freed = 1; tscProcessSql(pSql); @@ -510,18 +510,17 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) { void taos_free_result(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; - tscTrace("%p start to free result", res); - + if (pSql == NULL || pSql->signature != pSql) { - tscTrace("%p result has been freed", pSql); + tscTrace("%p sqlObj has been freed", pSql); return; } // The semaphore can not be changed while freeing async sub query objects. SSqlRes *pRes = &pSql->res; if (pRes == NULL || pRes->qhandle == 0) { - tscTrace("%p SqlObj is freed by app, qhandle is null", pSql); tscFreeSqlObj(pSql); + tscTrace("%p SqlObj is freed by app, qhandle is null", pSql); return; } @@ -529,6 +528,7 @@ void taos_free_result(TAOS_RES *res) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); if (pQueryInfo == NULL) { tscFreeSqlObj(pSql); + tscTrace("%p SqlObj is freed by app", pSql); return; } @@ -713,7 +713,6 @@ int taos_validate_sql(TAOS *taos, const char *sql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - pRes->numOfRows = 1; pRes->numOfTotal = 0; pRes->numOfClauseTotal = 0; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 6fc934b6c0..86a41b7ba4 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -503,8 +503,10 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p } strtolower(pSql->sqlstr, sqlstr); + tscDump("%p SQL: %s", pSql, pSql->sqlstr); tsem_init(&pSql->rspSem, 0, 0); - int32_t code = doAsyncParseSql(pSql); + + int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { sem_wait(&pSql->rspSem); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index aaa5ab291f..ecac6fed1c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1165,8 +1165,8 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pSql->res.qhandle = 0x1; - pSql->res.numOfRows = 0; - + assert(pSql->res.numOfRows == 0); + if (pSql->pSubs == NULL) { pSql->pSubs = calloc(pSupporter->pState->numOfTotal, POINTER_BYTES); if (pSql->pSubs == NULL) { @@ -1364,7 +1364,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tOrderDescriptor *pDesc = NULL; SColumnModel * pModel = NULL; - pRes->qhandle = 1; // hack the qhandle check + pRes->qhandle = 0x1; // hack the qhandle check const uint32_t nBufferSize = (1u << 16); // 64KB @@ -1845,34 +1845,41 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { } } -static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) { +static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; SSqlCmd* pParentCmd = &pParentObj->cmd; SSubqueryState* pState = pSupporter->pState; - // increase the total inserted rows + // record the total inserted rows if (numOfRows > 0) { - pParentObj->res.numOfRows += numOfRows; + if (tres != pParentObj) { + pParentObj->res.numOfRows += numOfRows; + } } else { SSqlObj* pSql = (SSqlObj*) tres; assert(pSql != NULL && pSql->res.code == numOfRows); pParentObj->res.code = pSql->res.code; } - - taos_free_result(tres); + + // it is not the initial sqlObj, free it + if (tres != pParentObj) { + taos_free_result(tres); + } else { + assert(pParentObj->pSubs[0] == tres); + } + + tfree(pSupporter); if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { return; } tscTrace("%p Async insertion completed, total inserted:%" PRId64, pParentObj, pParentObj->res.numOfRows); - - tfree(pState); - tfree(pSupporter); - + // release data block data + tfree(pState); pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); // restore user defined fp @@ -1886,9 +1893,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - pRes->qhandle = 1; // hack the qhandle check SDataBlockList *pDataBlocks = pCmd->pDataBlocks; - pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES); pSql->numOfSubs = pDataBlocks->nSize; assert(pDataBlocks->nSize > 0); @@ -1899,27 +1904,50 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { pState->numOfRemain = pState->numOfTotal; pRes->code = TSDB_CODE_SUCCESS; - - int32_t i = 0; + + SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); + pSupporter->pSql = pSql; + pSupporter->pState = pState; + + pSql->fp = multiVnodeInsertFinalize; + pSql->param = pSupporter; + pSql->pSubs[0] = pSql; // the first sub insert points back to itself + tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pSql, 0); + + int32_t code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[0]); + if (code != TSDB_CODE_SUCCESS) { + tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, 0, + pDataBlocks->nSize, code); + goto _error; + } + + int32_t i = 1; for (; i < pSql->numOfSubs; ++i) { - SInsertSupporter* pSupporter = calloc(1, sizeof(SInsertSupporter)); - pSupporter->pSql = pSql; - pSupporter->pState = pState; + SInsertSupporter* pSupporter1 = calloc(1, sizeof(SInsertSupporter)); + pSupporter1->pSql = pSql; + pSupporter1->pState = pState; - SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, TSDB_SQL_INSERT, NULL); + SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT, NULL); if (pNew == NULL) { tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); - break; + goto _error; } /* * assign the callback function to fetchFp to make sure that the error process function can restore - * the callback function (multiVnodeInsertMerge) correctly. + * the callback function (multiVnodeInsertFinalize) correctly. */ pNew->fetchFp = pNew->fp; pSql->pSubs[i] = pNew; - tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i); + code = tscCopyDataBlockToPayload(pNew, pDataBlocks->pData[i]); + if (code != TSDB_CODE_SUCCESS) { + tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, i, + pDataBlocks->nSize, code); + goto _error; + } else { + tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i); + } } if (i < pSql->numOfSubs) { @@ -1927,21 +1955,30 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; return pRes->code; // free all allocated resource } - - for (int32_t j = 0; j < pSql->numOfSubs; ++j) { + + // use the local variable + int32_t numOfSub = pSql->numOfSubs; + for (int32_t j = 0; j < numOfSub; ++j) { SSqlObj *pSub = pSql->pSubs[j]; - int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]); - - if (code != TSDB_CODE_SUCCESS) { - tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, j, - pDataBlocks->nSize, code); - } - tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j); tscProcessSql(pSub); } return TSDB_CODE_SUCCESS; + + _error: + // restore the udf fp + pSql->fp = pSql->fetchFp; + + tfree(pState); + tfree(pSql->param); + + for(int32_t j = 1; j < i; ++j) { + tfree(pSql->pSubs[j]->param); + taos_free_result(pSql->pSubs[j]); + } + + return TSDB_CODE_TSC_OUT_OF_MEMORY; } void tscBuildResFromSubqueries(SSqlObj *pSql) { -- GitLab