diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 88f4bdbd3db6aef21f8df64744128a9e89743466..cb45fa97306ac7cc40dccb6f10dafc89ad0c117f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -160,7 +160,7 @@ struct SOperatorInfo; //struct SOptrBasicInfo; typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); -typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result, int32_t length); +typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); @@ -821,7 +821,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, int32_t* resNum); -int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length); +int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result); int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ce46573830e8b1a25a1893191710166e78a6a89d..29047d294f146f577843b8306f25ba875368f20f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3448,14 +3448,14 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } #if 0 // test for encode/decode result info - if(pOperator->encodeResultRow){ + if(pOperator->fpSet.encodeResultRow){ char *result = NULL; int32_t length = 0; - SAggSupporter *pSup = &pAggInfo->aggSup; - pOperator->encodeResultRow(pOperator, pSup, pInfo, &result, &length); + pOperator->fpSet.encodeResultRow(pOperator, &result, &length); + SAggSupporter* pSup = &pAggInfo->aggSup; taosHashClear(pSup->pResultRowHashTable); pInfo->resultRowInfo.size = 0; - pOperator->decodeResultRow(pOperator, pSup, pInfo, result, length); + pOperator->fpSet.decodeResultRow(pOperator, result); if(result){ taosMemoryFree(result); } @@ -3567,17 +3567,20 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len return TDB_CODE_SUCCESS; } -int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length) { - if(result == NULL || length <= 0){ +int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { + if(result == NULL){ return TSDB_CODE_TSC_INVALID_INPUT; } SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info); SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo)); // int32_t size = taosHashGetSize(pSup->pResultRowHashTable); - int32_t count = *(int32_t*)(result); - + int32_t length = *(int32_t*)(result); int32_t offset = sizeof(int32_t); + + int32_t count = *(int32_t*)(result + offset); + offset += sizeof(int32_t); + while (count-- > 0 && length > offset) { int32_t keyLen = *(int32_t*)(result + offset); offset += sizeof(int32_t); @@ -5048,17 +5051,19 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t *length){ int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length){ int32_t code = TDB_CODE_SUCCESS; if(ops->fpSet.decodeResultRow){ - if(result == NULL || length <= 0){ + if(result == NULL){ return TSDB_CODE_TSC_INVALID_INPUT; } - char* data = result + 2 * sizeof(int32_t); - int32_t dataLength = *(int32_t*)(result + sizeof(int32_t)); - code = ops->fpSet.decodeResultRow(ops, data, dataLength - sizeof(int32_t)); + ASSERT(length == *(int32_t*)result); + char* data = result + sizeof(int32_t); + code = ops->fpSet.decodeResultRow(ops, data); if(code != TDB_CODE_SUCCESS){ return code; } int32_t totalLength = *(int32_t*)result; + int32_t dataLength = *(int32_t*)data; + if(totalLength == dataLength + sizeof(int32_t)) { // the last data result = NULL; length = 0; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index ef770e8afc2b5f657187682cc9adf33235dedf48..d388b802f3ae22c7c0cf135c9eb70111be1d79a4 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -318,7 +318,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo, // pInfo->binfo.rowCellInfoOffset); // } - +#if 0 + if(pOperator->fpSet.encodeResultRow){ + char *result = NULL; + int32_t length = 0; + pOperator->fpSet.encodeResultRow(pOperator, &result, &length); + SAggSupporter* pSup = &pInfo->aggSup; + taosHashClear(pSup->pResultRowHashTable); + pInfo->binfo.resultRowInfo.size = 0; + pOperator->fpSet.decodeResultRow(pOperator, result); + if(result){ + taosMemoryFree(result); + } + } +#endif blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 829968d37f9a8a97cf1f256b493035ad0129f71a..9b8c2f75bab32f37e6d7dd8719e50e6a77c8bca6 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -880,14 +880,14 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId, NULL); #if 0 // test for encode/decode result info - if(pOperator->encodeResultRow){ + if(pOperator->fpSet.encodeResultRow){ char *result = NULL; int32_t length = 0; SAggSupporter *pSup = &pInfo->aggSup; - pOperator->encodeResultRow(pOperator, pSup, &pInfo->binfo, &result, &length); + pOperator->fpSet.encodeResultRow(pOperator, &result, &length); taosHashClear(pSup->pResultRowHashTable); pInfo->binfo.resultRowInfo.size = 0; - pOperator->decodeResultRow(pOperator, pSup, &pInfo->binfo, result, length); + pOperator->fpSet.decodeResultRow(pOperator, result); if(result){ taosMemoryFree(result); }