diff --git a/src/client/inc/tscSQLParser.h b/src/client/inc/tscSQLParser.h index 3a0cd41f23564fdff3ed05902f2a8355012d8a7c..aeb6e1f18c8464c7d531d0908d0167a2e06b6768 100644 --- a/src/client/inc/tscSQLParser.h +++ b/src/client/inc/tscSQLParser.h @@ -364,7 +364,6 @@ SQuerySQL *tSetQuerySQLElems(SSQLToken *pSelectToken, tSQLExprList *pSelection, SCreateTableSQL *tSetCreateSQLElems(tFieldList *pCols, tFieldList *pTags, SSQLToken *pMetricName, tVariantList *pTagVals, SQuerySQL *pSelect, int32_t type); -void tSQLExprDestroy(tSQLExpr *); void tSQLExprNodeDestroy(tSQLExpr *pExpr); tSQLExpr *tSQLExprNodeClone(tSQLExpr *pExpr); diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 687b5bc689507c22dbeac6f81644f2cca260394c..3190fd7057ad76e7a2fbd89827461de1b067d753 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter { } SJoinSubquerySupporter; int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name, - STableDataBlocks** dataBlocks); + SMeterMeta* pMeterMeta, STableDataBlocks** dataBlocks); void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); void tscDestroyDataBlock(STableDataBlocks* pDataBlock); @@ -81,7 +81,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); void tscFreeUnusedDataBlocks(SDataBlockList* pList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, - int32_t startOffset, int32_t rowSize, const char* tableId, + int32_t startOffset, int32_t rowSize, const char* tableId, SMeterMeta* pMeterMeta, STableDataBlocks** dataBlocks); SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); @@ -97,7 +97,7 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); */ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo); -bool tscProjectionQueryOnMetric(SSqlCmd* pCmd, int32_t subClauseIndex); +bool tscProjectionQueryOnSTable(SSqlCmd* pCmd, int32_t subClauseIndex); bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo); bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd); @@ -197,7 +197,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd); void tscClearSubqueryInfo(SSqlCmd* pCmd); void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, int32_t subClauseIndex, char* keyStr, uint64_t uid); -int tscGetMetricMeta(SSqlObj* pSql); +int tscGetMetricMeta(SSqlObj* pSql, int32_t clauseIndex); int tscGetMeterMeta(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo); int tscGetMeterMetaEx(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo, bool createIfNotExists); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index d84ad043654d6f06ab04f29d4f56bf031efd6bbd..22613386a507c4a67ba093b342e5b2720f24cec5 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -121,7 +121,7 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); // sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx - if (numOfRows == 0 && tscProjectionQueryOnMetric(pCmd, 0)) { + if (numOfRows == 0 && tscProjectionQueryOnSTable(pCmd, 0)) { // vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); assert(pMeterMetaInfo->vnodeIndex >= 0); @@ -272,7 +272,7 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); if (numOfRows == 0) { // sequentially retrieve data from remain vnodes. - if (tscProjectionQueryOnMetric(pCmd, 0)) { + if (tscProjectionQueryOnSTable(pCmd, 0)) { /* * vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved */ @@ -517,7 +517,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; - code = tscGetMetricMeta(pSql); + code = tscGetMetricMeta(pSql, 0); pRes->code = code; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; @@ -534,7 +534,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { - code = tscGetMetricMeta(pSql); + code = tscGetMetricMeta(pSql, 0); pRes->code = code; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 6949999804d5acd80f1b96b1d74499105225b839..7b3734ebfd8200a6ea1127d97014c4abd63bade8 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -438,7 +438,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { taos_fetch_rows_a(tres, joinRetrieveCallback, param); } else if (numOfRows == 0) { // no data from this vnode anymore - if (tscProjectionQueryOnMetric(&pSql->cmd, 0)) { + if (tscProjectionQueryOnSTable(&pParentSql->cmd, 0)) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); @@ -498,7 +498,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { SSqlCmd* pCmd = &pSql->cmd; - if (tscProjectionQueryOnMetric(pCmd, 0) && numOfRows == 0) { + if (tscProjectionQueryOnSTable(pCmd, 0) && numOfRows == 0) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); @@ -543,7 +543,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - if (tscProjectionQueryOnMetric(&pSql->cmd, 0)) { + if (tscProjectionQueryOnSTable(&pSql->cmd, 0)) { if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes && (!tscHasReachLimitation(pSql->pSubs[i]))) { numOfFetch++; @@ -711,7 +711,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { * if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of * data instead of returning to its invoker */ - if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnMetric(&pSql->cmd, 0)) { + if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnSTable(&pSql->cmd, 0)) { assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes); pSupporter->pState->numOfCompleted = 0; // reset the record value diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 4a6c158ee37936217a71ee1bbe8e494ca85fcf40..c5d5e488f77456005384a106469aabf362316cbe 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -18,8 +18,8 @@ #define _XOPEN_SOURCE -#include "ihash.h" #include "os.h" +#include "hash.h" #include "tscSecondaryMerge.h" #include "tscUtil.h" #include "tschemautil.h" @@ -652,7 +652,8 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char STableDataBlocks *dataBuf = NULL; int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pMeterMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name, &dataBuf); + sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name, + pMeterMeta, &dataBuf); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -1111,8 +1112,10 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { wordfree(&full_path); STableDataBlocks *pDataBlock = NULL; - int32_t ret = tscCreateDataBlock(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, sizeof(SShellSubmitBlock), - pMeterMetaInfo->name, &pDataBlock); + SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta; + + int32_t ret = tscCreateDataBlock(PATH_MAX, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name, + pMeterMeta, &pDataBlock); if (ret != TSDB_CODE_SUCCESS) { goto _error_clean; } @@ -1358,8 +1361,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { pCmd->pDataBlocks = tscCreateBlockArrayList(); STableDataBlocks *pTableDataBlock = NULL; - int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), - pMeterMetaInfo->name, &pTableDataBlock); + int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SShellSubmitBlock), + pMeterMetaInfo->name, pMeterMeta, &pTableDataBlock); if (ret != TSDB_CODE_SUCCESS) { return -1; } @@ -1370,10 +1373,10 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { if (maxRows < 1) return -1; int count = 0; - SParsedDataColInfo spd = {.numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns}; - SSchema * pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta); + SParsedDataColInfo spd = {.numOfCols = pMeterMeta->numOfColumns}; + SSchema * pSchema = tsGetSchema(pMeterMeta); - tscSetAssignedColumnInfo(&spd, pSchema, pMeterMetaInfo->pMeterMeta->numOfColumns); + tscSetAssignedColumnInfo(&spd, pSchema, pMeterMeta->numOfColumns); while ((readLen = getline(&line, &n, fp)) != -1) { // line[--readLen] = '\0'; @@ -1383,8 +1386,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { char *lineptr = line; strtolower(line, line); - if (numOfRows >= maxRows || pTableDataBlock->size + pMeterMeta->rowSize >= pTableDataBlock->nAllocSize) { - uint32_t tSize = tscAllocateMemIfNeed(pTableDataBlock, pMeterMeta->rowSize); + if (numOfRows >= maxRows || pTableDataBlock->size + rowSize >= pTableDataBlock->nAllocSize) { + uint32_t tSize = tscAllocateMemIfNeed(pTableDataBlock, rowSize); if (0 == tSize) return (-TSDB_CODE_CLI_OUT_OF_MEMORY); maxRows += tSize; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 138dd96a0642f782a41bd66b4e4ad7bf12ffd91f..1aff4595118c2bbd0a19a3a052bfa7e1a749be26 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -660,6 +660,13 @@ int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlO SSqlCmd* pCmd = &pSql->cmd; int32_t code = TSDB_CODE_SUCCESS; + // backup the old name in pMeterMetaInfo + size_t size = strlen(pMeterMetaInfo->name); + char* oldName = NULL; + if (size > 0) { + oldName = strdup(pMeterMetaInfo->name); + } + if (hasSpecifyDB(pzTableName)) { // db has been specified in sql string so we ignore current db path code = setObjFullName(pMeterMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL); @@ -674,7 +681,25 @@ int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlO invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); } - return code; + if (code != TSDB_CODE_SUCCESS) { + free(oldName); + return code; + } + + /* + * the old name exists and is not equalled to the new name. Release the metermeta/metricmeta + * that are corresponding to the old name for the new table name. + */ + if (size > 0) { + if (strncasecmp(oldName, pMeterMetaInfo->name, tListLen(pMeterMetaInfo->name)) != 0) { + tscClearMeterMetaInfo(pMeterMetaInfo, false); + } + } else { + assert(pMeterMetaInfo->pMeterMeta == NULL && pMeterMetaInfo->pMetricMeta == NULL); + } + + tfree(oldName); + return TSDB_CODE_SUCCESS; } static bool validateTableColumnInfo(tFieldList* pFieldList, SSqlCmd* pCmd) { @@ -4412,7 +4437,7 @@ int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuer if (queryOnTags == true) { // local handle the metric tag query pCmd->command = TSDB_SQL_RETRIEVE_TAGS; } else { - if (tscProjectionQueryOnMetric(&pSql->cmd, 0) && + if (tscProjectionQueryOnSTable(&pSql->cmd, 0) && (pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -4425,11 +4450,12 @@ int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuer } /* - * get the distribution of all tables among available virtual nodes that satisfy query condition and - * created according to this super table from management node. - * And then launching multiple async-queries on required virtual nodes, which is the first-stage query operation. + * Get the distribution of all tables among all available virtual nodes that are qualified for the query condition + * and created according to this super table from management node. + * And then launching multiple async-queries against all qualified virtual nodes, during the first-stage + * query operation. */ - int32_t code = tscGetMetricMeta(pSql); + int32_t code = tscGetMetricMeta(pSql, 0); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4951,7 +4977,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { } // projection query on metric does not compatible with "group by" syntax - if (tscProjectionQueryOnMetric(pCmd, 0)) { + if (tscProjectionQueryOnSTable(pCmd, 0)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -5259,9 +5285,6 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { SSqlCmd* pCmd = &pSql->cmd; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); assert(pQueryInfo->numOfTables == 1); - // if (pQueryInfo->numOfTables == 1) { - // tscAddEmptyMeterMetaInfo(pQueryInfo); - // } SCreateTableSQL* pCreateTable = pInfo->pCreateTableInfo; SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cbec36b137aadd7b19aeffad7f957493d73bb0d3..78f25b8724559ff0e52bf8de58dff47834e59858 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2931,17 +2931,12 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) { pMeta->numOfColumns = htons(pMeta->numOfColumns); - if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) { - tscError("invalid tag value count:%d", pMeta->numOfTags); - return TSDB_CODE_INVALID_VALUE; - } - if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) { tscError("invalid numOfTags:%d", pMeta->numOfTags); return TSDB_CODE_INVALID_VALUE; } - if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) { + if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns <= 0) { tscError("invalid numOfColumns:%d", pMeta->numOfColumns); return TSDB_CODE_INVALID_VALUE; } @@ -2985,10 +2980,11 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) { // todo add one more function: taosAddDataIfNotExists(); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false); + assert(pMeterMetaInfo->pMeterMeta == NULL); pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta, size, tsMeterMetaKeepTimer); + // todo handle out of memory case if (pMeterMetaInfo->pMeterMeta == NULL) return 0; return TSDB_CODE_OTHERS; @@ -3421,7 +3417,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { * If the query result is exhausted, or current query is to free resource at server side, * the connection will be recycled. */ - if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnMetric(pCmd, 0) && pRes->offset > 0)) || + if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnSTable(pCmd, 0) && pRes->offset > 0)) || ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) { tscTrace("%p no result or free resource, recycle connection", pSql); taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user); @@ -3494,9 +3490,7 @@ static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMet * Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine */ if (code == TSDB_CODE_SUCCESS) { - pMeterMetaInfo->pMeterMeta = pNewMeterMetaInfo->pMeterMeta; - pNewMeterMetaInfo->pMeterMeta = NULL; - + pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta); assert(pMeterMetaInfo->pMeterMeta != NULL); } @@ -3520,10 +3514,12 @@ static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMet int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) { assert(strlen(pMeterMetaInfo->name) != 0); - // if the SSqlCmd owns a metermeta, release it first - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false); + // If this SMeterMetaInfo owns a metermeta, release it first + if (pMeterMetaInfo->pMeterMeta != NULL) { + taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false); + } + pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name); - if (pMeterMetaInfo->pMeterMeta != NULL) { SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta; @@ -3605,36 +3601,35 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) { return code; } -int tscGetMetricMeta(SSqlObj *pSql) { +int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) { int code = TSDB_CODE_NETWORK_UNAVAIL; SSqlCmd *pCmd = &pSql->cmd; /* - * the vnode query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache. + * the query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache. */ - bool reqMetricMeta = false; - int32_t subClauseIndex = 0; + bool required = false; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0}; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i); - tscGetMetricMetaCacheKey(pCmd, subClauseIndex, tagstr, pMeterMetaInfo->pMeterMeta->uid); + tscGetMetricMetaCacheKey(pCmd, clauseIndex, tagstr, pMeterMetaInfo->pMeterMeta->uid); taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false); SMetricMeta *ppMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr); if (ppMeta == NULL) { - reqMetricMeta = true; + required = true; break; } else { pMeterMetaInfo->pMetricMeta = ppMeta; } } - // all metricmeta are retrieved from cache, no need to query mgmt node - if (!reqMetricMeta) { + // all metricmeta for one clause are retrieved from cache, no need to retrieve metricmeta from management node + if (!required) { return TSDB_CODE_SUCCESS; } @@ -3645,7 +3640,9 @@ int tscGetMetricMeta(SSqlObj *pSql) { pNew->cmd.command = TSDB_SQL_METRIC; SQueryInfo *pNewQueryInfo = NULL; - tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo); + if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) { + return code; + } for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i); @@ -3667,10 +3664,10 @@ int tscGetMetricMeta(SSqlObj *pSql) { pNewQueryInfo->slimit = pQueryInfo->slimit; pNewQueryInfo->order = pQueryInfo->order; - if (pSql->fp != NULL && pSql->pStream == NULL) { - pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); - tscFreeSubqueryInfo(pCmd); - } +// if (pSql->fp != NULL && pSql->pStream == NULL) { +// pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); +// tscFreeSubqueryInfo(pCmd); +// } tscTrace("%p allocate new pSqlObj:%p to get metricMeta", pSql, pNew); if (pSql->fp == NULL) { @@ -3679,19 +3676,22 @@ int tscGetMetricMeta(SSqlObj *pSql) { code = tscProcessSql(pNew); - for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { - char tagstr[TSDB_MAX_TAGS_LEN] = {0}; - - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i); - tscGetMetricMetaCacheKey(pCmd, 0, tagstr, pMeterMetaInfo->pMeterMeta->uid); + if (code == TSDB_CODE_SUCCESS) {//todo optimize the performance + for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { + char tagstr[TSDB_MAX_TAGS_LEN] = {0}; + + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i); + tscGetMetricMetaCacheKey(pCmd, 0, tagstr, pMeterMetaInfo->pMeterMeta->uid); #ifdef _DEBUG_VIEW - printf("create metric key:%s, index:%d\n", tagstr, i); + printf("create metric key:%s, index:%d\n", tagstr, i); #endif - - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false); - pMeterMetaInfo->pMetricMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr); + + taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false); + pMeterMetaInfo->pMetricMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr); + } } + tscFreeSqlObj(pNew); } else { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 4b98780696bc1dbebeef90910e44b45b1ef5f60e..7198954e8c84d42d717c8cb247343ca64c8679fa 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -443,7 +443,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { bool hasData = true; SSqlCmd *pCmd = &pSql->cmd; - if (tscProjectionQueryOnMetric(pCmd, 0)) { + if (tscProjectionQueryOnSTable(pCmd, 0)) { bool allSubqueryExhausted = true; for (int32_t i = 0; i < pSql->numOfSubs; ++i) { @@ -611,7 +611,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - while (rows == NULL && tscProjectionQueryOnMetric(pCmd, 0)) { + while (rows == NULL && tscProjectionQueryOnSTable(pCmd, 0)) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); // reach the maximum number of output rows, abort @@ -670,7 +670,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { nRows = taos_fetch_block_impl(res, rows); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - while (*rows == NULL && tscProjectionQueryOnMetric(pCmd, 0)) { + while (*rows == NULL && tscProjectionQueryOnSTable(pCmd, 0)) { /* reach the maximum number of output rows, abort */ if (tscHasReachLimitation(pSql)) { return 0; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index f9ccd9f6e753f67048b6f108660226cd724e2b73..69e3ea1eb350f366d6ccdffa7c3643d4ef6272dc 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -76,7 +76,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == 0 && UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { - code = tscGetMetricMeta(pSql); + code = tscGetMetricMeta(pSql, 0); pSql->res.code = code; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 1d0f030b2124deb6d69206273177a944c7cdd7e0..40016119a3af92bc0d751990a2093da3dfd3e2f6 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -223,7 +223,7 @@ bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd) { } // for projection query, iterate all qualified vnodes sequentially - if (tscProjectionQueryOnMetric(pCmd, subClauseIndex)) { + if (tscProjectionQueryOnSTable(pCmd, subClauseIndex)) { return false; } @@ -235,10 +235,12 @@ bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd) { return false; } -bool tscProjectionQueryOnMetric(SSqlCmd* pCmd, int32_t subClauseIndex) { +bool tscProjectionQueryOnSTable(SSqlCmd* pCmd, int32_t subClauseIndex) { assert(pCmd != NULL); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex); + assert(pQueryInfo->numOfTables > 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); /* @@ -542,9 +544,8 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) { strcpy(pMeterMetaInfo->name, pDataBlock->meterId); taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false); - - pMeterMetaInfo->pMeterMeta = pDataBlock->pMeterMeta; - pDataBlock->pMeterMeta = NULL; // delegate the ownership of metermeta to pMeterMetaInfo + + pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pDataBlock->pMeterMeta); } else { assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0); } @@ -590,7 +591,7 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) { * @return */ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name, - STableDataBlocks** dataBlocks) { + SMeterMeta* pMeterMeta, STableDataBlocks** dataBlocks) { STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); if (dataBuf == NULL) { tscError("failed to allocated memory, reason:%s", strerror(errno)); @@ -610,22 +611,18 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff /* * The metermeta may be released since the metermeta cache are completed clean by other thread - * due to operation such as drop database. + * due to operation such as drop database. So here we add the reference count directly instead of invoke + * taosGetDataFromCache, which may return NULL value. */ - dataBuf->pMeterMeta = taosGetDataFromCache(tscCacheHandle, dataBuf->meterId); - assert(initialSize > 0); + dataBuf->pMeterMeta = taosGetDataFromExists(tscCacheHandle, pMeterMeta); + assert(initialSize > 0 && pMeterMeta != NULL && dataBuf->pMeterMeta != NULL); - if (dataBuf->pMeterMeta == NULL) { - tfree(dataBuf); - return TSDB_CODE_QUERY_CACHE_ERASED; - } else { - *dataBlocks = dataBuf; - return TSDB_CODE_SUCCESS; - } + *dataBlocks = dataBuf; + return TSDB_CODE_SUCCESS; } int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, - int32_t startOffset, int32_t rowSize, const char* tableId, + int32_t startOffset, int32_t rowSize, const char* tableId, SMeterMeta* pMeterMeta, STableDataBlocks** dataBlocks) { *dataBlocks = NULL; @@ -635,7 +632,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, } if (*dataBlocks == NULL) { - int32_t ret = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId, dataBlocks); + int32_t ret = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId, pMeterMeta, dataBlocks); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -658,11 +655,12 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi STableDataBlocks* dataBuf = NULL; int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, - TSDB_PAYLOAD_SIZE, tsInsertHeadSize, 0, pOneTableBlock->meterId, &dataBuf); + TSDB_PAYLOAD_SIZE, tsInsertHeadSize, 0, pOneTableBlock->meterId, + pOneTableBlock->pMeterMeta, &dataBuf); if (ret != TSDB_CODE_SUCCESS) { - tscError("%p failed to allocate the data buffer block for merging table data", pSql); - tscDestroyBlockArrayList(pTableDataBlockList); - + tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret); + taosCleanUpHashTable(pVnodeDataBlockHashList); + tscDestroyBlockArrayList(pVnodeDataBlockList); return ret; } @@ -1746,7 +1744,7 @@ void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro int32_t after = pQueryInfo->numOfTables - index - 1; if (after > 0) { - memmove(&pQueryInfo->pMeterInfo[index], &pQueryInfo->pMeterInfo[index + 1], after * sizeof(void*)); + memmove(&pQueryInfo->pMeterInfo[index], &pQueryInfo->pMeterInfo[index + 1], after * POINTER_BYTES); } pQueryInfo->numOfTables -= 1; @@ -1826,9 +1824,12 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNewQueryInfo->tsBuf = NULL; tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond); - pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t)); - memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t)); - + + if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) { + pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t)); + memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t)); + } + if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex); tscFreeSqlObj(pNew); @@ -1887,11 +1888,12 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pMeterMetaInfo->tagColumnIndex); } else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object. SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, 0, 0); - pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevInfo->pMeterMeta, pPrevInfo->pMetricMeta, - pMeterMetaInfo->numOfTags, pMeterMetaInfo->tagColumnIndex); - - pPrevInfo->pMeterMeta = NULL; - pPrevInfo->pMetricMeta = NULL; + + SMeterMeta* pPrevMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pPrevInfo->pMeterMeta); + SMetricMeta* pPrevMetricMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pPrevInfo->pMetricMeta); + + pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pMeterMetaInfo->numOfTags, + pMeterMetaInfo->tagColumnIndex); } assert(pFinalInfo->pMeterMeta != NULL); diff --git a/src/inc/tcache.h b/src/inc/tcache.h index 93bbf22cd3752589731375a32da3da78c635b956..b577c53ea8dbcdc9f069288b94d0244907e77f12 100644 --- a/src/inc/tcache.h +++ b/src/inc/tcache.h @@ -86,6 +86,26 @@ void taosCleanUpDataCache(void *handle); */ void taosClearDataCache(void *handle); +/** + * Add one reference count for the exist data, and assign this data for a new owner. + * The new owner needs to invoke the taosRemoveDataFromCache when it does not need this data anymore. + * This procedure is a faster version of taosGetDataFromCache function, which avoids the sideeffect of the problem of the + * data is moved to trash, and taosGetDataFromCache will fail to retrieve it again. + * + * @param handle + * @param data + * @return + */ +void* taosGetDataFromExists(void* handle, void* data); + +/** + * transfer the ownership of data in cache to another object without increasing reference count. + * @param handle + * @param data + * @return + */ +void* taosTransferDataInCache(void* handle, void** data); + #ifdef __cplusplus } #endif diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 8a2f1347df9e7eb7d7fb29623eab4120b5484aeb..e8c5476e92c0e76ebf1c2a50d5f70c3c521a8d2b 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -20,6 +20,7 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" +#include "hashutil.h" #define HASH_MAX_CAPACITY (1024*1024*16) #define HASH_VALUE_IN_TRASH (-1) @@ -901,5 +902,46 @@ void taosCleanUpDataCache(void *handle) { } pObj->deleting = 1; - return; +} + +void* taosGetDataFromExists(void* handle, void* data) { + SCacheObj *pObj = (SCacheObj *)handle; + if (pObj == NULL || data == NULL) return NULL; + + size_t offset = offsetof(SDataNode, data); + SDataNode *ptNode = (SDataNode *)((char *)data - offset); + + if (ptNode->signature != (uint64_t) ptNode) { + pError("key: %p the data from cache is invalid", ptNode); + return NULL; + } + + int32_t ref = atomic_add_fetch_32(&ptNode->refCount, 1); + pTrace("%p add ref data in cache, refCnt:%d", data, ref) + + // the data if referenced by at least one object, so the reference count must be greater than the value of 2. + assert(ref >= 2); + return data; +} + +void* taosTransferDataInCache(void* handle, void** data) { + SCacheObj *pObj = (SCacheObj *)handle; + if (pObj == NULL || data == NULL) return NULL; + + size_t offset = offsetof(SDataNode, data); + SDataNode *ptNode = (SDataNode *)((char *)(*data) - offset); + + if (ptNode->signature != (uint64_t) ptNode) { + pError("key: %p the data from cache is invalid", ptNode); + return NULL; + } + + assert(ptNode->refCount >= 1); + + char* d = *data; + + // clear its reference to old area + *data = NULL; + + return d; }