From 38f0eccaadf8070c7f68512c0d302cc34ce2e99a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 19 Aug 2022 20:31:28 +0800 Subject: [PATCH] opti: group by tag --- source/libs/executor/inc/executil.h | 1 + source/libs/executor/src/executil.c | 221 +++++++++++++++++++++++- source/libs/executor/src/executorimpl.c | 46 ++--- 3 files changed, 232 insertions(+), 36 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 52c73f85f5..4da4747108 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -115,6 +115,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo); int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); +int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo); size_t getTableTagsBufLen(const SNodeList* pGroups); SArray* createSortInfo(SNodeList* pNodeList); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index baeb972e05..7f61eb678f 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -428,8 +428,6 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray // int64_t st = taosGetTimestampUs(); for (int32_t i = 0; i < rows; i++) { int64_t* uid = taosArrayGet(uidList, i); - void* tag = taosHashGet(tags, uid, sizeof(int64_t)); - ASSERT(tag); for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){ SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j); @@ -441,6 +439,8 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray qDebug("tagfilter uid:%ld, tbname:%s", *uid, str+2); #endif }else{ + void* tag = taosHashGet(tags, uid, sizeof(int64_t)); + ASSERT(tag); STagVal tagVal = {0}; tagVal.cid = pColInfo->info.colId; const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal); @@ -501,6 +501,223 @@ end: return output.columnData; } +static void releaseColInfoData(void* pCol) { + if(pCol){ + SColumnInfoData* col = (SColumnInfoData*) pCol; + colDataDestroy(col); + taosMemoryFree(col); + } +} + +int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo){ + int32_t code = TSDB_CODE_SUCCESS; + SArray *pBlockList = NULL; + SSDataBlock *pResBlock = NULL; + SHashObj *tags = NULL; + SArray *uidList = NULL; + void *keyBuf = NULL; + SArray *groupData = NULL; + + int32_t rows = taosArrayGetSize(pTableListInfo->pTableList); + if(rows == 0){ + return TDB_CODE_SUCCESS; + } + + tagFilterAssist ctx = {0}; + ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); + if(ctx.colHash == NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + ctx.index = 0; + ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); + if(ctx.cInfoList == NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + + SNode* pNode = NULL; + FOREACH(pNode, group) { + nodesRewriteExprPostOrder(&pNode, getColumn, (void *)&ctx); + REPLACE_NODE(pNode); + } + + pResBlock = createDataBlock(); + if (pResBlock == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + + for (int32_t i = 0; i < taosArrayGetSize(ctx.cInfoList); ++i) { + SColumnInfoData colInfo = {{0}, 0}; + colInfo.info = *(SColumnInfo*)taosArrayGet(ctx.cInfoList, i); + blockDataAppendColInfo(pResBlock, &colInfo); + } + + uidList = taosArrayInit(rows, sizeof(uint64_t)); + for (int32_t i = 0; i < rows; ++i) { + STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i); + taosArrayPush(uidList, &pkeyInfo->uid); + } + +// int64_t stt = taosGetTimestampUs(); + tags = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + code = metaGetTableTags(metaHandle, pTableListInfo->suid, uidList, tags); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + +// int64_t stt1 = taosGetTimestampUs(); +// qDebug("generate tag meta rows:%d, cost:%ld us", rows, stt1-stt); + + code = blockDataEnsureCapacity(pResBlock, rows); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + +// int64_t st = taosGetTimestampUs(); + for (int32_t i = 0; i < rows; i++) { + int64_t* uid = taosArrayGet(uidList, i); + for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){ + SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j); + + if(pColInfo->info.colId == -1){ // tbname + char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + metaGetTableNameByUid(metaHandle, *uid, str); + colDataAppend(pColInfo, i, str, false); +#if TAG_FILTER_DEBUG + qDebug("tagfilter uid:%ld, tbname:%s", *uid, str+2); +#endif + }else{ + void* tag = taosHashGet(tags, uid, sizeof(int64_t)); + ASSERT(tag); + STagVal tagVal = {0}; + tagVal.cid = pColInfo->info.colId; + const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal); + + if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)){ + colDataAppend(pColInfo, i, p, true); + } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) { + colDataAppend(pColInfo, i, p, false); + } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { + char *tmp = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1); + varDataSetLen(tmp, tagVal.nData); + memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData); + colDataAppend(pColInfo, i, tmp, false); +#if TAG_FILTER_DEBUG + qDebug("tagfilter varch:%s", tmp+2); +#endif + taosMemoryFree(tmp); + } else { + colDataAppend(pColInfo, i, (const char*)&tagVal.i64, false); +#if TAG_FILTER_DEBUG + if(pColInfo->info.type == TSDB_DATA_TYPE_INT){ + qDebug("tagfilter int:%d", *(int*)(&tagVal.i64)); + }else if(pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE){ + qDebug("tagfilter double:%f", *(double *)(&tagVal.i64)); + } +#endif + } + } + } + } + pResBlock->info.rows = rows; + +// int64_t st1 = taosGetTimestampUs(); +// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st); + + pBlockList = taosArrayInit(2, POINTER_BYTES); + taosArrayPush(pBlockList, &pResBlock); + + groupData = taosArrayInit(2, POINTER_BYTES); + FOREACH(pNode, group) { + SScalarParam output = {0}; + + switch (nodeType(pNode)) { + case QUERY_NODE_COLUMN: + case QUERY_NODE_VALUE: + case QUERY_NODE_OPERATOR: + case QUERY_NODE_FUNCTION: + case QUERY_NODE_LOGIC_CONDITION:{ + SExprNode* expNode = (SExprNode*)pNode; + code = createResultData(&expNode->resType, rows, &output); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + break; + } + default: + ASSERT(0); + } + code = scalarCalculate(pNode, pBlockList, &output); + if(code != TSDB_CODE_SUCCESS){ + releaseColInfoData(output.columnData); + goto end; + } + taosArrayPush(groupData, &output.columnData); + } + + int32_t keyLen = 0; + SNode* node; + FOREACH(node, group) { + SExprNode* pExpr = (SExprNode*)node; + keyLen += pExpr->resType.bytes; + } + + int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group); + keyLen += nullFlagSize; + + keyBuf = taosMemoryCalloc(1, keyLen); + if (keyBuf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + for(int i = 0; i < rows; i++){ + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); + + char* isNull = (char*)keyBuf; + char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group); + for(int j = 0; j < taosArrayGetSize(groupData); j++){ + SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j); + + ASSERT(pValue->info.type != TSDB_DATA_TYPE_JSON); + + if (pValue->info.type == TSDB_DATA_TYPE_NULL || colDataIsNull_s(pValue, i)) { + isNull[j] = 1; + continue; + } else { + isNull[j] = 0; + char* data = colDataGetData(pValue, i); + if (IS_VAR_DATA_TYPE(pValue->info.type)) { + memcpy(pStart, data, varDataTLen(data)); + pStart += varDataTLen(data); + } else { + memcpy(pStart, data, pValue->info.bytes); + pStart += pValue->info.type; + } + } + } + + int32_t len = (int32_t)(pStart - (char*)keyBuf); + info->groupId = calcGroupId(keyBuf, len); + taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); + } + +// int64_t st2 = taosGetTimestampUs(); +// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1); + + end: + taosMemoryFreeClear(keyBuf); + taosHashCleanup(tags); + taosHashCleanup(ctx.colHash); + taosArrayDestroy(ctx.cInfoList); + blockDataDestroy(pResBlock); + taosArrayDestroy(pBlockList); + taosArrayDestroy(uidList); + taosArrayDestroyP(groupData, releaseColInfoData); + return code; +} + int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9ff5b5d759..9a512b3c2b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3874,9 +3874,9 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { tDeleteSSchemaWrapper(pSchemaInfo->qsw); } -static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) { +static int32_t sortTableGroup(STableListInfo* pTableListInfo) { taosArrayClear(pTableListInfo->pGroupList); - SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t)); + SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY; for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); @@ -3954,48 +3954,26 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, if (pTableListInfo->map == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - int32_t keyLen = 0; - void* keyBuf = NULL; - - SNode* node; - FOREACH(node, group) { - SExprNode* pExpr = (SExprNode*)node; - keyLen += pExpr->resType.bytes; - } - - int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group); - keyLen += nullFlagSize; - - keyBuf = taosMemoryCalloc(1, keyLen); - if (keyBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } bool assignUid = groupbyTbname(group); - int32_t groupNum = 0; size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - for (int32_t i = 0; i < numOfTables; i++) { - STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - - if (assignUid) { + if(assignUid){ + for (int32_t i = 0; i < numOfTables; i++) { + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); info->groupId = info->uid; - } else { - int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); + } + }else{ + int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; } - - taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); - groupNum++; } - taosMemoryFree(keyBuf); - if (pTableListInfo->needSortTableByGroupId) { - return sortTableGroup(pTableListInfo, groupNum); + return sortTableGroup(pTableListInfo); } return TDB_CODE_SUCCESS; -- GitLab