提交 c941a2c7 编写于 作者: wmmhello's avatar wmmhello

add test for encode/decode ResultRow in group by and interval

上级 22674396
...@@ -141,7 +141,8 @@ typedef enum ENodeType { ...@@ -141,7 +141,8 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_SUBPLAN, QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN QUERY_NODE_PHYSICAL_PLAN,
QUERY_NODE_PHYSICAL_PLAN_GROUPBY
} ENodeType; } ENodeType;
/** /**
......
...@@ -6346,7 +6346,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo *pOperator) { ...@@ -6346,7 +6346,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo *pOperator) {
taosHashClear(pSup->pResultRowHashTable); taosHashClear(pSup->pResultRowHashTable);
pOperator->decodeResultRow(pOperator, result, length); pOperator->decodeResultRow(pOperator, result, length);
if(result){ if(result){
free(result); taosMemoryFree(result);
} }
} }
...@@ -6377,8 +6377,27 @@ static SSDataBlock* getAggregateResult(SOperatorInfo *pOperator, bool* newgroup) ...@@ -6377,8 +6377,27 @@ static SSDataBlock* getAggregateResult(SOperatorInfo *pOperator, bool* newgroup)
} }
static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t *length) { static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t *length) {
SAggOperatorInfo *pAggInfo = pOperator->info; SAggSupporter *pSup = NULL;
SAggSupporter *pSup = &pAggInfo->aggSup; switch(pOperator->operatorType){
case QUERY_NODE_PHYSICAL_PLAN_AGG:{
SAggOperatorInfo *pAggInfo = pOperator->info;
pSup = &pAggInfo->aggSup;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{
SGroupbyOperatorInfo *pAggInfo = pOperator->info;
pSup = &pAggInfo->aggSup;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{
STableIntervalOperatorInfo *pAggInfo = pOperator->info;
pSup = &pAggInfo->aggSup;
break;
}
default:{
qDebug("invalid operatorType: %d", pOperator->operatorType);
}
}
int32_t size = taosHashGetSize(pSup->pResultRowHashTable); int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = POINTER_BYTES; // estimate the key length size_t keyLen = POINTER_BYTES; // estimate the key length
...@@ -6434,9 +6453,28 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t l ...@@ -6434,9 +6453,28 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t l
return false; return false;
} }
SAggOperatorInfo *pAggInfo = pOperator->info; SAggSupporter *pSup = NULL;
SAggSupporter *pSup = &pAggInfo->aggSup; switch(pOperator->operatorType){
SOptrBasicInfo *pInfo = &pAggInfo->binfo; case QUERY_NODE_PHYSICAL_PLAN_AGG:{
SAggOperatorInfo *pAggInfo = pOperator->info;
//SOptrBasicInfo *pInfo = &pAggInfo->binfo;
pSup = &pAggInfo->aggSup;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{
SGroupbyOperatorInfo *pAggInfo = pOperator->info;
pSup = &pAggInfo->aggSup;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{
STableIntervalOperatorInfo *pAggInfo = pOperator->info;
pSup = &pAggInfo->aggSup;
break;
}
default:{
qDebug("invalid operatorType: %d", pOperator->operatorType);
}
}
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable); // int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
int32_t count = *(int32_t*)(result); int32_t count = *(int32_t*)(result);
...@@ -6733,6 +6771,16 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { ...@@ -6733,6 +6771,16 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
char *result = NULL;
int32_t length = 0;
pOperator->encodeResultRow(pOperator, &result, &length);
SAggSupporter *pSup = &pInfo->aggSup;
taosHashClear(pSup->pResultRowHashTable);
pOperator->decodeResultRow(pOperator, result, length);
if(result){
taosMemoryFree(result);
}
} }
closeAllResultRows(&pInfo->binfo.resultRowInfo); closeAllResultRows(&pInfo->binfo.resultRowInfo);
...@@ -7149,6 +7197,16 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou ...@@ -7149,6 +7197,16 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput);
doHashGroupbyAgg(pOperator, pBlock); doHashGroupbyAgg(pOperator, pBlock);
char *result = NULL;
int32_t length = 0;
pOperator->encodeResultRow(pOperator, &result, &length);
SAggSupporter *pSup = &pInfo->aggSup;
taosHashClear(pSup->pResultRowHashTable);
pOperator->decodeResultRow(pOperator, result, length);
if(result){
taosMemoryFree(result);
}
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
...@@ -7693,6 +7751,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -7693,6 +7751,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->_openFn = doOpenIntervalAgg; pOperator->_openFn = doOpenIntervalAgg;
pOperator->getNextFn = doBuildIntervalResult; pOperator->getNextFn = doBuildIntervalResult;
pOperator->closeFn = destroyIntervalOperatorInfo; pOperator->closeFn = destroyIntervalOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -7903,13 +7963,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx ...@@ -7903,13 +7963,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->name = "GroupbyAggOperator"; pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUPBY;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->_openFn = operatorDummyOpenFn; pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashGroupbyAggregate; pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupbyOperatorInfo; pOperator->closeFn = destroyGroupbyOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册