未验证 提交 afe99b1f 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #10980 from taosdata/feature/TD-14222-new

add encode/decode resultRow function
...@@ -243,6 +243,8 @@ typedef struct STaskAttr { ...@@ -243,6 +243,8 @@ typedef struct STaskAttr {
struct SOperatorInfo; struct SOperatorInfo;
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length);
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup);
typedef void (*__optr_close_fn_t)(void* param, int32_t num); typedef void (*__optr_close_fn_t)(void* param, int32_t num);
...@@ -332,6 +334,8 @@ typedef struct SOperatorInfo { ...@@ -332,6 +334,8 @@ typedef struct SOperatorInfo {
__optr_fn_t cleanupFn; __optr_fn_t cleanupFn;
__optr_close_fn_t closeFn; __optr_close_fn_t closeFn;
__optr_open_fn_t _openFn; // DO NOT invoke this function directly __optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_encode_fn_t encodeResultRow; //
__optr_decode_fn_t decodeResultRow;
} SOperatorInfo; } SOperatorInfo;
typedef struct { typedef struct {
......
...@@ -6391,6 +6391,111 @@ static SSDataBlock* getAggregateResult(SOperatorInfo *pOperator, bool* newgroup) ...@@ -6391,6 +6391,111 @@ static SSDataBlock* getAggregateResult(SOperatorInfo *pOperator, bool* newgroup)
return (blockDataGetNumOfRows(pInfo->pRes) != 0)? pInfo->pRes:NULL; return (blockDataGetNumOfRows(pInfo->pRes) != 0)? pInfo->pRes:NULL;
} }
static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t *length) {
SAggOperatorInfo *pAggInfo = pOperator->info;
SAggSupporter *pSup = &pAggInfo->aggSup;
int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = POINTER_BYTES; // estimate the key length
int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
*result = calloc(1, totalSize);
if(*result == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
return;
}
*(int32_t*)(*result) = size;
int32_t offset = sizeof(int32_t);
void *pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
while (pIter) {
void *key = taosHashGetKey(pIter, &keyLen);
SResultRow **p1 = (SResultRow **)pIter;
// recalculate the result size
int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
if (realTotalSize > totalSize){
char *tmp = realloc(*result, realTotalSize);
if (tmp == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
free(*result);
*result = NULL;
return;
}else{
*result = tmp;
}
}
// save key
*(int32_t*)(*result + offset) = keyLen;
offset += sizeof(int32_t);
memcpy(*result + offset, key, keyLen);
offset += keyLen;
// save value
*(int32_t*)(*result + offset) = pSup->resultRowSize;
offset += sizeof(int32_t);
memcpy(*result + offset, *p1, pSup->resultRowSize);
offset += pSup->resultRowSize;
pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
}
if(length) {
*length = offset;
}
return;
}
static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t length) {
if (!result || length <= 0){
return false;
}
SAggOperatorInfo *pAggInfo = pOperator->info;
SAggSupporter *pSup = &pAggInfo->aggSup;
SOptrBasicInfo *pInfo = &pAggInfo->binfo;
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
int32_t count = *(int32_t*)(result);
int32_t offset = sizeof(int32_t);
while(count-- > 0 && length > offset){
int32_t keyLen = *(int32_t*)(result + offset);
offset += sizeof(int32_t);
uint64_t tableGroupId = *(uint64_t *)(result + offset);
SResultRow *resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
if (!resultRow){
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return false;
}
// add a new result set for a new group
taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &resultRow, POINTER_BYTES);
offset += keyLen;
int32_t valueLen = *(int32_t*)(result + offset);
if (valueLen != pSup->resultRowSize){
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return false;
}
offset += sizeof(int32_t);
int32_t pageId = resultRow->pageId;
int32_t pOffset = resultRow->offset;
memcpy(resultRow, result + offset, valueLen);
resultRow->pageId = pageId;
resultRow->offset = pOffset;
offset += valueLen;
initResultRow(resultRow);
pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset};
}
if (offset != length){
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return false;
}
return true;
}
static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgroup) { static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
...@@ -7312,6 +7417,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -7312,6 +7417,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->_openFn = doOpenAggregateOptr; pOperator->_openFn = doOpenAggregateOptr;
pOperator->getNextFn = getAggregateResult; pOperator->getNextFn = getAggregateResult;
pOperator->closeFn = destroyAggOperatorInfo; pOperator->closeFn = destroyAggOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册