提交 6287be5a 编写于 作者: H hjxilinx

add the union support in sql parser: fix memory leaks and refactor some...

add the union support in sql parser: fix memory leaks and refactor some functions, improve the performance of some functions. #1032. [TBASE-1140]
上级 f2016993
......@@ -364,7 +364,6 @@ SQuerySQL *tSetQuerySQLElems(SSQLToken *pSelectToken, tSQLExprList *pSelection,
SCreateTableSQL *tSetCreateSQLElems(tFieldList *pCols, tFieldList *pTags, SSQLToken *pMetricName,
tVariantList *pTagVals, SQuerySQL *pSelect, int32_t type);
void tSQLExprDestroy(tSQLExpr *);
void tSQLExprNodeDestroy(tSQLExpr *pExpr);
tSQLExpr *tSQLExprNodeClone(tSQLExpr *pExpr);
......
......@@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter {
} SJoinSubquerySupporter;
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableDataBlocks** dataBlocks);
SMeterMeta* pMeterMeta, STableDataBlocks** dataBlocks);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
......@@ -81,7 +81,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId,
int32_t startOffset, int32_t rowSize, const char* tableId, SMeterMeta* pMeterMeta,
STableDataBlocks** dataBlocks);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx);
......@@ -97,7 +97,7 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscProjectionQueryOnMetric(SSqlCmd* pCmd, int32_t subClauseIndex);
bool tscProjectionQueryOnSTable(SSqlCmd* pCmd, int32_t subClauseIndex);
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd);
......@@ -197,7 +197,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, int32_t subClauseIndex, char* keyStr, uint64_t uid);
int tscGetMetricMeta(SSqlObj* pSql);
int tscGetMetricMeta(SSqlObj* pSql, int32_t clauseIndex);
int tscGetMeterMeta(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo);
int tscGetMeterMetaEx(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo, bool createIfNotExists);
......
......@@ -121,7 +121,7 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
// sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx
if (numOfRows == 0 && tscProjectionQueryOnMetric(pCmd, 0)) {
if (numOfRows == 0 && tscProjectionQueryOnSTable(pCmd, 0)) {
// vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pMeterMetaInfo->vnodeIndex >= 0);
......@@ -272,7 +272,7 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (numOfRows == 0) {
// sequentially retrieve data from remain vnodes.
if (tscProjectionQueryOnMetric(pCmd, 0)) {
if (tscProjectionQueryOnSTable(pCmd, 0)) {
/*
* vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved
*/
......@@ -517,7 +517,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
code = tscGetMetricMeta(pSql);
code = tscGetMetricMeta(pSql, 0);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......@@ -534,7 +534,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
code = tscGetMetricMeta(pSql);
code = tscGetMetricMeta(pSql, 0);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......
......@@ -438,7 +438,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
taos_fetch_rows_a(tres, joinRetrieveCallback, param);
} else if (numOfRows == 0) { // no data from this vnode anymore
if (tscProjectionQueryOnMetric(&pSql->cmd, 0)) {
if (tscProjectionQueryOnSTable(&pParentSql->cmd, 0)) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
......@@ -498,7 +498,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SSqlCmd* pCmd = &pSql->cmd;
if (tscProjectionQueryOnMetric(pCmd, 0) && numOfRows == 0) {
if (tscProjectionQueryOnSTable(pCmd, 0) && numOfRows == 0) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
......@@ -543,7 +543,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
if (tscProjectionQueryOnMetric(&pSql->cmd, 0)) {
if (tscProjectionQueryOnSTable(&pSql->cmd, 0)) {
if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes &&
(!tscHasReachLimitation(pSql->pSubs[i]))) {
numOfFetch++;
......@@ -711,7 +711,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
*/
if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnMetric(&pSql->cmd, 0)) {
if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnSTable(&pSql->cmd, 0)) {
assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes);
pSupporter->pState->numOfCompleted = 0; // reset the record value
......
......@@ -18,8 +18,8 @@
#define _XOPEN_SOURCE
#include "ihash.h"
#include "os.h"
#include "hash.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
......@@ -652,7 +652,8 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pMeterMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name, &dataBuf);
sizeof(SShellSubmitBlock), pMeterMeta->rowSize, pMeterMetaInfo->name,
pMeterMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -1111,8 +1112,10 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
wordfree(&full_path);
STableDataBlocks *pDataBlock = NULL;
int32_t ret = tscCreateDataBlock(PATH_MAX, pMeterMetaInfo->pMeterMeta->rowSize, sizeof(SShellSubmitBlock),
pMeterMetaInfo->name, &pDataBlock);
SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta;
int32_t ret = tscCreateDataBlock(PATH_MAX, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name,
pMeterMeta, &pDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
......@@ -1358,8 +1361,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
pCmd->pDataBlocks = tscCreateBlockArrayList();
STableDataBlocks *pTableDataBlock = NULL;
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, pMeterMeta->rowSize, sizeof(SShellSubmitBlock),
pMeterMetaInfo->name, &pTableDataBlock);
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SShellSubmitBlock),
pMeterMetaInfo->name, pMeterMeta, &pTableDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
......@@ -1370,10 +1373,10 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
if (maxRows < 1) return -1;
int count = 0;
SParsedDataColInfo spd = {.numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns};
SSchema * pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
SParsedDataColInfo spd = {.numOfCols = pMeterMeta->numOfColumns};
SSchema * pSchema = tsGetSchema(pMeterMeta);
tscSetAssignedColumnInfo(&spd, pSchema, pMeterMetaInfo->pMeterMeta->numOfColumns);
tscSetAssignedColumnInfo(&spd, pSchema, pMeterMeta->numOfColumns);
while ((readLen = getline(&line, &n, fp)) != -1) {
// line[--readLen] = '\0';
......@@ -1383,8 +1386,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
char *lineptr = line;
strtolower(line, line);
if (numOfRows >= maxRows || pTableDataBlock->size + pMeterMeta->rowSize >= pTableDataBlock->nAllocSize) {
uint32_t tSize = tscAllocateMemIfNeed(pTableDataBlock, pMeterMeta->rowSize);
if (numOfRows >= maxRows || pTableDataBlock->size + rowSize >= pTableDataBlock->nAllocSize) {
uint32_t tSize = tscAllocateMemIfNeed(pTableDataBlock, rowSize);
if (0 == tSize) return (-TSDB_CODE_CLI_OUT_OF_MEMORY);
maxRows += tSize;
}
......
......@@ -660,6 +660,13 @@ int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlO
SSqlCmd* pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS;
// backup the old name in pMeterMetaInfo
size_t size = strlen(pMeterMetaInfo->name);
char* oldName = NULL;
if (size > 0) {
oldName = strdup(pMeterMetaInfo->name);
}
if (hasSpecifyDB(pzTableName)) {
// db has been specified in sql string so we ignore current db path
code = setObjFullName(pMeterMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL);
......@@ -674,7 +681,25 @@ int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlO
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg);
}
return code;
if (code != TSDB_CODE_SUCCESS) {
free(oldName);
return code;
}
/*
* the old name exists and is not equalled to the new name. Release the metermeta/metricmeta
* that are corresponding to the old name for the new table name.
*/
if (size > 0) {
if (strncasecmp(oldName, pMeterMetaInfo->name, tListLen(pMeterMetaInfo->name)) != 0) {
tscClearMeterMetaInfo(pMeterMetaInfo, false);
}
} else {
assert(pMeterMetaInfo->pMeterMeta == NULL && pMeterMetaInfo->pMetricMeta == NULL);
}
tfree(oldName);
return TSDB_CODE_SUCCESS;
}
static bool validateTableColumnInfo(tFieldList* pFieldList, SSqlCmd* pCmd) {
......@@ -4412,7 +4437,7 @@ int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuer
if (queryOnTags == true) { // local handle the metric tag query
pCmd->command = TSDB_SQL_RETRIEVE_TAGS;
} else {
if (tscProjectionQueryOnMetric(&pSql->cmd, 0) &&
if (tscProjectionQueryOnSTable(&pSql->cmd, 0) &&
(pQueryInfo->slimit.limit > 0 || pQueryInfo->slimit.offset > 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -4425,11 +4450,12 @@ int32_t parseLimitClause(SSqlObj* pSql, int32_t subClauseIndex, SQuerySQL* pQuer
}
/*
* get the distribution of all tables among available virtual nodes that satisfy query condition and
* created according to this super table from management node.
* And then launching multiple async-queries on required virtual nodes, which is the first-stage query operation.
* Get the distribution of all tables among all available virtual nodes that are qualified for the query condition
* and created according to this super table from management node.
* And then launching multiple async-queries against all qualified virtual nodes, during the first-stage
* query operation.
*/
int32_t code = tscGetMetricMeta(pSql);
int32_t code = tscGetMetricMeta(pSql, 0);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4951,7 +4977,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
// projection query on metric does not compatible with "group by" syntax
if (tscProjectionQueryOnMetric(pCmd, 0)) {
if (tscProjectionQueryOnSTable(pCmd, 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -5259,9 +5285,6 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
assert(pQueryInfo->numOfTables == 1);
// if (pQueryInfo->numOfTables == 1) {
// tscAddEmptyMeterMetaInfo(pQueryInfo);
// }
SCreateTableSQL* pCreateTable = pInfo->pCreateTableInfo;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
......
......@@ -2931,17 +2931,12 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
pMeta->numOfColumns = htons(pMeta->numOfColumns);
if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
tscError("invalid tag value count:%d", pMeta->numOfTags);
return TSDB_CODE_INVALID_VALUE;
}
if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
tscError("invalid numOfTags:%d", pMeta->numOfTags);
return TSDB_CODE_INVALID_VALUE;
}
if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns <= 0) {
tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
return TSDB_CODE_INVALID_VALUE;
}
......@@ -2985,10 +2980,11 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
// todo add one more function: taosAddDataIfNotExists();
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
assert(pMeterMetaInfo->pMeterMeta == NULL);
pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
size, tsMeterMetaKeepTimer);
// todo handle out of memory case
if (pMeterMetaInfo->pMeterMeta == NULL) return 0;
return TSDB_CODE_OTHERS;
......@@ -3421,7 +3417,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
* If the query result is exhausted, or current query is to free resource at server side,
* the connection will be recycled.
*/
if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnMetric(pCmd, 0) && pRes->offset > 0)) ||
if ((pRes->numOfRows == 0 && !(tscProjectionQueryOnSTable(pCmd, 0) && pRes->offset > 0)) ||
((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE)) {
tscTrace("%p no result or free resource, recycle connection", pSql);
taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pObj->user);
......@@ -3494,9 +3490,7 @@ static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMet
* Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine
*/
if (code == TSDB_CODE_SUCCESS) {
pMeterMetaInfo->pMeterMeta = pNewMeterMetaInfo->pMeterMeta;
pNewMeterMetaInfo->pMeterMeta = NULL;
pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta);
assert(pMeterMetaInfo->pMeterMeta != NULL);
}
......@@ -3520,10 +3514,12 @@ static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMet
int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
assert(strlen(pMeterMetaInfo->name) != 0);
// if the SSqlCmd owns a metermeta, release it first
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
// If this SMeterMetaInfo owns a metermeta, release it first
if (pMeterMetaInfo->pMeterMeta != NULL) {
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
}
pMeterMetaInfo->pMeterMeta = (SMeterMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
if (pMeterMetaInfo->pMeterMeta != NULL) {
SMeterMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
......@@ -3605,36 +3601,35 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
return code;
}
int tscGetMetricMeta(SSqlObj *pSql) {
int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
int code = TSDB_CODE_NETWORK_UNAVAIL;
SSqlCmd *pCmd = &pSql->cmd;
/*
* the vnode query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache.
* the query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache.
*/
bool reqMetricMeta = false;
int32_t subClauseIndex = 0;
bool required = false;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0};
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
tscGetMetricMetaCacheKey(pCmd, subClauseIndex, tagstr, pMeterMetaInfo->pMeterMeta->uid);
tscGetMetricMetaCacheKey(pCmd, clauseIndex, tagstr, pMeterMetaInfo->pMeterMeta->uid);
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
SMetricMeta *ppMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
if (ppMeta == NULL) {
reqMetricMeta = true;
required = true;
break;
} else {
pMeterMetaInfo->pMetricMeta = ppMeta;
}
}
// all metricmeta are retrieved from cache, no need to query mgmt node
if (!reqMetricMeta) {
// all metricmeta for one clause are retrieved from cache, no need to retrieve metricmeta from management node
if (!required) {
return TSDB_CODE_SUCCESS;
}
......@@ -3645,7 +3640,9 @@ int tscGetMetricMeta(SSqlObj *pSql) {
pNew->cmd.command = TSDB_SQL_METRIC;
SQueryInfo *pNewQueryInfo = NULL;
tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo);
if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) {
return code;
}
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
......@@ -3667,10 +3664,10 @@ int tscGetMetricMeta(SSqlObj *pSql) {
pNewQueryInfo->slimit = pQueryInfo->slimit;
pNewQueryInfo->order = pQueryInfo->order;
if (pSql->fp != NULL && pSql->pStream == NULL) {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeSubqueryInfo(pCmd);
}
// if (pSql->fp != NULL && pSql->pStream == NULL) {
// pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
// tscFreeSubqueryInfo(pCmd);
// }
tscTrace("%p allocate new pSqlObj:%p to get metricMeta", pSql, pNew);
if (pSql->fp == NULL) {
......@@ -3679,19 +3676,22 @@ int tscGetMetricMeta(SSqlObj *pSql) {
code = tscProcessSql(pNew);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
char tagstr[TSDB_MAX_TAGS_LEN] = {0};
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
tscGetMetricMetaCacheKey(pCmd, 0, tagstr, pMeterMetaInfo->pMeterMeta->uid);
if (code == TSDB_CODE_SUCCESS) {//todo optimize the performance
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
char tagstr[TSDB_MAX_TAGS_LEN] = {0};
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
tscGetMetricMetaCacheKey(pCmd, 0, tagstr, pMeterMetaInfo->pMeterMeta->uid);
#ifdef _DEBUG_VIEW
printf("create metric key:%s, index:%d\n", tagstr, i);
printf("create metric key:%s, index:%d\n", tagstr, i);
#endif
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
pMeterMetaInfo->pMetricMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
pMeterMetaInfo->pMetricMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
}
}
tscFreeSqlObj(pNew);
} else {
......
......@@ -443,7 +443,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool hasData = true;
SSqlCmd *pCmd = &pSql->cmd;
if (tscProjectionQueryOnMetric(pCmd, 0)) {
if (tscProjectionQueryOnSTable(pCmd, 0)) {
bool allSubqueryExhausted = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
......@@ -611,7 +611,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
while (rows == NULL && tscProjectionQueryOnMetric(pCmd, 0)) {
while (rows == NULL && tscProjectionQueryOnSTable(pCmd, 0)) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
// reach the maximum number of output rows, abort
......@@ -670,7 +670,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
nRows = taos_fetch_block_impl(res, rows);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
while (*rows == NULL && tscProjectionQueryOnMetric(pCmd, 0)) {
while (*rows == NULL && tscProjectionQueryOnSTable(pCmd, 0)) {
/* reach the maximum number of output rows, abort */
if (tscHasReachLimitation(pSql)) {
return 0;
......
......@@ -76,7 +76,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == 0 && UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
code = tscGetMetricMeta(pSql);
code = tscGetMetricMeta(pSql, 0);
pSql->res.code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......
......@@ -223,7 +223,7 @@ bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd) {
}
// for projection query, iterate all qualified vnodes sequentially
if (tscProjectionQueryOnMetric(pCmd, subClauseIndex)) {
if (tscProjectionQueryOnSTable(pCmd, subClauseIndex)) {
return false;
}
......@@ -235,10 +235,12 @@ bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd) {
return false;
}
bool tscProjectionQueryOnMetric(SSqlCmd* pCmd, int32_t subClauseIndex) {
bool tscProjectionQueryOnSTable(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
assert(pQueryInfo->numOfTables > 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
/*
......@@ -542,9 +544,8 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) {
strcpy(pMeterMetaInfo->name, pDataBlock->meterId);
taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false);
pMeterMetaInfo->pMeterMeta = pDataBlock->pMeterMeta;
pDataBlock->pMeterMeta = NULL; // delegate the ownership of metermeta to pMeterMetaInfo
pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pDataBlock->pMeterMeta);
} else {
assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0);
}
......@@ -590,7 +591,7 @@ void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
* @return
*/
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableDataBlocks** dataBlocks) {
SMeterMeta* pMeterMeta, STableDataBlocks** dataBlocks) {
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
if (dataBuf == NULL) {
tscError("failed to allocated memory, reason:%s", strerror(errno));
......@@ -610,22 +611,18 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
/*
* The metermeta may be released since the metermeta cache are completed clean by other thread
* due to operation such as drop database.
* due to operation such as drop database. So here we add the reference count directly instead of invoke
* taosGetDataFromCache, which may return NULL value.
*/
dataBuf->pMeterMeta = taosGetDataFromCache(tscCacheHandle, dataBuf->meterId);
assert(initialSize > 0);
dataBuf->pMeterMeta = taosGetDataFromExists(tscCacheHandle, pMeterMeta);
assert(initialSize > 0 && pMeterMeta != NULL && dataBuf->pMeterMeta != NULL);
if (dataBuf->pMeterMeta == NULL) {
tfree(dataBuf);
return TSDB_CODE_QUERY_CACHE_ERASED;
} else {
*dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS;
}
*dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS;
}
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId,
int32_t startOffset, int32_t rowSize, const char* tableId, SMeterMeta* pMeterMeta,
STableDataBlocks** dataBlocks) {
*dataBlocks = NULL;
......@@ -635,7 +632,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
}
if (*dataBlocks == NULL) {
int32_t ret = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId, dataBlocks);
int32_t ret = tscCreateDataBlock((size_t)size, rowSize, startOffset, tableId, pMeterMeta, dataBlocks);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
......@@ -658,11 +655,12 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
STableDataBlocks* dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid,
TSDB_PAYLOAD_SIZE, tsInsertHeadSize, 0, pOneTableBlock->meterId, &dataBuf);
TSDB_PAYLOAD_SIZE, tsInsertHeadSize, 0, pOneTableBlock->meterId,
pOneTableBlock->pMeterMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to allocate the data buffer block for merging table data", pSql);
tscDestroyBlockArrayList(pTableDataBlockList);
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
taosCleanUpHashTable(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList);
return ret;
}
......@@ -1746,7 +1744,7 @@ void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro
int32_t after = pQueryInfo->numOfTables - index - 1;
if (after > 0) {
memmove(&pQueryInfo->pMeterInfo[index], &pQueryInfo->pMeterInfo[index + 1], after * sizeof(void*));
memmove(&pQueryInfo->pMeterInfo[index], &pQueryInfo->pMeterInfo[index + 1], after * POINTER_BYTES);
}
pQueryInfo->numOfTables -= 1;
......@@ -1826,9 +1824,12 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNewQueryInfo->tsBuf = NULL;
tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t));
memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t));
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
pNewQueryInfo->defaultVal = malloc(pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t));
memcpy(pNewQueryInfo->defaultVal, pQueryInfo->defaultVal, pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(int64_t));
}
if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex);
tscFreeSqlObj(pNew);
......@@ -1887,11 +1888,12 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pMeterMetaInfo->tagColumnIndex);
} else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object.
SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, 0, 0);
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevInfo->pMeterMeta, pPrevInfo->pMetricMeta,
pMeterMetaInfo->numOfTags, pMeterMetaInfo->tagColumnIndex);
pPrevInfo->pMeterMeta = NULL;
pPrevInfo->pMetricMeta = NULL;
SMeterMeta* pPrevMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pPrevInfo->pMeterMeta);
SMetricMeta* pPrevMetricMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pPrevInfo->pMetricMeta);
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pMeterMetaInfo->numOfTags,
pMeterMetaInfo->tagColumnIndex);
}
assert(pFinalInfo->pMeterMeta != NULL);
......
......@@ -86,6 +86,26 @@ void taosCleanUpDataCache(void *handle);
*/
void taosClearDataCache(void *handle);
/**
* Add one reference count for the exist data, and assign this data for a new owner.
* The new owner needs to invoke the taosRemoveDataFromCache when it does not need this data anymore.
* This procedure is a faster version of taosGetDataFromCache function, which avoids the sideeffect of the problem of the
* data is moved to trash, and taosGetDataFromCache will fail to retrieve it again.
*
* @param handle
* @param data
* @return
*/
void* taosGetDataFromExists(void* handle, void* data);
/**
* transfer the ownership of data in cache to another object without increasing reference count.
* @param handle
* @param data
* @return
*/
void* taosTransferDataInCache(void* handle, void** data);
#ifdef __cplusplus
}
#endif
......
......@@ -20,6 +20,7 @@
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#include "hashutil.h"
#define HASH_MAX_CAPACITY (1024*1024*16)
#define HASH_VALUE_IN_TRASH (-1)
......@@ -901,5 +902,46 @@ void taosCleanUpDataCache(void *handle) {
}
pObj->deleting = 1;
return;
}
void* taosGetDataFromExists(void* handle, void* data) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || data == NULL) return NULL;
size_t offset = offsetof(SDataNode, data);
SDataNode *ptNode = (SDataNode *)((char *)data - offset);
if (ptNode->signature != (uint64_t) ptNode) {
pError("key: %p the data from cache is invalid", ptNode);
return NULL;
}
int32_t ref = atomic_add_fetch_32(&ptNode->refCount, 1);
pTrace("%p add ref data in cache, refCnt:%d", data, ref)
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
assert(ref >= 2);
return data;
}
void* taosTransferDataInCache(void* handle, void** data) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || data == NULL) return NULL;
size_t offset = offsetof(SDataNode, data);
SDataNode *ptNode = (SDataNode *)((char *)(*data) - offset);
if (ptNode->signature != (uint64_t) ptNode) {
pError("key: %p the data from cache is invalid", ptNode);
return NULL;
}
assert(ptNode->refCount >= 1);
char* d = *data;
// clear its reference to old area
*data = NULL;
return d;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册