提交 5d87d141 编写于 作者: H Haojun Liao

[td-719]

上级 768d7206
......@@ -1377,24 +1377,114 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return code;
}
if ((code = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) {
return code;
return tscProcessSql(pSql);
}
typedef struct SImportFileSupport {
SSqlObj *pSql;
FILE *fp;
} SImportFileSupport;
static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
assert(param != NULL && tres != NULL);
SSqlObj *pSql = tres;
SSqlCmd *pCmd = &pSql->cmd;
SImportFileSupport *pSupporter = (SImportFileSupport *) param;
SSqlObj *pParentSql = pSupporter->pSql;
FILE *fp = pSupporter->fp;
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // handle error
assert(taos_errno(pSql) == code);
taos_free_result(pSql);
tfree(pSupporter);
fclose(fp);
pParentSql->res.code = code;
return;
}
return TSDB_CODE_SUCCESS;
// accumulate the total submit records
pParentSql->res.numOfRows += pSql->res.numOfRows;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSchema * pSchema = tscGetTableSchema(pTableMeta);
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
int32_t count = 0;
int32_t maxRows = 0;
STableDataBlocks *pTableDataBlock = taosArrayGetP(pSql->cmd.pDataBlocks, 0);
pTableDataBlock->size = pTableDataBlock->headerSize;
tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows);
char *tokenBuf = calloc(1, 4096);
while ((readLen = getline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
}
if (readLen == 0) {
continue;
}
char *lineptr = line;
strtolower(line, line);
int32_t len =
tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tokenBuf);
if (len <= 0 || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code;
break;
}
pTableDataBlock->size += len;
if (++count >= maxRows) {
break;
}
}
tfree(tokenBuf);
free(line);
if (count > 0) {
if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
}
} else {
taos_free_result(pSql);
tfree(pSupporter);
fclose(fp);
pParentSql->fp = pParentSql->fetchFp;
// all data has been sent to vnode, call user function
int32_t v = (pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : pParentSql->res.numOfRows;
(*pParentSql->fp)(pParentSql->param, pParentSql, v);
}
}
static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
static UNUSED_FUNC int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
size_t readLen = 0;
char * line = NULL;
size_t n = 0;
int len = 0;
int32_t maxRows = 0;
SSqlCmd * pCmd = &pSql->cmd;
int numOfRows = 0;
int32_t code = 0;
int nrows = 0;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
......@@ -1422,6 +1512,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
size_t n = 0;
while ((readLen = getline(&line, &n, fp)) != -1) {
// line[--readLen] = '\0';
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) line[--readLen] = 0;
......@@ -1430,19 +1521,18 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
char *lineptr = line;
strtolower(line, line);
len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tmpTokenBuf);
int32_t len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tmpTokenBuf);
if (len <= 0 || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code;
return (-code);
return code;
}
pTableDataBlock->size += len;
count++;
nrows++;
if (count >= maxRows) {
if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
return -code;
return code;
}
pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
......@@ -1457,7 +1547,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
if (count > 0) {
if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
return -code;
return code;
}
numOfRows += pSql->res.numOfRows;
......@@ -1477,26 +1567,32 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
STableDataBlocks *pDataBlock = NULL;
int32_t affected_rows = 0;
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && pCmd->pDataBlocks != NULL);
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE/* && pCmd->pDataBlocks != NULL*/);
SArray *pDataBlockList = pCmd->pDataBlocks;
pCmd->pDataBlocks = NULL;
STableDataBlocks* pDataBlock = taosArrayGetP(pDataBlockList, 0);
char path[PATH_MAX] = {0};
size_t size = taosArrayGetSize(pDataBlockList);
for (int32_t i = 0; i < size; ++i) {
pDataBlock = taosArrayGetP(pDataBlockList, i );
if (pDataBlock == NULL) {
continue;
}
SImportFileSupport* pSupporter = calloc(1, sizeof(SImportFileSupport));
pSupporter->pSql = pSql;
SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
pNew->cmd.pDataBlocks = taosArrayInit(4, POINTER_BYTES);
STableDataBlocks *pTableDataBlock = NULL;
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
// return ret;
}
taosArrayPush(pNew->cmd.pDataBlocks, &pTableDataBlock);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) {
tscError("%p failed to malloc when insert file", pSql);
continue;
}
pCmd->count = 1;
......@@ -1505,44 +1601,10 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
FILE *fp = fopen(path, "r");
if (fp == NULL) {
tscError("%p failed to open file %s to load data from file, reason:%s", pSql, path, strerror(errno));
continue;
}
tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name));
memset(pDataBlock->pData, 0, pDataBlock->nAllocSize);
int32_t ret = tscGetTableMeta(pSql, pTableMetaInfo);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p get meter meta failed, abort", pSql);
continue;
}
char *tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \"
if (NULL == tmpTokenBuf) {
tscError("%p calloc failed", pSql);
continue;
// continue;// todo handle error
}
int nrows = tscInsertDataFromFile(pSql, fp, tmpTokenBuf);
free(tmpTokenBuf);
pSupporter->fp = fp;
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
if (nrows < 0) {
fclose(fp);
tscTrace("%p no records(%d) in file %s", pSql, nrows, path);
continue;
}
fclose(fp);
affected_rows += nrows;
tscTrace("%p Insert data %d records from file %s", pSql, nrows, path);
}
pSql->res.numOfRows = affected_rows;
// all data have been submit to vnode, release data blocks
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscDestroyBlockArrayList(pDataBlockList);
parseFileSendDataBlock(pSupporter, pNew, 0);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册