未验证 提交 32fbb8aa 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #13801 from taosdata/feature/TD-15956

opt: optimize generating groupid in partition/group by tag
......@@ -695,6 +695,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
#ifdef __cplusplus
}
......
......@@ -27,10 +27,7 @@ typedef struct {
int32_t bytes;
} SGroupKeys, SStateKeys;
int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList);
uint64_t calcGroupId(char* pData, int32_t len);
void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex);
int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals);
#ifdef __cplusplus
}
#endif
......
......@@ -336,12 +336,6 @@ typedef struct STableScanInfo {
int32_t dataBlockLoadFlag;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result
SSampleExecInfo sample; // sample execution info
int32_t curTWinIdx;
} STableScanInfo;
......@@ -789,7 +783,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
......@@ -827,7 +821,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList,
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
......
......@@ -4558,7 +4558,6 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead
static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo,
SNode* pTagCond);
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo);
static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* createSortInfo(SNodeList* pNodeList);
......@@ -4592,6 +4591,85 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
return TSDB_CODE_SUCCESS;
}
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey){
if(groupKey == NULL) {
return TDB_CODE_SUCCESS;
}
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (pTableListInfo->map == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t keyLen = 0;
void *keyBuf = NULL;
int32_t numOfGroupCols = taosArrayGetSize(groupKey);
for (int32_t j = 0; j < numOfGroupCols; ++j) {
SColumn* pCol = taosArrayGet(groupKey, j);
keyLen += pCol->bytes; // actual data + null_flag
}
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
keyLen += nullFlagSize;
keyBuf = taosMemoryCalloc(1, keyLen);
if (keyBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++){
STableKeyInfo *info = taosArrayGet(pTableListInfo->pTableList, i);
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, info->uid);
char* isNull = (char*)keyBuf;
char* pStart = (char*)keyBuf + sizeof(int8_t) * numOfGroupCols;
for (int32_t j = 0; j < numOfGroupCols; ++j) {
SColumn* pCol = taosArrayGet(groupKey, j);
if(strcmp(pCol->name, "tbname") == 0){
isNull[i] = 0;
memcpy(pStart, mr.me.name, strlen(mr.me.name));
pStart += strlen(mr.me.name);
}else{
STagVal tagVal = {0};
tagVal.cid = pCol->colId;
const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal);
if(p == NULL){
isNull[j] = 1;
continue;
}
isNull[i] = 0;
if (pCol->type == TSDB_DATA_TYPE_JSON) {
// int32_t dataLen = getJsonValueLen(pkey->pData);
// memcpy(pStart, (pkey->pData), dataLen);
// pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
memcpy(pStart, tagVal.pData, tagVal.nData);
pStart += tagVal.nData;
ASSERT(tagVal.nData <= pCol->bytes);
} else {
memcpy(pStart, &(tagVal.i64), pCol->bytes);
pStart += pCol->bytes;
}
}
}
int32_t len = (int32_t) (pStart - (char*)keyBuf);
uint64_t* groupId = taosHashGet(pTableListInfo->map, keyBuf, len);
if (groupId) {
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), groupId, sizeof(uint64_t));
} else {
uint64_t tmpId = calcGroupId(keyBuf, len);
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &tmpId, sizeof(uint64_t));
}
metaReaderClear(&mr);
}
taosMemoryFree(keyBuf);
return TDB_CODE_SUCCESS;
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, SNode* pTagCond) {
int32_t type = nodeType(pPhyNode);
......@@ -4605,15 +4683,23 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pDataReader == NULL && terrno != 0) {
return NULL;
}
SArray* groupKyes = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) {
tsdbCleanupReadHandle(pDataReader);
return NULL;
}
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json
taosArrayDestroy(groupKeys);
if (code){
tsdbCleanupReadHandle(pDataReader);
return NULL;
}
SOperatorInfo* pOperator =
createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, groupKyes, pTaskInfo);
createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
......@@ -4639,12 +4725,18 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else {
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
}
SArray* tableIdList = extractTableIdList(pTableListInfo);
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json
taosArrayDestroy(groupKeys);
if (code){
tsdbCleanupReadHandle(pDataReader);
return NULL;
}
SOperatorInfo* pOperator =
createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, pTaskInfo, &twSup);
createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
taosArrayDestroy(tableIdList);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
......@@ -4969,6 +5061,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
}
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
if(!pNodeList) return NULL;
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
if (pList == NULL) {
......@@ -5073,7 +5166,9 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, &metaArg, res);
if (code != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_INDEX_REBUILDING){ // todo
// doFilter();
} else if (code != TSDB_CODE_SUCCESS) {
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
taosArrayDestroy(res);
terrno = code;
......@@ -5081,6 +5176,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
} else {
qDebug("sucess to get tableIds, size: %d, suid: %" PRIu64 "", (int)taosArrayGetSize(res), tableUid);
}
for (int i = 0; i < taosArrayGetSize(res); i++) {
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)};
taosArrayPush(pListInfo->pTableList, &info);
......@@ -5097,18 +5193,6 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
return code;
}
SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
// Transfer the Array of STableKeyInfo into uid list.
for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i);
taosArrayPush(tableIdList, &pkeyInfo->uid);
}
return tableIdList;
}
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) {
int32_t code =
......
......@@ -37,7 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pGroupColVals);
}
int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
*pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
if ((*pGroupColVals) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -118,7 +118,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
return true;
}
void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
SColumnDataAgg* pColAgg = NULL;
size_t numOfGroupCols = taosArrayGetSize(pGroupCols);
......@@ -150,7 +150,7 @@ void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock*
}
}
int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
ASSERT(pKey != NULL);
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
......
......@@ -391,22 +391,16 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
longjmp(pOperator->pTaskInfo->env, code);
}
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0);
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
if (groupId) {
pBlock->info.groupId = *groupId;
} else if (len != 0) {
pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len);
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t));
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue;
}
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
if (groupId) {
pBlock->info.groupId = *groupId;
}
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
......@@ -530,21 +524,13 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
tsdbCleanupReadHandle(pTableScanInfo->dataReader);
taosArrayDestroy(pTableScanInfo->pGroupCols);
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++) {
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i);
taosMemoryFree(key.pData);
}
taosArrayDestroy(pTableScanInfo->pGroupColVals);
taosMemoryFree(pTableScanInfo->keyBuf);
taosHashCleanup(pTableScanInfo->pGroupSet);
if (pTableScanInfo->pColMatchInfo != NULL) {
taosArrayDestroy(pTableScanInfo->pColMatchInfo);
}
}
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader,
SReadHandle* readHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo) {
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -591,18 +577,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
// for table group
pInfo->pGroupCols = groupKyes;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
if (pInfo->pGroupSet == NULL) {
goto _error;
}
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, groupKyes);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
NULL, NULL, getTableScannerExecInfo);
......@@ -912,6 +886,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo->pRes->info.groupId = groupId;
}
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre;
}
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
......@@ -979,11 +958,24 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList,
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
// Transfer the Array of STableKeyInfo into uid list.
for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i);
taosArrayPush(tableIdList, &pkeyInfo->uid);
}
return tableIdList;
}
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
STimeWindowAggSupp* pTwSup) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _error;
......@@ -992,7 +984,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, NULL, pTaskInfo);
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
......@@ -1014,10 +1006,13 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
// set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds);
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, pTableIdList);
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
if (code != 0) {
taosArrayDestroy(tableIdList);
goto _error;
}
taosArrayDestroy(tableIdList);
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
if (pInfo->pBlockLists == NULL) {
......
......@@ -566,6 +566,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_RM_SKEY_IN_HASH, "Rm tsma skey in cac
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
#ifdef TAOS_ERROR_C
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册