diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 652e5bdd47b5bfa3adfdb8b8f58e94399c4de810..83eaeeb3219e4ba2cce7017ac50701c454e32c1e 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -435,7 +435,7 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen); -void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql); +void tscImportDataFromFile(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); bool tscIsUpdateQuery(SSqlObj* pSql); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index d284b92a6746e89ef5729889a077a423c4199938..f2dddd5e291420fda4127d7f469541f956c08abe 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1406,7 +1406,7 @@ typedef struct SImportFileSupport { FILE *fp; } SImportFileSupport; -static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { +static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRows) { assert(param != NULL && tres != NULL); char * tokenBuf = NULL; @@ -1425,21 +1425,19 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { SSqlObj *pParentSql = pSupporter->pSql; fp = pSupporter->fp; - if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // handle error - assert(taos_errno(pSql) == code); + int32_t code = pSql->res.code; - if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - assert(pSql->res.numOfRows == 0); - int32_t ret = fseek(fp, 0, SEEK_SET); - if (ret < 0) { - tscError("%p failed to seek SEEK_SET since:%s", pSql, tstrerror(errno)); - pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); - goto _error; - } - } else { - pParentSql->res.code = code; + // retry parse data from file and import data from the begining again + if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + assert(pSql->res.numOfRows == 0); + int32_t ret = fseek(fp, 0, SEEK_SET); + if (ret < 0) { + tscError("%p failed to seek SEEK_SET since:%s", pSql, tstrerror(errno)); + code = TAOS_SYSTEM_ERROR(errno); goto _error; } + } else if (code != TSDB_CODE_SUCCESS) { + goto _error; } // accumulate the total submit records @@ -1459,7 +1457,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { if (pCmd->pTableBlockHashList == NULL) { pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); if (pCmd->pTableBlockHashList == NULL) { - pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } } @@ -1476,7 +1474,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows); tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW); if (tokenBuf == NULL) { - pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } @@ -1509,18 +1507,15 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { tfree(line); pParentSql->res.code = code; - if (code == TSDB_CODE_SUCCESS) { if (count > 0) { code = doPackSendDataBlock(pSql, count, pTableDataBlock); if (code == TSDB_CODE_SUCCESS) { return; } else { - pParentSql->res.code = code; goto _error; } } else { - pParentSql->res.code = code; taos_free_result(pSql); tfree(pSupporter); fclose(fp); @@ -1528,8 +1523,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { pParentSql->fp = pParentSql->fetchFp; // all data has been sent to vnode, call user function - int32_t v = - (pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : (int32_t)pParentSql->res.numOfRows; + int32_t v = (code != TSDB_CODE_SUCCESS) ? code : (int32_t)pParentSql->res.numOfRows; (*pParentSql->fp)(pParentSql->param, pParentSql, v); return; } @@ -1545,7 +1539,7 @@ _error: tscAsyncResultOnError(pParentSql); } -void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) { +void tscImportDataFromFile(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; if (pCmd->command != TSDB_SQL_INSERT) { return; @@ -1564,12 +1558,11 @@ void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) { tfree(pSupporter); tscAsyncResultOnError(pSql); - return; } pSupporter->pSql = pSql; - pSupporter->fp = fp; + pSupporter->fp = fp; - parseFileSendDataBlock(pSupporter, pNew, 0); + parseFileSendDataBlock(pSupporter, pNew, TSDB_CODE_SUCCESS); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 02cd9b9692ffa3e72270b57a4f1dd2dc146f0257..a153a9147d55031f703a3b27b83902f772fc0cb0 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2197,7 +2197,7 @@ void tscDoQuery(SSqlObj* pSql) { } if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { - tscProcessMultiVnodesImportFromFile(pSql); + tscImportDataFromFile(pSql); } else { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); uint16_t type = pQueryInfo->type;