未验证 提交 bc9157a6 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #11387 from taosdata/feature/TD-14222-new

refactor: add encode/decode logic for operator result info
......@@ -232,9 +232,11 @@ typedef struct STaskAttr {
} STaskAttr;
struct SOperatorInfo;
struct SAggSupporter;
struct SOptrBasicInfo;
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length);
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length);
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char **result, int32_t *length);
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char *result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
......@@ -753,6 +755,9 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model);
int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum);
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length);
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length);
#ifdef __cplusplus
}
#endif
......
......@@ -300,10 +300,6 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
}
taosArrayPush(pBlock->pDataBlock, &idata);
if (IS_VAR_DATA_TYPE(idata.info.type)) {
pBlock->info.hasVarCol = true;
}
}
return pBlock;
......@@ -363,7 +359,7 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env)
pResultRowInfo->pPosition = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition));
int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition));
memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition) * inc);
pResultRowInfo->capacity = (int32_t)newCapacity;
}
......@@ -419,7 +415,7 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
pData = getBufPage(pResultBuf, getPageId(pi));
pageId = getPageId(pi);
if (pData->num + interBufSize + sizeof(SResultRow) > getBufPageSize(pResultBuf)) {
if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi);
......@@ -439,7 +435,7 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
pResultRow->pageId = pageId;
pResultRow->offset = (int32_t)pData->num;
pData->num += interBufSize + sizeof(SResultRow);
pData->num += interBufSize;
return pResultRow;
}
......@@ -507,7 +503,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
// add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, POINTER_BYTES);
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition));
SResultRowCell cell = {.groupId = tableGroupId, .pos = pos};
taosArrayPush(pSup->pResultRowArrayList, &cell);
} else {
......@@ -4979,6 +4975,21 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
doAggregateImpl(pOperator, 0, pInfo->pCtx);
#if 0 // test for encode/decode result info
if(pOperator->encodeResultRow){
char *result = NULL;
int32_t length = 0;
SAggSupporter *pSup = &pAggInfo->aggSup;
pOperator->encodeResultRow(pOperator, pSup, pInfo, &result, &length);
taosHashClear(pSup->pResultRowHashTable);
pInfo->resultRowInfo.size = 0;
pOperator->decodeResultRow(pOperator, pSup, pInfo, result, length);
if(result){
taosMemoryFree(result);
}
}
#endif
}
finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput);
......@@ -5007,24 +5018,33 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup)
return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL;
}
static void aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
SAggOperatorInfo* pAggInfo = pOperator->info;
SAggSupporter* pSup = &pAggInfo->aggSup;
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length) {
int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = POINTER_BYTES; // estimate the key length
size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
*result = taosMemoryCalloc(1, totalSize);
if (*result == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return;
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
*(int32_t*)(*result) = size;
int32_t offset = sizeof(int32_t);
// prepare memory
SResultRowPosition* pos = &pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.curPos];
void* pPage = getBufPage(pSup->pResultBuf, pos->pageId);
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
setBufPageDirty(pPage, true);
releaseBufPage(pSup->pResultBuf, pPage);
void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
while (pIter) {
void* key = taosHashGetKey(pIter, &keyLen);
SResultRow** p1 = (SResultRow**)pIter;
SResultRowPosition* p1 = (SResultRowPosition*)pIter;
pPage = (SFilePage*) getBufPage(pSup->pResultBuf, p1->pageId);
pRow = (SResultRow*)((char*)pPage + p1->offset);
setBufPageDirty(pPage, true);
releaseBufPage(pSup->pResultBuf, pPage);
// recalculate the result size
int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
......@@ -5034,7 +5054,7 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t*
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(*result);
*result = NULL;
return;
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
} else {
*result = tmp;
}
......@@ -5048,7 +5068,7 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t*
// save value
*(int32_t*)(*result + offset) = pSup->resultRowSize;
offset += sizeof(int32_t);
memcpy(*result + offset, *p1, pSup->resultRowSize);
memcpy(*result + offset, pRow, pSup->resultRowSize);
offset += pSup->resultRowSize;
pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
......@@ -5060,15 +5080,11 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t*
return;
}
static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length) {
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length) {
if (!result || length <= 0) {
return false;
}
SAggOperatorInfo* pAggInfo = pOperator->info;
SAggSupporter* pSup = &pAggInfo->aggSup;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
int32_t count = *(int32_t*)(result);
......@@ -5080,17 +5096,16 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t l
uint64_t tableGroupId = *(uint64_t*)(result + offset);
SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
if (!resultRow) {
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return false;
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
}
// add a new result set for a new group
taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &resultRow, POINTER_BYTES);
SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
offset += keyLen;
int32_t valueLen = *(int32_t*)(result + offset);
if (valueLen != pSup->resultRowSize) {
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return false;
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
}
offset += sizeof(int32_t);
int32_t pageId = resultRow->pageId;
......@@ -5101,13 +5116,13 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t l
offset += valueLen;
initResultRow(resultRow);
pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] =
(SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env);
pInfo->resultRowInfo.curPos = pInfo->resultRowInfo.size;
pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset};
}
if (offset != length) {
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return false;
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
}
return true;
}
......@@ -5333,6 +5348,21 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
#if 0 // test for encode/decode result info
if(pOperator->encodeResultRow){
char *result = NULL;
int32_t length = 0;
SAggSupporter *pSup = &pInfo->aggSup;
pOperator->encodeResultRow(pOperator, pSup, &pInfo->binfo, &result, &length);
taosHashClear(pSup->pResultRowHashTable);
pInfo->binfo.resultRowInfo.size = 0;
pOperator->decodeResultRow(pOperator, pSup, &pInfo->binfo, result, length);
if(result){
taosMemoryFree(result);
}
}
#endif
}
closeAllResultRows(&pInfo->binfo.resultRowInfo);
......@@ -5752,7 +5782,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
......@@ -6218,6 +6247,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->getNextFn = doBuildIntervalResult;
pOperator->getStreamResFn= doStreamIntervalAgg;
pOperator->closeFn = destroyIntervalOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -6287,6 +6318,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pOperator->info = pInfo;
pOperator->getNextFn = doStateWindowAgg;
pOperator->closeFn = destroyStateWindowOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......@@ -6327,6 +6360,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pOperator->info = pInfo;
pOperator->getNextFn = doSessionWindowAgg;
pOperator->closeFn = destroySWindowOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
pOperator->pTaskInfo = pTaskInfo;
code = appendDownstream(pOperator, &downstream, 1);
......@@ -6379,6 +6414,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
pOperator->name = "AllMultiTableTimeIntervalOperator";
// pOperator->operatorType = OP_AllMultiTableTimeInterval;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
......
......@@ -166,7 +166,7 @@ static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
// assign the group keys or user input constant values if required
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
for (int32_t i = 0; i < numOfOutput; ++i) {
if (pCtx[i].functionId == -1) {
if (pCtx[i].functionId == -1) { // select count(*),key from t group by key.
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);
SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
......@@ -344,7 +344,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby;
// pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo;
......@@ -352,6 +352,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = hashGroupbyAggregate;
pOperator->closeFn = destroyGroupOperatorInfo;
pOperator->encodeResultRow = aggEncodeResultRow;
pOperator->decodeResultRow = aggDecodeResultRow;
code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册