diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 0c7af5d4e3d8b61445b25c94cf5eda021ceba3e9..d284b92a6746e89ef5729889a077a423c4199938 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1409,36 +1409,37 @@ typedef struct SImportFileSupport { static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { assert(param != NULL && tres != NULL); + char * tokenBuf = NULL; + size_t n = 0; + ssize_t readLen = 0; + char * line = NULL; + int32_t count = 0; + int32_t maxRows = 0; + FILE * fp = NULL; + SSqlObj *pSql = tres; SSqlCmd *pCmd = &pSql->cmd; - SImportFileSupport *pSupporter = (SImportFileSupport *) param; + SImportFileSupport *pSupporter = (SImportFileSupport *)param; SSqlObj *pParentSql = pSupporter->pSql; - FILE *fp = pSupporter->fp; + fp = pSupporter->fp; if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // handle error assert(taos_errno(pSql) == code); - do { - if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - assert(pSql->res.numOfRows == 0); - int32_t errc = fseek(fp, 0, SEEK_SET); - if (errc < 0) { - tscError("%p failed to seek SEEK_SET since:%s", pSql, tstrerror(errno)); - } else { - break; - } + if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + assert(pSql->res.numOfRows == 0); + int32_t ret = fseek(fp, 0, SEEK_SET); + if (ret < 0) { + tscError("%p failed to seek SEEK_SET since:%s", pSql, tstrerror(errno)); + pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); + goto _error; } - - taos_free_result(pSql); - tfree(pSupporter); - fclose(fp); - + } else { pParentSql->res.code = code; - tscAsyncResultOnError(pParentSql); - return; - } while (0); + goto _error; + } } // accumulate the total submit records @@ -1452,28 +1453,32 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns}; tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns); - size_t n = 0; - ssize_t readLen = 0; - char * line = NULL; - int32_t count = 0; - int32_t maxRows = 0; - tfree(pCmd->pTableNameList); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); if (pCmd->pTableBlockHashList == NULL) { pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (pCmd->pTableBlockHashList == NULL) { + pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } } STableDataBlocks *pTableDataBlock = NULL; - int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, - sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL); + int32_t ret = + tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk), + tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL); if (ret != TSDB_CODE_SUCCESS) { -// return ret; + pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; } tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows); - char *tokenBuf = calloc(1, 4096); + tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW); + if (tokenBuf == NULL) { + pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _error; + } while ((readLen = tgetline(&line, &n, fp)) != -1) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { @@ -1501,27 +1506,43 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { } tfree(tokenBuf); - free(line); + tfree(line); - if (count > 0) { - code = doPackSendDataBlock(pSql, count, pTableDataBlock); - if (code != TSDB_CODE_SUCCESS) { + pParentSql->res.code = code; + + if (code == TSDB_CODE_SUCCESS) { + if (count > 0) { + code = doPackSendDataBlock(pSql, count, pTableDataBlock); + if (code == TSDB_CODE_SUCCESS) { + return; + } else { + pParentSql->res.code = code; + goto _error; + } + } else { pParentSql->res.code = code; - tscAsyncResultOnError(pParentSql); + taos_free_result(pSql); + tfree(pSupporter); + fclose(fp); + + pParentSql->fp = pParentSql->fetchFp; + + // all data has been sent to vnode, call user function + int32_t v = + (pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : (int32_t)pParentSql->res.numOfRows; + (*pParentSql->fp)(pParentSql->param, pParentSql, v); return; } + } - } else { - taos_free_result(pSql); - tfree(pSupporter); - fclose(fp); - - pParentSql->fp = pParentSql->fetchFp; +_error: + tfree(tokenBuf); + tfree(line); + taos_free_result(pSql); + tfree(pSupporter); + fclose(fp); - // all data has been sent to vnode, call user function - int32_t v = (pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : (int32_t)pParentSql->res.numOfRows; - (*pParentSql->fp)(pParentSql->param, pParentSql, v); - } + tscAsyncResultOnError(pParentSql); } void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) {