From 6c38d716a1ef60837e9745205d6cb995aee14a10 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 30 May 2022 12:00:51 +0800 Subject: [PATCH] feat:add test for operator encode/decode --- source/libs/executor/inc/executorimpl.h | 4 +-- source/libs/executor/src/executorimpl.c | 29 +++++++++++-------- source/libs/executor/src/groupoperator.c | 15 +++++++++- source/libs/executor/src/timewindowoperator.c | 6 ++-- 4 files changed, 36 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 88f4bdbd3d..cb45fa9730 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -160,7 +160,7 @@ struct SOperatorInfo; //struct SOptrBasicInfo; typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); -typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result, int32_t length); +typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); @@ -821,7 +821,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, int32_t* resNum); -int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length); +int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result); int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ce46573830..29047d294f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3448,14 +3448,14 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } #if 0 // test for encode/decode result info - if(pOperator->encodeResultRow){ + if(pOperator->fpSet.encodeResultRow){ char *result = NULL; int32_t length = 0; - SAggSupporter *pSup = &pAggInfo->aggSup; - pOperator->encodeResultRow(pOperator, pSup, pInfo, &result, &length); + pOperator->fpSet.encodeResultRow(pOperator, &result, &length); + SAggSupporter* pSup = &pAggInfo->aggSup; taosHashClear(pSup->pResultRowHashTable); pInfo->resultRowInfo.size = 0; - pOperator->decodeResultRow(pOperator, pSup, pInfo, result, length); + pOperator->fpSet.decodeResultRow(pOperator, result); if(result){ taosMemoryFree(result); } @@ -3567,17 +3567,20 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len return TDB_CODE_SUCCESS; } -int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length) { - if(result == NULL || length <= 0){ +int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { + if(result == NULL){ return TSDB_CODE_TSC_INVALID_INPUT; } SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info); SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo)); // int32_t size = taosHashGetSize(pSup->pResultRowHashTable); - int32_t count = *(int32_t*)(result); - + int32_t length = *(int32_t*)(result); int32_t offset = sizeof(int32_t); + + int32_t count = *(int32_t*)(result + offset); + offset += sizeof(int32_t); + while (count-- > 0 && length > offset) { int32_t keyLen = *(int32_t*)(result + offset); offset += sizeof(int32_t); @@ -5048,17 +5051,19 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t *length){ int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length){ int32_t code = TDB_CODE_SUCCESS; if(ops->fpSet.decodeResultRow){ - if(result == NULL || length <= 0){ + if(result == NULL){ return TSDB_CODE_TSC_INVALID_INPUT; } - char* data = result + 2 * sizeof(int32_t); - int32_t dataLength = *(int32_t*)(result + sizeof(int32_t)); - code = ops->fpSet.decodeResultRow(ops, data, dataLength - sizeof(int32_t)); + ASSERT(length == *(int32_t*)result); + char* data = result + sizeof(int32_t); + code = ops->fpSet.decodeResultRow(ops, data); if(code != TDB_CODE_SUCCESS){ return code; } int32_t totalLength = *(int32_t*)result; + int32_t dataLength = *(int32_t*)data; + if(totalLength == dataLength + sizeof(int32_t)) { // the last data result = NULL; length = 0; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index ef770e8afc..d388b802f3 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -318,7 +318,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo, // pInfo->binfo.rowCellInfoOffset); // } - +#if 0 + if(pOperator->fpSet.encodeResultRow){ + char *result = NULL; + int32_t length = 0; + pOperator->fpSet.encodeResultRow(pOperator, &result, &length); + SAggSupporter* pSup = &pInfo->aggSup; + taosHashClear(pSup->pResultRowHashTable); + pInfo->binfo.resultRowInfo.size = 0; + pOperator->fpSet.decodeResultRow(pOperator, result); + if(result){ + taosMemoryFree(result); + } + } +#endif blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 829968d37f..9b8c2f75ba 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -880,14 +880,14 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId, NULL); #if 0 // test for encode/decode result info - if(pOperator->encodeResultRow){ + if(pOperator->fpSet.encodeResultRow){ char *result = NULL; int32_t length = 0; SAggSupporter *pSup = &pInfo->aggSup; - pOperator->encodeResultRow(pOperator, pSup, &pInfo->binfo, &result, &length); + pOperator->fpSet.encodeResultRow(pOperator, &result, &length); taosHashClear(pSup->pResultRowHashTable); pInfo->binfo.resultRowInfo.size = 0; - pOperator->decodeResultRow(pOperator, pSup, &pInfo->binfo, result, length); + pOperator->fpSet.decodeResultRow(pOperator, result); if(result){ taosMemoryFree(result); } -- GitLab