提交 95567927 编写于 作者: wmmhello's avatar wmmhello

feat:optimize for group by tag

上级 1d20f738
......@@ -211,6 +211,7 @@ typedef struct STableScanPhysiNode {
double ratio;
int32_t dataRequired;
SNodeList* pDynamicScanFuncs;
SNodeList* pPartitionKeys;
int64_t interval;
int64_t offset;
int64_t sliding;
......
......@@ -27,6 +27,10 @@ typedef struct {
int32_t bytes;
} SGroupKeys, SStateKeys;
int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList);
uint64_t calcGroupId(char* pData, int32_t len);
void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols);
int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals);
#ifdef __cplusplus
}
#endif
......
......@@ -334,6 +334,12 @@ typedef struct STableScanInfo {
int32_t dataBlockLoadFlag;
double sampleRatio; // data block sample ratio, 1 by default
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result
} STableScanInfo;
typedef struct STagScanInfo {
......@@ -704,7 +710,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
......
......@@ -4477,9 +4477,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pDataReader == NULL && terrno != 0) {
return NULL;
}
SArray* groupKyes = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, groupKyes, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
......@@ -4510,7 +4510,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, NULL, pTaskInfo);
SArray* tableIdList = extractTableIdList(pTableListInfo);
......
......@@ -24,10 +24,10 @@
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "executorInt.h"
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
static uint64_t calcGroupId(char* pData, int32_t len);
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
......@@ -37,7 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pGroupColVals);
}
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
*pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
if ((*pGroupColVals) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -110,7 +110,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
return true;
}
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
SColumnDataAgg* pColAgg = NULL;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
......@@ -137,7 +137,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
}
}
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
ASSERT(pKey != NULL);
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
......@@ -607,8 +607,13 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosArrayDestroy(pInfo->pGroupCols);
for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
taosMemoryFree(key.pData);
}
taosArrayDestroy(pInfo->pGroupColVals);
taosMemoryFree(pInfo->keyBuf);
taosHashCleanup(pInfo->pGroupSet);
taosMemoryFree(pInfo->columnOffset);
}
......
......@@ -33,6 +33,8 @@
#include "ttypes.h"
#include "vnode.h"
#include "executorInt.h"
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
......@@ -369,6 +371,19 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
longjmp(pOperator->pTaskInfo->env, code);
}
int32_t numOfGroupCols = taosArrayGetSize(pTableScanInfo->pGroupCols);
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0, numOfGroupCols);
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
if (!groupId) {
pBlock->info.groupId = *groupId;
}else{
pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len);
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t));
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue;
......@@ -479,21 +494,25 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFree(pTableScanInfo->pResBlock);
tsdbCleanupReadHandle(pTableScanInfo->dataReader);
taosArrayDestroy(pTableScanInfo->pGroupCols);
for(int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++){
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i);
taosMemoryFree(key.pData);
}
taosArrayDestroy(pTableScanInfo->pGroupColVals);
taosMemoryFree(pTableScanInfo->keyBuf);
taosHashCleanup(pTableScanInfo->pGroupSet);
if (pTableScanInfo->pColMatchInfo != NULL) {
taosArrayDestroy(pTableScanInfo->pColMatchInfo);
}
}
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
SReadHandle* readHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
goto _error;
}
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
......@@ -504,7 +523,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
goto _error;
}
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
......@@ -533,6 +552,18 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
// for table group
pInfo->pGroupCols = groupKyes;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
if (pInfo->pGroupSet == NULL) {
goto _error;
}
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, groupKyes);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
NULL, NULL, getTableScannerExecInfo);
......@@ -540,6 +571,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->cost.openCost = 0;
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册