diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 5c442a155b22faf6f57a286ae3738764ed9a33a2..0d684b72dce860433cbf0bc9ae74a1be120f76af 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -267,19 +267,19 @@ typedef struct { int32_t numOfTablesInSubmit; }; - int32_t clauseIndex; // index of multiple subclause query - int8_t isParseFinish; - short numOfCols; - uint32_t allocSize; - char * payload; - int payloadLen; - + int32_t clauseIndex; // index of multiple subclause query + int8_t parseFinished; + short numOfCols; + uint32_t allocSize; + char * payload; + int32_t payloadLen; SQueryInfo **pQueryInfo; int32_t numOfClause; - - // submit data blocks branched according to vnode - SDataBlockList *pDataBlocks; - + + SDataBlockList *pDataBlocks; // submit data blocks after parsing sql + char * curSql; // current sql, resume position of sql after parsing paused + void * pTableList; // referred table involved in sql + // for parameter ('?') binding and batch processing int32_t batchSize; int32_t numOfParams; @@ -358,8 +358,6 @@ typedef struct SSqlObj { SSqlCmd cmd; SSqlRes res; uint8_t numOfSubs; - char * asyncTblPos; - void * pTableHashList; struct SSqlObj **pSubs; struct SSqlObj * prev, *next; } SSqlObj; @@ -422,7 +420,7 @@ void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscDestroyResPointerInfo(SSqlRes *pRes); -void tscFreeSqlCmdData(SSqlCmd *pCmd); +void tscResetSqlCmdObj(SSqlCmd *pCmd); void tscFreeResData(SSqlObj *pSql); /** diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index e6722253080b56c943eb67bea5c0212ada5bda84..25f58568773513a1b6841f95373d8bca8eec8f0c 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -67,7 +67,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const pRes->numOfRows = 1; strtolower(pSql->sqlstr, sqlstr); - tscDump("%p pObj:%p, Async SQL: %s", pSql, pObj, pSql->sqlstr); + tscDump("%p SQL: %s", pSql, pSql->sqlstr); int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; @@ -342,7 +342,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { (*pSql->fp)(pSql->param, taosres, code); if (shouldFree) { - tscTrace("%p Async sql is automatically freed in async res", pSql); + tscTrace("%p sqlObj is automatically freed in async res", pSql); tscFreeSqlObj(pSql); } } @@ -463,7 +463,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; } else { // normal async query continues - if (pCmd->isParseFinish) { + if (pCmd->parseFinished) { tscTrace("%p re-send data to vnode in table Meta callback since sql parsed completed", pSql); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 518b3680b4a5c4013f7ab1acd82e02b6c741f911..c4d3c475c4b2e0ed4af1452c11013eacd6ffac2e 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -21,7 +21,6 @@ #include "os.h" #include "hash.h" -//#include "tscSecondaryMerge.h" #include "tscUtil.h" #include "tschemautil.h" #include "tsclient.h" @@ -656,7 +655,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) { } } -static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char **str, SParsedDataColInfo *spd, +static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **str, SParsedDataColInfo *spd, int32_t *totalNum) { SSqlCmd * pCmd = &pSql->cmd; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); @@ -664,7 +663,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableDataBlocks *dataBuf = NULL; - int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, + int32_t ret = tscGetDataBlockFromList(pTableList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &dataBuf); if (ret != TSDB_CODE_SUCCESS) { @@ -942,7 +941,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { } code = tscGetTableMeta(pSql, pTableMetaInfo); - if (pSql->asyncTblPos == NULL) { + if (pCmd->curSql == NULL) { assert(code == TSDB_CODE_ACTION_IN_PROGRESS); } } @@ -1008,23 +1007,23 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { return code; } - assert(((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) - || ((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList))); + assert(((NULL == pCmd->curSql) && (NULL == pCmd->pTableList)) + || ((NULL != pCmd->curSql) && (NULL != pCmd->pTableList))); - if ((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) { - pSql->pTableHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); + if ((NULL == pCmd->curSql) && (NULL == pCmd->pTableList)) { + pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); pSql->cmd.pDataBlocks = tscCreateBlockArrayList(); - if (NULL == pSql->pTableHashList || NULL == pSql->cmd.pDataBlocks) { + if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) { code = TSDB_CODE_CLI_OUT_OF_MEMORY; goto _error_clean; } } else { - assert((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList)); - str = pSql->asyncTblPos; + assert((NULL != pCmd->curSql) && (NULL != pCmd->pTableList)); + str = pCmd->curSql; } - tscTrace("%p create data block list for submit data:%p, asyncTblPos:%p, pTableHashList:%p", pSql, pSql->cmd.pDataBlocks, pSql->asyncTblPos, pSql->pTableHashList); + tscTrace("%p create data block list for submit data:%p, curSql:%p, pTableList:%p", pSql, pSql->cmd.pDataBlocks, pCmd->curSql, pCmd->pTableList); while (1) { int32_t index = 0; @@ -1052,7 +1051,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { } } - pSql->asyncTblPos = sToken.z; + pCmd->curSql = sToken.z; // Check if the table name available or not if (validateTableName(sToken.z, sToken.n) != TSDB_CODE_SUCCESS) { @@ -1064,7 +1063,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { goto _error_clean; } - ptrdiff_t pos = pSql->asyncTblPos - pSql->sqlstr; + ptrdiff_t pos = pCmd->curSql - pSql->sqlstr; if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { /* @@ -1075,13 +1074,13 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { */ if (TSDB_CODE_ACTION_IN_PROGRESS == code) { tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 " , %s", pSql, - pos, pSql->asyncTblPos); + pos, pCmd->curSql); return code; } // todo add to return tscError("%p async insert parse error, code:%d, %s", pSql, code, tstrerror(code)); - pSql->asyncTblPos = NULL; + pCmd->curSql = NULL; goto _error_clean; // TODO: should _clean or _error_clean to async flow ???? } @@ -1115,7 +1114,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { * app here insert data in different vnodes, so we need to set the following * data in another submit procedure using async insert routines */ - code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum); + code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { goto _error_clean; } @@ -1227,7 +1226,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { goto _error_clean; } - code = doParseInsertStatement(pSql, pSql->pTableHashList, &str, &spd, &totalNum); + code = doParseInsertStatement(pSql, pCmd->pTableList, &str, &spd, &totalNum); if (code != TSDB_CODE_SUCCESS) { goto _error_clean; } @@ -1257,11 +1256,11 @@ _error_clean: pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); _clean: - taosHashCleanup(pSql->pTableHashList); + taosHashCleanup(pCmd->pTableList); + pCmd->pTableList = NULL; - pSql->pTableHashList = NULL; - pSql->asyncTblPos = NULL; - pCmd->isParseFinish = 1; + pCmd->curSql = NULL; + pCmd->parseFinished = 1; return code; } @@ -1305,17 +1304,15 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) { tscFreeSqlObjPartial(pSql); pSql->sqlstr = p; } else { - tscTrace("continue parse sql: %s", pSql->asyncTblPos); + tscTrace("continue parse sql: %s", pSql->cmd.curSql); } if (tscIsInsertOrImportData(pSql->sqlstr)) { /* - * only for async multi-vnode insertion - * Set the fp before parse the sql string, in case of getmetermeta failed, in which - * the error handle callback function can rightfully restore the user defined function (fp) + * Set the fp before parse the sql string, in case of getTableMeta failed, in which + * the error handle callback function can rightfully restore the user-defined callback function (fp). */ if (initialParse) { - // replace user defined callback function with multi-insert proxy function pSql->fetchFp = pSql->fp; pSql->fp = (void(*)())tscHandleMultivnodeInsert; } @@ -1335,11 +1332,11 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) { } /* - * the pRes->code may be modified or even released by another thread in tscTableMetaCallBack - * function, so do NOT use pRes->code to determine if the getMeterMeta/getMetricMeta function - * invokes new threads to get data from mnode or simply retrieves data from cache. + * the pRes->code may be modified or released by another thread in tscTableMetaCallBack function, + * so do NOT use pRes->code to determine if the getTableMeta/getMetricMeta function + * invokes new threads to get data from mgmt node or simply retrieves data from cache. * - * do NOT assign return code to pRes->code for the same reason for it may be released by another thread + * do NOT assign return code to pRes->code for the same reason since it may be released by another thread * pRes->code = ret; */ return ret; @@ -1457,7 +1454,6 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { return numOfRows; } -// multi-vnodes insertion in sync query model void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; if (pCmd->command != TSDB_SQL_INSERT) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0bb3cb0dc790e571c9aa1a79761dc6faa6d3798c..ebac676b77a292affc631dd2e0214de010127c4d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -350,7 +350,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL; rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows; - tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql); + tscTrace("%p SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql); /* * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj @@ -364,7 +364,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { (*pSql->fp)(pSql->param, taosres, rpcMsg->code); if (shouldFree) { - tscTrace("%p Async sql is automatically freed", pSql); + tscTrace("%p sqlObj is automatically freed", pSql); tscFreeSqlObj(pSql); } } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index c522dbfebe9365e51c8ef8aa20e2d6243401fee4..1eee5d5e7fc7edea575fa6370b67acc3baf21e17 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -232,15 +232,16 @@ void taos_close(TAOS *taos) { int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; - - pRes->numOfRows = 1; + SSqlCmd *pCmd = &pSql->cmd; + + pRes->numOfRows = 1; pRes->numOfTotal = 0; pRes->numOfTotalInCurrentClause = 0; - pSql->asyncTblPos = NULL; - if (NULL != pSql->pTableHashList) { - taosHashCleanup(pSql->pTableHashList); - pSql->pTableHashList = NULL; + pCmd->curSql = NULL; + if (NULL != pCmd->pTableList) { + taosHashCleanup(pCmd->pTableList); + pCmd->pTableList = NULL; } tscDump("%p pObj:%p, SQL: %s", pSql, pObj, pSql->sqlstr); @@ -767,7 +768,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); if (tscShouldFreeAsyncSqlObj(pSql)) { - tscTrace("%p Async SqlObj is freed by app", pSql); + tscTrace("%p SqlObj is freed by app", pSql); tscFreeSqlObj(pSql); } else { if (keepCmd) { @@ -851,7 +852,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL)); tscFreeSqlObj(pSql); - tscTrace("%p Async sql result is freed by app", pSql); + tscTrace("%p sql result is freed by app", pSql); } else { if (keepCmd) { tscFreeSqlResult(pSql); @@ -1027,8 +1028,9 @@ int taos_validate_sql(TAOS *taos, const char *sql) { SSqlObj *pSql = pObj->pSql; SSqlRes *pRes = &pSql->res; - - pRes->numOfRows = 1; + SSqlCmd *pCmd = &pSql->cmd; + + pRes->numOfRows = 1; pRes->numOfTotal = 0; pRes->numOfTotalInCurrentClause = 0; @@ -1051,10 +1053,10 @@ int taos_validate_sql(TAOS *taos, const char *sql) { strtolower(pSql->sqlstr, sql); - pSql->asyncTblPos = NULL; - if (NULL != pSql->pTableHashList) { - taosHashCleanup(pSql->pTableHashList); - pSql->pTableHashList = NULL; + pCmd->curSql = NULL; + if (NULL != pCmd->pTableList) { + taosHashCleanup(pCmd->pTableList); + pCmd->pTableList = NULL; } pRes->code = (uint8_t)tsParseSql(pSql, false); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 2385b4ad823513e536ce26ce7d98fa14db657246..7ad7e65c51b406de04df6209115ab4bba58584fc 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -406,10 +406,16 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) { pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free } -void tscFreeSqlCmdData(SSqlCmd* pCmd) { - pCmd->command = 0; +void tscResetSqlCmdObj(SSqlCmd* pCmd) { + pCmd->command = 0; pCmd->numOfCols = 0; - pCmd->count = 0; + pCmd->count = 0; + pCmd->curSql = NULL; + pCmd->msgType = 0; + pCmd->parseFinished = 0; + + taosHashCleanup(pCmd->pTableList); + pCmd->pTableList= NULL; pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); tscFreeSubqueryInfo(pCmd); @@ -480,14 +486,10 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { tscFreeSqlResult(pSql); tfree(pSql->pSubs); - taosHashCleanup(pSql->pTableHashList); - pSql->freed = 0; pSql->numOfSubs = 0; - pSql->pTableHashList = NULL; - pSql->asyncTblPos = NULL; - tscFreeSqlCmdData(pCmd); + tscResetSqlCmdObj(pCmd); tscTrace("%p partially free sqlObj completed", pSql); }