提交 795690cd 编写于 作者: H Haojun Liao

[TD-1600]refactor codes.

上级 710d41da
...@@ -435,7 +435,7 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code); ...@@ -435,7 +435,7 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen);
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql); void tscImportDataFromFile(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(SSqlObj* pSql); bool tscIsUpdateQuery(SSqlObj* pSql);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
......
...@@ -1406,7 +1406,7 @@ typedef struct SImportFileSupport { ...@@ -1406,7 +1406,7 @@ typedef struct SImportFileSupport {
FILE *fp; FILE *fp;
} SImportFileSupport; } SImportFileSupport;
static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRows) {
assert(param != NULL && tres != NULL); assert(param != NULL && tres != NULL);
char * tokenBuf = NULL; char * tokenBuf = NULL;
...@@ -1425,21 +1425,19 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { ...@@ -1425,21 +1425,19 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
SSqlObj *pParentSql = pSupporter->pSql; SSqlObj *pParentSql = pSupporter->pSql;
fp = pSupporter->fp; fp = pSupporter->fp;
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // handle error int32_t code = pSql->res.code;
assert(taos_errno(pSql) == code);
if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { // retry parse data from file and import data from the begining again
assert(pSql->res.numOfRows == 0); if (code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
int32_t ret = fseek(fp, 0, SEEK_SET); assert(pSql->res.numOfRows == 0);
if (ret < 0) { int32_t ret = fseek(fp, 0, SEEK_SET);
tscError("%p failed to seek SEEK_SET since:%s", pSql, tstrerror(errno)); if (ret < 0) {
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); tscError("%p failed to seek SEEK_SET since:%s", pSql, tstrerror(errno));
goto _error; code = TAOS_SYSTEM_ERROR(errno);
}
} else {
pParentSql->res.code = code;
goto _error; goto _error;
} }
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
} }
// accumulate the total submit records // accumulate the total submit records
...@@ -1459,7 +1457,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { ...@@ -1459,7 +1457,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
if (pCmd->pTableBlockHashList == NULL) { if (pCmd->pTableBlockHashList == NULL) {
pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pCmd->pTableBlockHashList == NULL) { if (pCmd->pTableBlockHashList == NULL) {
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
} }
...@@ -1476,7 +1474,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { ...@@ -1476,7 +1474,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows); tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows);
tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW); tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
if (tokenBuf == NULL) { if (tokenBuf == NULL) {
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
...@@ -1509,18 +1507,15 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { ...@@ -1509,18 +1507,15 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
tfree(line); tfree(line);
pParentSql->res.code = code; pParentSql->res.code = code;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
if (count > 0) { if (count > 0) {
code = doPackSendDataBlock(pSql, count, pTableDataBlock); code = doPackSendDataBlock(pSql, count, pTableDataBlock);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
return; return;
} else { } else {
pParentSql->res.code = code;
goto _error; goto _error;
} }
} else { } else {
pParentSql->res.code = code;
taos_free_result(pSql); taos_free_result(pSql);
tfree(pSupporter); tfree(pSupporter);
fclose(fp); fclose(fp);
...@@ -1528,8 +1523,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { ...@@ -1528,8 +1523,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
pParentSql->fp = pParentSql->fetchFp; pParentSql->fp = pParentSql->fetchFp;
// all data has been sent to vnode, call user function // all data has been sent to vnode, call user function
int32_t v = int32_t v = (code != TSDB_CODE_SUCCESS) ? code : (int32_t)pParentSql->res.numOfRows;
(pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : (int32_t)pParentSql->res.numOfRows;
(*pParentSql->fp)(pParentSql->param, pParentSql, v); (*pParentSql->fp)(pParentSql->param, pParentSql, v);
return; return;
} }
...@@ -1545,7 +1539,7 @@ _error: ...@@ -1545,7 +1539,7 @@ _error:
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
} }
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) { void tscImportDataFromFile(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
if (pCmd->command != TSDB_SQL_INSERT) { if (pCmd->command != TSDB_SQL_INSERT) {
return; return;
...@@ -1564,12 +1558,11 @@ void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) { ...@@ -1564,12 +1558,11 @@ void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) {
tfree(pSupporter); tfree(pSupporter);
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
return; return;
} }
pSupporter->pSql = pSql; pSupporter->pSql = pSql;
pSupporter->fp = fp; pSupporter->fp = fp;
parseFileSendDataBlock(pSupporter, pNew, 0); parseFileSendDataBlock(pSupporter, pNew, TSDB_CODE_SUCCESS);
} }
...@@ -2197,7 +2197,7 @@ void tscDoQuery(SSqlObj* pSql) { ...@@ -2197,7 +2197,7 @@ void tscDoQuery(SSqlObj* pSql) {
} }
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
tscProcessMultiVnodesImportFromFile(pSql); tscImportDataFromFile(pSql);
} else { } else {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
uint16_t type = pQueryInfo->type; uint16_t type = pQueryInfo->type;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册