未验证 提交 49df1015 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #953 from taosdata/feature/lihui

[TBASE-1360]
...@@ -391,6 +391,8 @@ typedef struct _sql_obj { ...@@ -391,6 +391,8 @@ typedef struct _sql_obj {
SSqlCmd cmd; SSqlCmd cmd;
SSqlRes res; SSqlRes res;
uint8_t numOfSubs; uint8_t numOfSubs;
char* asyncTblPos;
void* pTableHashList;
struct _sql_obj **pSubs; struct _sql_obj **pSubs;
struct _sql_obj * prev, *next; struct _sql_obj * prev, *next;
} SSqlObj; } SSqlObj;
......
...@@ -667,7 +667,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char ...@@ -667,7 +667,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
if (NULL == tmpTokenBuf) { if (NULL == tmpTokenBuf) {
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
int32_t numOfRows = tsParseValues(str, dataBuf, pMeterMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf); int32_t numOfRows = tsParseValues(str, dataBuf, pMeterMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf);
free(tmpTokenBuf); free(tmpTokenBuf);
if (numOfRows <= 0) { if (numOfRows <= 0) {
...@@ -949,9 +949,17 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { ...@@ -949,9 +949,17 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
return code; return code;
} }
void *pTableHashList = taosInitIntHash(128, POINTER_BYTES, taosHashInt); if ((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) {
pSql->pTableHashList = taosInitIntHash(128, POINTER_BYTES, taosHashInt);
pSql->cmd.pDataBlocks = tscCreateBlockArrayList(); pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
if (NULL == pSql->pTableHashList || NULL == pSql->cmd.pDataBlocks) {
code = TSDB_CODE_CLI_OUT_OF_MEMORY;
goto _error_clean;
}
} else {
str = pSql->asyncTblPos;
}
tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks); tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks);
while (1) { while (1) {
...@@ -970,6 +978,8 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { ...@@ -970,6 +978,8 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
} }
} }
pSql->asyncTblPos = sToken.z;
// Check if the table name available or not // Check if the table name available or not
if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) { if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) {
code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z);
...@@ -984,7 +994,8 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { ...@@ -984,7 +994,8 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
void *fp = pSql->fp; void *fp = pSql->fp;
if ((code = tscParseSqlForCreateTableOnDemand(&str, pSql)) != TSDB_CODE_SUCCESS) { if ((code = tscParseSqlForCreateTableOnDemand(&str, pSql)) != TSDB_CODE_SUCCESS) {
if (fp != NULL) { if (fp != NULL) {
goto _clean; //goto _clean;
return code;
} else { } else {
/* /*
* for async insert, the free data block operations, which is tscDestroyBlockArrayList, * for async insert, the free data block operations, which is tscDestroyBlockArrayList,
...@@ -1027,11 +1038,10 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { ...@@ -1027,11 +1038,10 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
* app here insert data in different vnodes, so we need to set the following * app here insert data in different vnodes, so we need to set the following
* data in another submit procedure using async insert routines * data in another submit procedure using async insert routines
*/ */
code = doParseInsertStatement(pSql, pTableHashList, &str, &spd, &totalNum); code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error_clean; goto _error_clean;
} }
} else if (sToken.type == TK_FILE) { } else if (sToken.type == TK_FILE) {
if (pCmd->isInsertFromFile == -1) { if (pCmd->isInsertFromFile == -1) {
pCmd->isInsertFromFile = 1; pCmd->isInsertFromFile = 1;
...@@ -1142,7 +1152,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { ...@@ -1142,7 +1152,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean; goto _error_clean;
} }
code = doParseInsertStatement(pSql, pTableHashList, &str, &spd, &totalNum); code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error_clean; goto _error_clean;
} }
...@@ -1156,7 +1166,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { ...@@ -1156,7 +1166,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
if (pCmd->numOfParams > 0) { if (pCmd->numOfParams > 0) {
goto _clean; goto _clean;
} }
// submit to more than one vnode // submit to more than one vnode
if (pCmd->pDataBlocks->nSize > 0) { if (pCmd->pDataBlocks->nSize > 0) {
// merge according to vgid // merge according to vgid
...@@ -1184,7 +1194,8 @@ _error_clean: ...@@ -1184,7 +1194,8 @@ _error_clean:
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
_clean: _clean:
taosCleanUpIntHash(pTableHashList); taosCleanUpIntHash(pSql->pTableHashList);
pSql->pTableHashList = NULL;
return code; return code;
} }
...@@ -1219,7 +1230,11 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) { ...@@ -1219,7 +1230,11 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) {
// must before clean the sqlcmd object // must before clean the sqlcmd object
tscRemoveAllMeterMetaInfo(&pSql->cmd, false); tscRemoveAllMeterMetaInfo(&pSql->cmd, false);
tscCleanSqlCmd(&pSql->cmd);
if (NULL == pSql->asyncTblPos) {
tscTrace("continue parse sql: %s", pSql->asyncTblPos);
tscCleanSqlCmd(&pSql->cmd);
}
if (tscIsInsertOrImportData(pSql->sqlstr)) { if (tscIsInsertOrImportData(pSql->sqlstr)) {
/* /*
......
...@@ -3606,9 +3606,9 @@ int tscGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) { ...@@ -3606,9 +3606,9 @@ int tscGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) {
* for async insert operation, release data block buffer before issue new object to get metermeta * for async insert operation, release data block buffer before issue new object to get metermeta
* because in metermeta callback function, the tscParse function will generate the submit data blocks * because in metermeta callback function, the tscParse function will generate the submit data blocks
*/ */
if (pSql->fp != NULL && pSql->pStream == NULL) { //if (pSql->fp != NULL && pSql->pStream == NULL) {
tscFreeSqlCmdData(pCmd); // tscFreeSqlCmdData(pCmd);
} //}
return tscDoGetMeterMeta(pSql, meterId, index); return tscDoGetMeterMeta(pSql, meterId, index);
} }
......
...@@ -204,6 +204,12 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { ...@@ -204,6 +204,12 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
pRes->numOfRows = 1; pRes->numOfRows = 1;
pRes->numOfTotal = 0; pRes->numOfTotal = 0;
pSql->asyncTblPos = NULL;
if (NULL != pSql->pTableHashList) {
taosCleanUpIntHash(pSql->pTableHashList);
pSql->pTableHashList = NULL;
}
tscTrace("%p SQL: %s pObj:%p", pSql, pSql->sqlstr, pObj); tscTrace("%p SQL: %s pObj:%p", pSql, pSql->sqlstr, pObj);
pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false); pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false);
...@@ -947,6 +953,12 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -947,6 +953,12 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
strtolower(pSql->sqlstr, sql); strtolower(pSql->sqlstr, sql);
pSql->asyncTblPos = NULL;
if (NULL != pSql->pTableHashList) {
taosCleanUpIntHash(pSql->pTableHashList);
pSql->pTableHashList = NULL;
}
pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false); pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false);
int code = pRes->code; int code = pRes->code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册