提交 cf9e0be1 编写于 作者: H Haojun Liao

[td-14493] support slimit/soffset

上级 e77ecba7
......@@ -71,7 +71,7 @@ typedef struct SDataBlockInfo {
int64_t uid;
int64_t blockId;
};
int64_t groupId; // no need to serialize
uint64_t groupId; // no need to serialize
} SDataBlockInfo;
typedef struct SSDataBlock {
......
......@@ -511,6 +511,12 @@ typedef struct SProjectOperatorInfo {
SSDataBlock *existDataBlock;
SArray *pPseudoColInfo;
SLimit limit;
SLimit slimit;
uint64_t groupId;
int64_t curSOffset;
int64_t curGroupOutput;
int64_t curOffset;
int64_t curOutput;
} SProjectOperatorInfo;
......@@ -673,7 +679,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
......
......@@ -1263,6 +1263,7 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S
static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList) {
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
pResult->info.groupId = pSrcBlock->info.groupId;
for (int32_t k = 0; k < numOfOutput; ++k) {
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
......@@ -5426,7 +5427,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
assert(*newgroup == false);
*newgroup = prevVal;
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
break;
......@@ -5454,6 +5454,38 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo);
if (pProjectInfo->curSOffset > 0) {
if (pProjectInfo->groupId == 0) { // it is the first group
pProjectInfo->groupId = pBlock->info.groupId;
blockDataCleanup(pInfo->pRes);
continue;
} else if (pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curSOffset -= 1;
// ignore data block in current group
if (pProjectInfo->curSOffset > 0) {
blockDataCleanup(pInfo->pRes);
continue;
}
}
pProjectInfo->groupId = pBlock->info.groupId;
}
if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curGroupOutput += 1;
if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) {
pOperator->status = OP_EXEC_DONE;
return NULL;
}
// reset the value for a new group data
pProjectInfo->curOffset = 0;
pProjectInfo->curOutput = 0;
}
pProjectInfo->groupId = pBlock->info.groupId;
// todo extract method
if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) {
blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset);
......@@ -6321,7 +6353,7 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols)
}
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num,
SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo) {
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -6329,7 +6361,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
}
pInfo->limit = *pLimit;
pInfo->slimit = *pSlimit;
pInfo->curOffset = pLimit->offset;
pInfo->curSOffset = pSlimit->offset;
pInfo->binfo.pRes = pResBlock;
int32_t numOfCols = num;
......@@ -7117,8 +7152,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, pTaskInfo);
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
int32_t num = 0;
......
......@@ -26,6 +26,8 @@
#include "ttypes.h"
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
static uint64_t calcGroupId(char* pData, int32_t len);
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
......@@ -353,7 +355,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
}
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SPartitionOperatorInfo* pInfo = pOperator->info;
......@@ -362,39 +364,14 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
SDataGroupInfo* pGInfo = NULL;
void *pPage = getCurrentDataGroupInfo(pInfo, &pGInfo, len);
void* pPage = NULL;
if (p == NULL) { // it is a new group
SDataGroupInfo gi = {0};
gi.pPageList = taosArrayInit(100, sizeof(int32_t));
taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));
p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
taosArrayPush(p->pPageList, &pageId);
*(int32_t *) pPage = 0;
} else {
int32_t* curId = taosArrayGetLast(p->pPageList);
pPage = getBufPage(pInfo->pBuf, *curId);
int32_t *rows = (int32_t*) pPage;
if (*rows >= pInfo->rowCapacity) {
// add a new page for current group
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
taosArrayPush(p->pPageList, &pageId);
*(int32_t*) pPage = 0;
}
pGInfo->numOfRows += 1;
if (pGInfo->groupId == 0) {
pGInfo->groupId = calcGroupId(pInfo->keyBuf, len);
}
// add one for this group
p->numOfRows += 1;
int32_t* rows = (int32_t*) pPage;
size_t numOfCols = pOperator->numOfOutput;
......@@ -446,8 +423,53 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pBuf, pPage);
}
}
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
void* pPage = NULL;
if (p == NULL) { // it is a new group
SDataGroupInfo gi = {0};
gi.pPageList = taosArrayInit(100, sizeof(int32_t));
taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));
p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
taosArrayPush(p->pPageList, &pageId);
*(int32_t *) pPage = 0;
} else {
int32_t* curId = taosArrayGetLast(p->pPageList);
pPage = getBufPage(pInfo->pBuf, *curId);
int32_t *rows = (int32_t*) pPage;
if (*rows >= pInfo->rowCapacity) {
// add a new page for current group
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
taosArrayPush(p->pPageList, &pageId);
*(int32_t*) pPage = 0;
}
}
// todo set the consistent group id according to the group keys
*pGroupInfo = p;
return pPage;
}
uint64_t calcGroupId(char* pData, int32_t len) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pData, len);
tMD5Final(&context);
// NOTE: only extract the initial 8 bytes of the final MD5 digest
uint64_t id = 0;
memcpy(&id, context.digest, sizeof(uint64_t));
return id;
}
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
......@@ -496,6 +518,8 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
pInfo->pageIndex += 1;
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
return pInfo->binfo.pRes;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册