From c159dd8ad7a449f1411b8b573a75d98c03dda697 Mon Sep 17 00:00:00 2001 From: lihui Date: Thu, 19 Dec 2019 17:37:30 +0800 Subject: [PATCH] [TBASE-1360] --- src/client/inc/tsclient.h | 2 ++ src/client/src/tscParseInsert.c | 37 +++++++++++++++++++++++---------- src/client/src/tscServer.c | 6 +++--- src/client/src/tscSql.c | 12 +++++++++++ 4 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index be9ba47f2a..6adf2f1be1 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -391,6 +391,8 @@ typedef struct _sql_obj { SSqlCmd cmd; SSqlRes res; uint8_t numOfSubs; + char* asyncTblPos; + void* pTableHashList; struct _sql_obj **pSubs; struct _sql_obj * prev, *next; } SSqlObj; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index a0a7bc7638..2a6c80e06b 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -667,7 +667,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char if (NULL == tmpTokenBuf) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - + int32_t numOfRows = tsParseValues(str, dataBuf, pMeterMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf); free(tmpTokenBuf); if (numOfRows <= 0) { @@ -949,9 +949,17 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { return code; } - void *pTableHashList = taosInitIntHash(128, POINTER_BYTES, taosHashInt); - - pSql->cmd.pDataBlocks = tscCreateBlockArrayList(); + if ((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) { + pSql->pTableHashList = taosInitIntHash(128, POINTER_BYTES, taosHashInt); + pSql->cmd.pDataBlocks = tscCreateBlockArrayList(); + if (NULL == pSql->pTableHashList || NULL == pSql->cmd.pDataBlocks) { + code = TSDB_CODE_CLI_OUT_OF_MEMORY; + goto _error_clean; + } + } else { + str = pSql->asyncTblPos; + } + tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks); while (1) { @@ -970,6 +978,8 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { } } + pSql->asyncTblPos = sToken.z; + // Check if the table name available or not if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) { code = tscInvalidSQLErrMsg(pCmd->payload, "table name invalid", sToken.z); @@ -984,7 +994,8 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { void *fp = pSql->fp; if ((code = tscParseSqlForCreateTableOnDemand(&str, pSql)) != TSDB_CODE_SUCCESS) { if (fp != NULL) { - goto _clean; + //goto _clean; + return code; } else { /* * for async insert, the free data block operations, which is tscDestroyBlockArrayList, @@ -1027,11 +1038,10 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { * app here insert data in different vnodes, so we need to set the following * data in another submit procedure using async insert routines */ - code = doParseInsertStatement(pSql, pTableHashList, &str, &spd, &totalNum); + code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { goto _error_clean; } - } else if (sToken.type == TK_FILE) { if (pCmd->isInsertFromFile == -1) { pCmd->isInsertFromFile = 1; @@ -1142,7 +1152,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { goto _error_clean; } - code = doParseInsertStatement(pSql, pTableHashList, &str, &spd, &totalNum); + code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { goto _error_clean; } @@ -1156,7 +1166,7 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { if (pCmd->numOfParams > 0) { goto _clean; } - + // submit to more than one vnode if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgid @@ -1184,7 +1194,8 @@ _error_clean: pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); _clean: - taosCleanUpIntHash(pTableHashList); + taosCleanUpIntHash(pSql->pTableHashList); + pSql->pTableHashList = NULL; return code; } @@ -1219,7 +1230,11 @@ int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion) { // must before clean the sqlcmd object tscRemoveAllMeterMetaInfo(&pSql->cmd, false); - tscCleanSqlCmd(&pSql->cmd); + + if (NULL == pSql->asyncTblPos) { + tscTrace("continue parse sql: %s", pSql->asyncTblPos); + tscCleanSqlCmd(&pSql->cmd); + } if (tscIsInsertOrImportData(pSql->sqlstr)) { /* diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 65859eea0e..4ccdb6b327 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -3606,9 +3606,9 @@ int tscGetMeterMeta(SSqlObj *pSql, char *meterId, int32_t index) { * for async insert operation, release data block buffer before issue new object to get metermeta * because in metermeta callback function, the tscParse function will generate the submit data blocks */ - if (pSql->fp != NULL && pSql->pStream == NULL) { - tscFreeSqlCmdData(pCmd); - } + //if (pSql->fp != NULL && pSql->pStream == NULL) { + // tscFreeSqlCmdData(pCmd); + //} return tscDoGetMeterMeta(pSql, meterId, index); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 25d94c6eaf..4b78c5ed7a 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -204,6 +204,12 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { pRes->numOfRows = 1; pRes->numOfTotal = 0; + pSql->asyncTblPos = NULL; + if (NULL != pSql->pTableHashList) { + taosCleanUpIntHash(pSql->pTableHashList); + pSql->pTableHashList = NULL; + } + tscTrace("%p SQL: %s pObj:%p", pSql, pSql->sqlstr, pObj); pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false); @@ -947,6 +953,12 @@ int taos_validate_sql(TAOS *taos, const char *sql) { strtolower(pSql->sqlstr, sql); + pSql->asyncTblPos = NULL; + if (NULL != pSql->pTableHashList) { + taosCleanUpIntHash(pSql->pTableHashList); + pSql->pTableHashList = NULL; + } + pRes->code = (uint8_t)tsParseSql(pSql, pObj->acctId, pObj->db, false); int code = pRes->code; -- GitLab