From 62127398993168ee33b2694e6c83c668b0cddf85 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Dec 2020 22:49:59 +0800 Subject: [PATCH] [TD-2378]: optimize the client memory requirement. --- src/client/inc/tsclient.h | 3 +- src/client/src/tscFunctionImpl.c | 1 - src/client/src/tscParseInsert.c | 2 +- src/client/src/tscSQLParser.c | 2 +- src/client/src/tscSchemaUtil.c | 30 +++--- src/client/src/tscServer.c | 91 ++++++------------ src/client/src/tscUtil.c | 46 +++------ src/query/inc/qUtil.h | 4 - src/query/src/qExecutor.c | 7 +- src/query/src/qUtil.c | 154 ------------------------------- 10 files changed, 59 insertions(+), 281 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 33ea06ba9c..0f3b4b7566 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -69,9 +69,10 @@ typedef struct STableMeta { int16_t sversion; int16_t tversion; char sTableId[TSDB_TABLE_FNAME_LEN]; - SVgroupInfo vgroupInfo; + int32_t vgId; SCorVgroupInfo corVgroupInfo; STableId id; +// union {int64_t stableUid; SSchema* schema;}; SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info } STableMeta; diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 6afc5ba223..22240efe14 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2625,7 +2625,6 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) { char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN); - printf("%p, %p\n", pInfo->pHisto, pInfo->pHisto->elems); return true; } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index ded12f64ea..b43fe1fcd7 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -731,7 +731,7 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, SParsedDataColI return code; } - dataBuf->vgId = pTableMeta->vgroupInfo.vgId; + dataBuf->vgId = pTableMeta->vgId; dataBuf->numOfTables = 1; *totalNum += numOfRows; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f1acea71c5..4dad65fe9e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4936,7 +4936,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { } SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload; - pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); + pUpdateMsg->head.vgId = htonl(pTableMeta->vgId); pUpdateMsg->tid = htonl(pTableMeta->id.tid); pUpdateMsg->uid = htobe64(pTableMeta->id.uid); pUpdateMsg->colId = htons(pTagsSchema->colId); diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index fcc93ffadc..d77bb9990c 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -130,13 +130,14 @@ SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) { return NULL; } -static void tscInitCorVgroupInfo(SCorVgroupInfo *corVgroupInfo, SVgroupInfo *vgroupInfo) { +static void tscInitCorVgroupInfo(SCorVgroupInfo *corVgroupInfo, SVgroupMsg *pVgroupMsg) { corVgroupInfo->version = 0; - corVgroupInfo->inUse = 0; - corVgroupInfo->numOfEps = vgroupInfo->numOfEps; - for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) { - corVgroupInfo->epAddr[i].fqdn = strdup(vgroupInfo->epAddr[i].fqdn); - corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port; + corVgroupInfo->inUse = 0; + corVgroupInfo->numOfEps = pVgroupMsg->numOfEps; + + for (int32_t i = 0; i < pVgroupMsg->numOfEps; i++) { + corVgroupInfo->epAddr[i].fqdn = strndup(pVgroupMsg->epAddr[i].fqdn, tListLen(pVgroupMsg->epAddr[0].fqdn)); + corVgroupInfo->epAddr[i].port = pVgroupMsg->epAddr[i].port; } } @@ -145,8 +146,10 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size int32_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema); STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize); + pTableMeta->tableType = pTableMetaMsg->tableType; - + pTableMeta->vgId = pTableMetaMsg->vgroup.vgId; + pTableMeta->tableInfo = (STableComInfo) { .numOfTags = pTableMetaMsg->numOfTags, .precision = pTableMetaMsg->precision, @@ -156,18 +159,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pTableMeta->id.tid = pTableMetaMsg->tid; pTableMeta->id.uid = pTableMetaMsg->uid; - SVgroupInfo* pVgroupInfo = &pTableMeta->vgroupInfo; - pVgroupInfo->numOfEps = pTableMetaMsg->vgroup.numOfEps; - pVgroupInfo->vgId = pTableMetaMsg->vgroup.vgId; - - for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { - SEpAddrMsg* pEpMsg = &pTableMetaMsg->vgroup.epAddr[i]; - - pVgroupInfo->epAddr[i].fqdn = strndup(pEpMsg->fqdn, tListLen(pEpMsg->fqdn)); - pVgroupInfo->epAddr[i].port = pEpMsg->port; - } - - tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, pVgroupInfo); + tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMetaMsg->vgroup); pTableMeta->sversion = pTableMetaMsg->sversion; pTableMeta->tversion = pTableMetaMsg->tversion; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ded04388f4..958ae28427 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -45,32 +45,30 @@ static int32_t getWaitingTimeInterval(int32_t count) { return 0; } - return initial * (2<<(count - 2)); + return initial * ((2u)<<(count - 2)); } -static void tscSetDnodeEpSet(SSqlObj* pSql, SVgroupInfo* pVgroupInfo) { - assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); - - SRpcEpSet* pEpSet = &pSql->epSet; +static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) { + assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); // Issue the query to one of the vnode among a vgroup randomly. // change the inUse property would not affect the isUse attribute of STableMeta pEpSet->inUse = rand() % pVgroupInfo->numOfEps; // apply the FQDN string length check here - bool hasFqdn = false; + bool existed = false; pEpSet->numOfEps = pVgroupInfo->numOfEps; for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { - tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); pEpSet->port[i] = pVgroupInfo->epAddr[i].port; - if (!hasFqdn) { - hasFqdn = (strlen(pEpSet->fqdn[i]) > 0); + int32_t len = (int32_t) strnlen(pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN); + if (len > 0) { + tstrncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); + existed = true; } } - - assert(hasFqdn); + assert(existed); } static void tscDumpMgmtEpSet(SSqlObj *pSql) { @@ -102,7 +100,8 @@ void tscUpdateMgmtEpSet(SSqlObj *pSql, SRpcEpSet *pEpSet) { pCorEpSet->epSet = *pEpSet; taosCorEndWrite(&pCorEpSet->version); } -static void tscDumpEpSetFromVgroupInfo(SCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) { + +static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SCorVgroupInfo *pVgroupInfo) { if (pVgroupInfo == NULL) { return;} taosCorBeginRead(&pVgroupInfo->version); int8_t inUse = pVgroupInfo->inUse; @@ -515,8 +514,8 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } else { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); - tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId); + pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); + tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgId); } pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg); @@ -535,7 +534,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // NOTE: shell message size should not include SMsgDesc int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); - int32_t vgId = pTableMeta->vgroupInfo.vgId; SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg; pMsgDesc->numOfVnodes = htonl(1); // always one vnode @@ -543,7 +541,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sizeof(SMsgDesc); SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; - pShellMsg->header.vgId = htonl(vgId); + pShellMsg->header.vgId = htonl(pTableMeta->vgId); pShellMsg->header.contLen = htonl(size); // the length not includes the size of SMsgDesc pShellMsg->length = pShellMsg->header.contLen; @@ -551,9 +549,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet); + tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMeta->corVgroupInfo); - tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, + tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, pTableMeta->vgId, pSql->cmd.numOfTablesInSubmit, pSql->epSet.numOfEps); return TSDB_CODE_SUCCESS; } @@ -597,24 +595,28 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) { - SVgroupInfo* pVgroupInfo = NULL; + int32_t vgId = -1; if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { int32_t index = pTableMetaInfo->vgroupIndex; assert(index >= 0); - + + SVgroupInfo* pVgroupInfo = NULL; if (pTableMetaInfo->vgroupList->numOfVgroups > 0) { assert(index < pTableMetaInfo->vgroupList->numOfVgroups); pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; } + + vgId = pVgroupInfo->vgId; + tscSetDnodeEpSet(&pSql->epSet, pVgroupInfo); tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups); } else { - pVgroupInfo = &pTableMeta->vgroupInfo; + vgId = pTableMeta->vgId; + tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMeta->corVgroupInfo); } - assert(pVgroupInfo != NULL); + pSql->epSet.inUse = rand()%pSql->epSet.numOfEps; - tscSetDnodeEpSet(pSql, pVgroupInfo); - pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); + pQueryMsg->head.vgId = htonl(vgId); STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pTableMeta->id.tid); @@ -633,7 +635,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); // set the vgroup info - tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo); + tscSetDnodeEpSet(&pSql->epSet, &pTableIdList->vgInfo); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList); @@ -1448,48 +1450,11 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet); + tscDumpEpSetFromVgroupInfo(&pSql->epSet, &pTableMetaInfo->pTableMeta->corVgroupInfo); return TSDB_CODE_SUCCESS; } -//int tscBuildCancelQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { -// SCancelQueryMsg *pCancelMsg = (SCancelQueryMsg*) pSql->cmd.payload; -// pCancelMsg->qhandle = htobe64(pSql->res.qhandle); -// -// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); -// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); -// -// if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { -// int32_t vgIndex = pTableMetaInfo->vgroupIndex; -// if (pTableMetaInfo->pVgroupTables == NULL) { -// SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList; -// assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); -// -// pCancelMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); -// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex); -// } else { -// int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); -// assert(vgIndex >= 0 && vgIndex < numOfVgroups); -// -// SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); -// -// pCancelMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId); -// tscDebug("%p build cancel query msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex); -// } -// } else { -// STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; -// pCancelMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); -// tscDebug("%p build cancel query msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgroupInfo.vgId); -// } -// -// pSql->cmd.payloadLen = sizeof(SCancelQueryMsg); -// pSql->cmd.msgType = TSDB_MSG_TYPE_CANCEL_QUERY; -// -// pCancelMsg->header.contLen = htonl(sizeof(SCancelQueryMsg)); -// return TSDB_CODE_SUCCESS; -//} - int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; pCmd->payloadLen = sizeof(SAlterDbMsg); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 2991e89581..e7e0134f36 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -466,13 +466,6 @@ void tscFreeRegisteredSqlObj(void *pSql) { void tscFreeTableMetaHelper(void *pTableMeta) { STableMeta* p = (STableMeta*) pTableMeta; - int32_t numOfEps = p->vgroupInfo.numOfEps; - assert(numOfEps >= 0 && numOfEps <= TSDB_MAX_REPLICA); - - for(int32_t i = 0; i < numOfEps; ++i) { - tfree(p->vgroupInfo.epAddr[i].fqdn); - } - int32_t numOfEps1 = p->corVgroupInfo.numOfEps; assert(numOfEps1 >= 0 && numOfEps1 <= TSDB_MAX_REPLICA); @@ -1941,30 +1934,24 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm return NULL; } - pNew->fp = fp; + pNew->fp = fp; pNew->fetchFp = fp; - pNew->param = param; + pNew->param = param; + pNew->sqlstr = NULL; pNew->maxRetry = TSDB_MAX_REPLICA; - pNew->sqlstr = strdup(pSql->sqlstr); - if (pNew->sqlstr == NULL) { - tscError("%p new subquery failed", pSql); - tscFreeSqlObj(pNew); - return NULL; - } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, 0); assert(pSql->cmd.clauseIndex == 0); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); - registerSqlObj(pNew); + return pNew; } -static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* pNewQueryInfo, int64_t uid) { +static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pNewQueryInfo, int64_t uid) { int32_t numOfOutput = (int32_t)tscSqlExprNumOfExprs(pNewQueryInfo); if (numOfOutput == 0) { return; @@ -2017,15 +2004,9 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex); - pNew->pTscObj = pSql->pTscObj; + pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; - - pNew->sqlstr = strdup(pSql->sqlstr); - if (pNew->sqlstr == NULL) { - tscError("%p new subquery failed, tableIndex:%d, vgroupIndex:%d", pSql, tableIndex, pTableMetaInfo->vgroupIndex); - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - goto _error; - } + pNew->sqlstr = NULL; SSqlCmd* pnCmd = &pNew->cmd; memcpy(pnCmd, pCmd, sizeof(SSqlCmd)); @@ -2113,23 +2094,22 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void goto _error; } - doSetSqlExprAndResultFieldInfo(pQueryInfo, pNewQueryInfo, uid); + doSetSqlExprAndResultFieldInfo(pNewQueryInfo, uid); - pNew->fp = fp; + pNew->fp = fp; pNew->fetchFp = fp; - - pNew->param = param; + pNew->param = param; pNew->maxRetry = TSDB_MAX_REPLICA; char* name = pTableMetaInfo->name; STableMetaInfo* pFinalInfo = NULL; - if (pPrevSql == NULL) { - STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta); // get by name may failed due to the cache cleanup + if (pPrevSql == NULL) { // get by name may failed due to the cache cleanup + STableMeta* pTableMeta = taosCacheAcquireByData(tscMetaCache, pTableMetaInfo->pTableMeta); assert(pTableMeta != NULL); pFinalInfo = tscAddTableMetaInfo(pNewQueryInfo, name, pTableMeta, pTableMetaInfo->vgroupList, - pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); + pTableMetaInfo->tagColList, pTableMetaInfo->pVgroupTables); } else { // transfer the ownership of pTableMeta to the newly create sql object. STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 974d93f89c..4620e3d61e 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -34,17 +34,13 @@ int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void resetResultRowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); -void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num); -void clearClosedResultRows(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pResultRowInfo); int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo); void closeAllResultRows(SResultRowInfo* pResultRowInfo); -void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order); int32_t initResultRow(SResultRow *pResultRow); void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); -void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 2b992a931d..64eba616ad 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5182,10 +5182,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { scanMultiTableDataBlocks(pQInfo); pQInfo->groupIndex += 1; - SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + taosArrayDestroy(s); // no results generated for current group, continue to try the next group - taosArrayDestroy(s); + SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pWindowResInfo->size <= 0) { continue; } @@ -5212,8 +5212,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQInfo->groupIndex = currentGroupIndex; // restore the group index assert(pQuery->rec.rows == pWindowResInfo->size); - - clearClosedResultRows(pRuntimeEnv, &pRuntimeEnv->windowResInfo); + resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); break; } } else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTsBuf == NULL && !isTSCompQuery(pQuery)) { diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index d09993ae4e..dc01de0f92 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -20,18 +20,6 @@ #include "qExecutor.h" #include "qUtil.h" -static int32_t getResultRowKeyInfo(SResultRow* pResult, int16_t type, char** key, int16_t* bytes) { - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - *key = varDataVal(pResult->key); - *bytes = varDataLen(pResult->key); - } else { - *key = (char*) &pResult->win.skey; - *bytes = tDataTypeDesc[type].nSize; - } - - return 0; -} - int32_t getOutputInterResultBufSize(SQuery* pQuery) { int32_t size = 0; @@ -99,73 +87,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; } -void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num) { - if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0 || num == 0) { - return; - } - - int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo); - assert(num >= 0 && num <= numOfClosed); - - int16_t type = pResultRowInfo->type; - int64_t uid = 0; - - char *key = NULL; - int16_t bytes = -1; - - for (int32_t i = 0; i < num; ++i) { - SResultRow *pResult = pResultRowInfo->pResult[i]; - if (pResult->closed) { // remove the window slot from hash table - getResultRowKeyInfo(pResult, type, &key, &bytes); - SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); - taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); - } else { - break; - } - } - - int32_t remain = pResultRowInfo->size - num; - - // clear all the closed windows from the window list - for (int32_t k = 0; k < remain; ++k) { - copyResultRow(pRuntimeEnv, pResultRowInfo->pResult[k], pResultRowInfo->pResult[num + k], type); - } - - // move the unclosed window in the front of the window list - for (int32_t k = remain; k < pResultRowInfo->size; ++k) { - SResultRow *pWindowRes = pResultRowInfo->pResult[k]; - clearResultRow(pRuntimeEnv, pWindowRes, pResultRowInfo->type); - } - - pResultRowInfo->size = remain; - - for (int32_t k = 0; k < pResultRowInfo->size; ++k) { - SResultRow *pResult = pResultRowInfo->pResult[k]; - getResultRowKeyInfo(pResult, type, &key, &bytes); - SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); - - int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); - assert(p != NULL); - - int32_t v = (*p - num); - assert(v >= 0 && v <= pResultRowInfo->size); - - SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); - taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t)); - } - - pResultRowInfo->curIndex = -1; -} - -void clearClosedResultRows(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { - if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0) { - return; - } - - int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo); - popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, numOfClosed); -} - int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) { int32_t i = 0; while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) { @@ -188,40 +109,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) { } } -/* - * remove the results that are not the FIRST time window that spreads beyond the - * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time. - * NOTE: remove redundant, only when the result set order equals to traverse order - */ -void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) { - assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); - if (pResultRowInfo->size <= 1) { - return; - } - - // get the result order - int32_t resultOrder = (pResultRowInfo->pResult[0]->win.skey < pResultRowInfo->pResult[1]->win.skey)? 1:-1; - if (order != resultOrder) { - return; - } - - int32_t i = 0; - if (order == QUERY_ASC_FORWARD_STEP) { - TSKEY ekey = pResultRowInfo->pResult[i]->win.ekey; - while (i < pResultRowInfo->size && (ekey < lastKey)) { - ++i; - } - } else if (order == QUERY_DESC_FORWARD_STEP) { - while (i < pResultRowInfo->size && (pResultRowInfo->pResult[i]->win.skey > lastKey)) { - ++i; - } - } - - if (i < pResultRowInfo->size) { - pResultRowInfo->size = (i + 1); - } -} - bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot) { return (getResultRow(pResultRowInfo, slot)->closed == true); } @@ -262,47 +149,6 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16 } } -/** - * The source window result pos attribution of the source window result does not assign to the destination, - * since the attribute of "Pos" is bound to each window result when the window result is created in the - * disk-based result buffer. - */ -void copyResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *dst, const SResultRow *src, int16_t type) { - dst->numOfRows = src->numOfRows; - - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - dst->key = realloc(dst->key, varDataTLen(src->key)); - varDataCopy(dst->key, src->key); - } else { - dst->win = src->win; - } - dst->closed = src->closed; - - int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutput; - - for (int32_t i = 0; i < nOutputCols; ++i) { - SResultRowCellInfo *pDst = getResultCell(pRuntimeEnv, dst, i); - SResultRowCellInfo *pSrc = getResultCell(pRuntimeEnv, src, i); - -// char *buf = pDst->interResultBuf; - memcpy(pDst, pSrc, sizeof(SResultRowCellInfo) + pRuntimeEnv->pCtx[i].interBufBytes); -// pDst->interResultBuf = buf; // restore the allocated buffer - - // copy the result info struct -// memcpy(pDst->interResultBuf, pSrc->interResultBuf, pRuntimeEnv->pCtx[i].interBufBytes); - - // copy the output buffer data from src to dst, the position info keep unchanged - tFilePage *dstpage = getResBufPage(pRuntimeEnv->pResultBuf, dst->pageId); - char * dstBuf = getPosInResultPage(pRuntimeEnv, i, dst, dstpage); - - tFilePage *srcpage = getResBufPage(pRuntimeEnv->pResultBuf, src->pageId); - char * srcBuf = getPosInResultPage(pRuntimeEnv, i, (SResultRow *)src, srcpage); - size_t s = pRuntimeEnv->pQuery->pExpr1[i].bytes; - - memcpy(dstBuf, srcBuf, s); - } -} - SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index) { assert(index >= 0 && index < pRuntimeEnv->pQuery->numOfOutput); return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]); -- GitLab