提交 710d41da 编写于 作者: H Haojun Liao

[TD-1600]<fix>: return error code to client if importing data from file failed.

上级 da13f686
...@@ -1409,36 +1409,37 @@ typedef struct SImportFileSupport { ...@@ -1409,36 +1409,37 @@ typedef struct SImportFileSupport {
static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
assert(param != NULL && tres != NULL); assert(param != NULL && tres != NULL);
char * tokenBuf = NULL;
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
int32_t count = 0;
int32_t maxRows = 0;
FILE * fp = NULL;
SSqlObj *pSql = tres; SSqlObj *pSql = tres;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SImportFileSupport *pSupporter = (SImportFileSupport *) param; SImportFileSupport *pSupporter = (SImportFileSupport *)param;
SSqlObj *pParentSql = pSupporter->pSql; SSqlObj *pParentSql = pSupporter->pSql;
FILE *fp = pSupporter->fp; fp = pSupporter->fp;
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // handle error if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // handle error
assert(taos_errno(pSql) == code); assert(taos_errno(pSql) == code);
do { if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { assert(pSql->res.numOfRows == 0);
assert(pSql->res.numOfRows == 0); int32_t ret = fseek(fp, 0, SEEK_SET);
int32_t errc = fseek(fp, 0, SEEK_SET); if (ret < 0) {
if (errc < 0) { tscError("%p failed to seek SEEK_SET since:%s", pSql, tstrerror(errno));
tscError("%p failed to seek SEEK_SET since:%s", pSql, tstrerror(errno)); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
} else { goto _error;
break;
}
} }
} else {
taos_free_result(pSql);
tfree(pSupporter);
fclose(fp);
pParentSql->res.code = code; pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql); goto _error;
return; }
} while (0);
} }
// accumulate the total submit records // accumulate the total submit records
...@@ -1452,28 +1453,32 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { ...@@ -1452,28 +1453,32 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns}; SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns); tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
int32_t count = 0;
int32_t maxRows = 0;
tfree(pCmd->pTableNameList); tfree(pCmd->pTableNameList);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
if (pCmd->pTableBlockHashList == NULL) { if (pCmd->pTableBlockHashList == NULL) {
pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pCmd->pTableBlockHashList == NULL) {
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
} }
STableDataBlocks *pTableDataBlock = NULL; STableDataBlocks *pTableDataBlock = NULL;
int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, int32_t ret =
sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL); tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
// return ret; pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
} }
tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows); tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows);
char *tokenBuf = calloc(1, 4096); tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
if (tokenBuf == NULL) {
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
while ((readLen = tgetline(&line, &n, fp)) != -1) { while ((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
...@@ -1501,27 +1506,43 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { ...@@ -1501,27 +1506,43 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
} }
tfree(tokenBuf); tfree(tokenBuf);
free(line); tfree(line);
if (count > 0) { pParentSql->res.code = code;
code = doPackSendDataBlock(pSql, count, pTableDataBlock);
if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
if (count > 0) {
code = doPackSendDataBlock(pSql, count, pTableDataBlock);
if (code == TSDB_CODE_SUCCESS) {
return;
} else {
pParentSql->res.code = code;
goto _error;
}
} else {
pParentSql->res.code = code; pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql); taos_free_result(pSql);
tfree(pSupporter);
fclose(fp);
pParentSql->fp = pParentSql->fetchFp;
// all data has been sent to vnode, call user function
int32_t v =
(pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : (int32_t)pParentSql->res.numOfRows;
(*pParentSql->fp)(pParentSql->param, pParentSql, v);
return; return;
} }
}
} else { _error:
taos_free_result(pSql); tfree(tokenBuf);
tfree(pSupporter); tfree(line);
fclose(fp); taos_free_result(pSql);
tfree(pSupporter);
pParentSql->fp = pParentSql->fetchFp; fclose(fp);
// all data has been sent to vnode, call user function tscAsyncResultOnError(pParentSql);
int32_t v = (pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : (int32_t)pParentSql->res.numOfRows;
(*pParentSql->fp)(pParentSql->param, pParentSql, v);
}
} }
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) { void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册