From afcc90a4b4a2989adda540e29c3f0bc14c5e098d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 24 Jun 2020 23:54:15 +0800 Subject: [PATCH] [td-719] --- src/client/inc/tsclient.h | 8 +--- src/client/src/tscParseInsert.c | 83 ++++++++++++--------------------- src/client/src/tscUtil.c | 2 +- src/inc/taosdef.h | 2 +- 4 files changed, 35 insertions(+), 60 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 29abff7685..471db9c99f 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -171,11 +171,7 @@ typedef struct STableDataBlocks { * to avoid it to be removed from cache */ STableMeta *pTableMeta; - - union { - char *filename; - char *pData; - }; + char *pData; // for parameter ('?') binding uint32_t numOfAllocedParams; @@ -398,7 +394,7 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ; int doAsyncParseSql(SSqlObj* pSql); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); -void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); +void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql); void tscKillSTableQuery(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); bool tscIsUpdateQuery(SSqlObj* pSql); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 34741e2ed3..c2ce81992c 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1153,29 +1153,19 @@ int tsParseInsertSql(SSqlObj *pSql) { goto _error; } - char fname[PATH_MAX] = {0}; - strncpy(fname, sToken.z, sToken.n); - strdequote(fname); + strncpy(pCmd->payload, sToken.z, sToken.n); + strdequote(pCmd->payload); + // todo refactor extract method wordexp_t full_path; - if (wordexp(fname, &full_path, 0) != 0) { + if (wordexp(pCmd->payload, &full_path, 0) != 0) { code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z); goto _error; } - strcpy(fname, full_path.we_wordv[0]); - wordfree(&full_path); - STableDataBlocks *pDataBlock = NULL; - STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - - int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, - pTableMeta, &pDataBlock); - if (ret != TSDB_CODE_SUCCESS) { - goto _error; - } + tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize); + wordfree(&full_path); - taosArrayPush(pCmd->pDataBlocks, &pDataBlock); - strcpy(pDataBlock->filename, fname); } else if (sToken.type == TK_LP) { /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */ STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta; @@ -1424,10 +1414,18 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { int32_t count = 0; int32_t maxRows = 0; - STableDataBlocks *pTableDataBlock = taosArrayGetP(pSql->cmd.pDataBlocks, 0); - pTableDataBlock->size = pTableDataBlock->headerSize; + tscDestroyBlockArrayList(pSql->cmd.pDataBlocks); + pCmd->pDataBlocks = taosArrayInit(1, POINTER_BYTES); + STableDataBlocks *pTableDataBlock = NULL; + int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock); + if (ret != TSDB_CODE_SUCCESS) { +// return ret; + } + + taosArrayPush(pCmd->pDataBlocks, &pTableDataBlock); tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows); + char *tokenBuf = calloc(1, 4096); while ((readLen = getline(&line, &n, fp)) != -1) { @@ -1559,52 +1557,33 @@ static UNUSED_FUNC int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpT return numOfRows; } -void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) { +void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; if (pCmd->command != TSDB_SQL_INSERT) { return; } - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - - assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE/* && pCmd->pDataBlocks != NULL*/); - SArray *pDataBlockList = pCmd->pDataBlocks; - STableDataBlocks* pDataBlock = taosArrayGetP(pDataBlockList, 0); - - char path[PATH_MAX] = {0}; - - SImportFileSupport* pSupporter = calloc(1, sizeof(SImportFileSupport)); - pSupporter->pSql = pSql; + assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && strlen(pCmd->payload) != 0); + SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport)); SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL); pNew->cmd.pDataBlocks = taosArrayInit(4, POINTER_BYTES); + pCmd->count = 1; - STableDataBlocks *pTableDataBlock = NULL; - int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock); - if (ret != TSDB_CODE_SUCCESS) { -// return ret; - } - - taosArrayPush(pNew->cmd.pDataBlocks, &pTableDataBlock); - - if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) { - tscError("%p failed to malloc when insert file", pSql); - } - pCmd->count = 1; + FILE *fp = fopen(pCmd->payload, "r"); + if (fp == NULL) { + pSql->res.code = TAOS_SYSTEM_ERROR(errno); + tscError("%p failed to open file %s to load data from file, code:%s", pSql, pCmd->payload, tstrerror(pSql->res.code)); - tstrncpy(path, pDataBlock->filename, sizeof(path)); + tfree(pSupporter) + tscQueueAsyncRes(pSql); - FILE *fp = fopen(path, "r"); - if (fp == NULL) { - tscError("%p failed to open file %s to load data from file, reason:%s", pSql, path, strerror(errno)); -// continue;// todo handle error - } + return; + } - pSupporter->fp = fp; + pSupporter->pSql = pSql; + pSupporter->fp = fp; - parseFileSendDataBlock(pSupporter, pNew, 0); + parseFileSendDataBlock(pSupporter, pNew, 0); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4d90caddcb..b4fe670df8 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1880,7 +1880,7 @@ void tscDoQuery(SSqlObj* pSql) { } if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { - tscProcessMultiVnodesInsertFromFile(pSql); + tscProcessMultiVnodesImportFromFile(pSql); } else { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); uint16_t type = pQueryInfo->type; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 8cc5e82590..eefd9f0c00 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -245,7 +245,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE #define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE -#define TSDB_DEFAULT_PAYLOAD_SIZE 2048 // default payload size +#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_CQ_SQL_SIZE 1024 #define TSDB_MAX_VNODES 256 -- GitLab