diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index f2af4f1d32b3de2ff8e65994c7199f522ff2f634..34741e2ed33cfd3f1250d719bceeb2b01d07b04e 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1377,24 +1377,114 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock return code; } - if ((code = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) { - return code; + return tscProcessSql(pSql); +} + +typedef struct SImportFileSupport { + SSqlObj *pSql; + FILE *fp; +} SImportFileSupport; + +static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { + assert(param != NULL && tres != NULL); + + SSqlObj *pSql = tres; + SSqlCmd *pCmd = &pSql->cmd; + + SImportFileSupport *pSupporter = (SImportFileSupport *) param; + + SSqlObj *pParentSql = pSupporter->pSql; + FILE *fp = pSupporter->fp; + + if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // handle error + assert(taos_errno(pSql) == code); + + taos_free_result(pSql); + tfree(pSupporter); + fclose(fp); + + pParentSql->res.code = code; + return; } - return TSDB_CODE_SUCCESS; + // accumulate the total submit records + pParentSql->res.numOfRows += pSql->res.numOfRows; + + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; + SSchema * pSchema = tscGetTableSchema(pTableMeta); + STableComInfo tinfo = tscGetTableInfo(pTableMeta); + + SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns}; + tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns); + + size_t n = 0; + ssize_t readLen = 0; + char * line = NULL; + int32_t count = 0; + int32_t maxRows = 0; + + STableDataBlocks *pTableDataBlock = taosArrayGetP(pSql->cmd.pDataBlocks, 0); + pTableDataBlock->size = pTableDataBlock->headerSize; + + tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows); + char *tokenBuf = calloc(1, 4096); + + while ((readLen = getline(&line, &n, fp)) != -1) { + if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { + line[--readLen] = 0; + } + + if (readLen == 0) { + continue; + } + + char *lineptr = line; + strtolower(line, line); + + int32_t len = + tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tokenBuf); + if (len <= 0 || pTableDataBlock->numOfParams > 0) { + pSql->res.code = code; + break; + } + + pTableDataBlock->size += len; + + if (++count >= maxRows) { + break; + } + } + + tfree(tokenBuf); + free(line); + + if (count > 0) { + if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) { + pParentSql->res.code = code; + } + + } else { + taos_free_result(pSql); + tfree(pSupporter); + fclose(fp); + + pParentSql->fp = pParentSql->fetchFp; + + // all data has been sent to vnode, call user function + int32_t v = (pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : pParentSql->res.numOfRows; + (*pParentSql->fp)(pParentSql->param, pParentSql, v); + } } -static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { +static UNUSED_FUNC int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { size_t readLen = 0; char * line = NULL; - size_t n = 0; - int len = 0; int32_t maxRows = 0; SSqlCmd * pCmd = &pSql->cmd; int numOfRows = 0; int32_t code = 0; - int nrows = 0; - + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableComInfo tinfo = tscGetTableInfo(pTableMeta); @@ -1422,6 +1512,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns); + size_t n = 0; while ((readLen = getline(&line, &n, fp)) != -1) { // line[--readLen] = '\0'; if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) line[--readLen] = 0; @@ -1430,19 +1521,18 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { char *lineptr = line; strtolower(line, line); - len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tmpTokenBuf); + int32_t len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tmpTokenBuf); if (len <= 0 || pTableDataBlock->numOfParams > 0) { pSql->res.code = code; - return (-code); + return code; } pTableDataBlock->size += len; count++; - nrows++; if (count >= maxRows) { if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) { - return -code; + return code; } pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0); @@ -1457,7 +1547,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { if (count > 0) { if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) { - return -code; + return code; } numOfRows += pSql->res.numOfRows; @@ -1477,26 +1567,32 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); + STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - STableDataBlocks *pDataBlock = NULL; - int32_t affected_rows = 0; - - assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && pCmd->pDataBlocks != NULL); + assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE/* && pCmd->pDataBlocks != NULL*/); SArray *pDataBlockList = pCmd->pDataBlocks; - pCmd->pDataBlocks = NULL; + STableDataBlocks* pDataBlock = taosArrayGetP(pDataBlockList, 0); char path[PATH_MAX] = {0}; - size_t size = taosArrayGetSize(pDataBlockList); - for (int32_t i = 0; i < size; ++i) { - pDataBlock = taosArrayGetP(pDataBlockList, i ); - if (pDataBlock == NULL) { - continue; - } + SImportFileSupport* pSupporter = calloc(1, sizeof(SImportFileSupport)); + pSupporter->pSql = pSql; + + SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL); + + pNew->cmd.pDataBlocks = taosArrayInit(4, POINTER_BYTES); + + STableDataBlocks *pTableDataBlock = NULL; + int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock); + if (ret != TSDB_CODE_SUCCESS) { +// return ret; + } + + taosArrayPush(pNew->cmd.pDataBlocks, &pTableDataBlock); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) { tscError("%p failed to malloc when insert file", pSql); - continue; } pCmd->count = 1; @@ -1505,44 +1601,10 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) { FILE *fp = fopen(path, "r"); if (fp == NULL) { tscError("%p failed to open file %s to load data from file, reason:%s", pSql, path, strerror(errno)); - continue; - } - - tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name)); - memset(pDataBlock->pData, 0, pDataBlock->nAllocSize); - - int32_t ret = tscGetTableMeta(pSql, pTableMetaInfo); - if (ret != TSDB_CODE_SUCCESS) { - tscError("%p get meter meta failed, abort", pSql); - continue; - } - - char *tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \" - if (NULL == tmpTokenBuf) { - tscError("%p calloc failed", pSql); - continue; +// continue;// todo handle error } - int nrows = tscInsertDataFromFile(pSql, fp, tmpTokenBuf); - free(tmpTokenBuf); + pSupporter->fp = fp; - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); - - if (nrows < 0) { - fclose(fp); - tscTrace("%p no records(%d) in file %s", pSql, nrows, path); - continue; - } - - fclose(fp); - affected_rows += nrows; - - tscTrace("%p Insert data %d records from file %s", pSql, nrows, path); - } - - pSql->res.numOfRows = affected_rows; - - // all data have been submit to vnode, release data blocks - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); - tscDestroyBlockArrayList(pDataBlockList); + parseFileSendDataBlock(pSupporter, pNew, 0); }