未验证 提交 aaec70eb 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #13217 from taosdata/feature/TD-14222-new

feat:add test for operator encode/decode
...@@ -160,7 +160,7 @@ struct SOperatorInfo; ...@@ -160,7 +160,7 @@ struct SOperatorInfo;
//struct SOptrBasicInfo; //struct SOptrBasicInfo;
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result, int32_t length); typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
...@@ -821,7 +821,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -821,7 +821,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
int32_t* resNum); int32_t* resNum);
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length); int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result);
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
......
...@@ -3448,14 +3448,14 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -3448,14 +3448,14 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
} }
#if 0 // test for encode/decode result info #if 0 // test for encode/decode result info
if(pOperator->encodeResultRow){ if(pOperator->fpSet.encodeResultRow){
char *result = NULL; char *result = NULL;
int32_t length = 0; int32_t length = 0;
SAggSupporter *pSup = &pAggInfo->aggSup; pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
pOperator->encodeResultRow(pOperator, pSup, pInfo, &result, &length); SAggSupporter* pSup = &pAggInfo->aggSup;
taosHashClear(pSup->pResultRowHashTable); taosHashClear(pSup->pResultRowHashTable);
pInfo->resultRowInfo.size = 0; pInfo->resultRowInfo.size = 0;
pOperator->decodeResultRow(pOperator, pSup, pInfo, result, length); pOperator->fpSet.decodeResultRow(pOperator, result);
if(result){ if(result){
taosMemoryFree(result); taosMemoryFree(result);
} }
...@@ -3567,17 +3567,20 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len ...@@ -3567,17 +3567,20 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length) { int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
if(result == NULL || length <= 0){ if(result == NULL){
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
} }
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info); SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo)); SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable); // int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
int32_t count = *(int32_t*)(result); int32_t length = *(int32_t*)(result);
int32_t offset = sizeof(int32_t); int32_t offset = sizeof(int32_t);
int32_t count = *(int32_t*)(result + offset);
offset += sizeof(int32_t);
while (count-- > 0 && length > offset) { while (count-- > 0 && length > offset) {
int32_t keyLen = *(int32_t*)(result + offset); int32_t keyLen = *(int32_t*)(result + offset);
offset += sizeof(int32_t); offset += sizeof(int32_t);
...@@ -5048,17 +5051,19 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t *length){ ...@@ -5048,17 +5051,19 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t *length){
int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length){ int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length){
int32_t code = TDB_CODE_SUCCESS; int32_t code = TDB_CODE_SUCCESS;
if(ops->fpSet.decodeResultRow){ if(ops->fpSet.decodeResultRow){
if(result == NULL || length <= 0){ if(result == NULL){
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
} }
char* data = result + 2 * sizeof(int32_t); ASSERT(length == *(int32_t*)result);
int32_t dataLength = *(int32_t*)(result + sizeof(int32_t)); char* data = result + sizeof(int32_t);
code = ops->fpSet.decodeResultRow(ops, data, dataLength - sizeof(int32_t)); code = ops->fpSet.decodeResultRow(ops, data);
if(code != TDB_CODE_SUCCESS){ if(code != TDB_CODE_SUCCESS){
return code; return code;
} }
int32_t totalLength = *(int32_t*)result; int32_t totalLength = *(int32_t*)result;
int32_t dataLength = *(int32_t*)data;
if(totalLength == dataLength + sizeof(int32_t)) { // the last data if(totalLength == dataLength + sizeof(int32_t)) { // the last data
result = NULL; result = NULL;
length = 0; length = 0;
......
...@@ -318,7 +318,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -318,7 +318,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
// updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo, // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo,
// pInfo->binfo.rowCellInfoOffset); // pInfo->binfo.rowCellInfoOffset);
// } // }
#if 0
if(pOperator->fpSet.encodeResultRow){
char *result = NULL;
int32_t length = 0;
pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
SAggSupporter* pSup = &pInfo->aggSup;
taosHashClear(pSup->pResultRowHashTable);
pInfo->binfo.resultRowInfo.size = 0;
pOperator->fpSet.decodeResultRow(pOperator, result);
if(result){
taosMemoryFree(result);
}
}
#endif
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
......
...@@ -880,14 +880,14 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -880,14 +880,14 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId, NULL); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId, NULL);
#if 0 // test for encode/decode result info #if 0 // test for encode/decode result info
if(pOperator->encodeResultRow){ if(pOperator->fpSet.encodeResultRow){
char *result = NULL; char *result = NULL;
int32_t length = 0; int32_t length = 0;
SAggSupporter *pSup = &pInfo->aggSup; SAggSupporter *pSup = &pInfo->aggSup;
pOperator->encodeResultRow(pOperator, pSup, &pInfo->binfo, &result, &length); pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
taosHashClear(pSup->pResultRowHashTable); taosHashClear(pSup->pResultRowHashTable);
pInfo->binfo.resultRowInfo.size = 0; pInfo->binfo.resultRowInfo.size = 0;
pOperator->decodeResultRow(pOperator, pSup, &pInfo->binfo, result, length); pOperator->fpSet.decodeResultRow(pOperator, result);
if(result){ if(result){
taosMemoryFree(result); taosMemoryFree(result);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册