提交 7b0dfc6c 编写于 作者: H Haojun Liao

[td-13039] support limit/offset

上级 38462504
......@@ -179,12 +179,14 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock);
int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
void blockDebugShowData(const SArray* dataBlocks);
......
......@@ -1139,7 +1139,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
}
}
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) {
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) {
if (0 == numOfRows) {
return TSDB_CODE_SUCCESS;
}
......@@ -1181,7 +1181,7 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
code = blockDataEnsureColumnCapacity(p, numOfRows);
code = colInfoDataEnsureCapacity(p, numOfRows);
if (code) {
return code;
}
......@@ -1233,6 +1233,63 @@ void colDataDestroy(SColumnInfoData* pColData) {
taosMemoryFree(pColData->pData);
}
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
int32_t len = BitmapLen(total);
int32_t newLen = BitmapLen(total - n);
if (n%8 == 0) {
memmove(nullBitmap, nullBitmap + n/8, newLen);
} else {
int32_t tail = n % 8;
int32_t i = 0;
uint8_t* p = (uint8_t*) nullBitmap;
while(i < len) {
uint8_t v = p[i];
p[i] = 0;
p[i] = (v << tail);
if (i < len - 1) {
uint8_t next = p[i + 1];
p[i] |= (next >> (8 - tail));
}
i += 1;
}
}
}
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n));
memset(&pColInfoData->varmeta.offset[total - n - 1], 0, n);
} else {
int32_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
doShiftBitmap(pColInfoData->nullbitmap, n, total);
}
}
int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) {
if (n == 0) {
return TSDB_CODE_SUCCESS;
}
if (pBlock->info.rows <= n) {
blockDataCleanup(pBlock);
} else {
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
colDataTrimFirstNRows(pColInfoData, n, pBlock->info.rows);
}
pBlock->info.rows -= n;
}
return TSDB_CODE_SUCCESS;
}
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
int64_t tbUid = pBlock->info.uid;
int16_t numOfCols = pBlock->info.numOfCols;
......@@ -1372,6 +1429,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return buf;
}
void blockDebugShowData(const SArray* dataBlocks) {
char pBuf[128];
int32_t sz = taosArrayGetSize(dataBlocks);
......
......@@ -142,7 +142,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
colInfo.info.colId = pColSchema->colId;
colInfo.info.type = pColSchema->type;
if (blockDataEnsureColumnCapacity(&colInfo, numOfRows) < 0) {
if (colInfoDataEnsureCapacity(&colInfo, numOfRows) < 0) {
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
return NULL;
}
......
......@@ -403,7 +403,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
SColumnInfoData colInfo = {{0}, 0};
colInfo.info = pCond->colList[i];
int32_t code = blockDataEnsureColumnCapacity(&colInfo, pReadHandle->outputCapacity);
int32_t code = colInfoDataEnsureCapacity(&colInfo, pReadHandle->outputCapacity);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
......
......@@ -496,6 +496,9 @@ typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo;
SSDataBlock *existDataBlock;
int32_t threshold;
SLimit limit;
int64_t curOffset;
int64_t curOutput;
} SProjectOperatorInfo;
typedef struct SLimitOperatorInfo {
......@@ -640,7 +643,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
......
......@@ -1023,7 +1023,7 @@ static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQuer
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
pColData->info.bytes = sizeof(int64_t);
blockDataEnsureColumnCapacity(pColData, 5);
colInfoDataEnsureCapacity(pColData, 5);
colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);
......@@ -6802,7 +6802,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
#if 0
if (pProjectInfo->existDataBlock) { // TODO refactor
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
pProjectInfo->existDataBlock = NULL;
......@@ -6824,6 +6824,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
return pRes;
}
}
#endif
SOperatorInfo* downstream = pOperator->pDownstream[0];
......@@ -6863,10 +6864,27 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput);
if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) {
blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset);
pProjectInfo->curOffset = 0;
break;
} else if (pProjectInfo->curOffset >= pInfo->pRes->info.rows) {
pProjectInfo->curOffset -= pInfo->pRes->info.rows;
blockDataCleanup(pInfo->pRes);
continue;
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
}
if (pProjectInfo->curOutput + pInfo->pRes->info.rows >= pProjectInfo->limit.limit) {
pInfo->pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
}
pProjectInfo->curOutput += pInfo->pRes->info.rows;
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
......@@ -7846,19 +7864,20 @@ _error:
}
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num,
SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->limit = *pLimit;
pInfo->curOffset = pLimit->offset;
pInfo->binfo.pRes = pResBlock;
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pCtx == NULL) {
goto _error;
}
// initResultRowInfo(&pBInfo->resultRowInfo, 8);
// setFunctionResultOutput(pBInfo, MAIN_SCAN);
......@@ -8861,9 +8880,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num);
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo);
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
......
......@@ -29,7 +29,7 @@ SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows) {
pColumnData->info.scale = pType->scale;
pColumnData->info.precision = pType->precision;
int32_t code = blockDataEnsureColumnCapacity(pColumnData, numOfRows);
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pColumnData);
......@@ -44,7 +44,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
in.columnData = createColumnInfoData(&pValueNode->node.resType, 1);
colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
blockDataEnsureColumnCapacity(out->columnData, 1);
colInfoDataEnsureCapacity(out->columnData, 1);
int32_t code = vectorConvertImpl(&in, out);
sclFreeParam(&in);
......
......@@ -155,7 +155,7 @@ void flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
res->info.numOfCols++;
SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock);
blockDataEnsureColumnCapacity(pColumn, rowNum);
colInfoDataEnsureCapacity(pColumn, rowNum);
for (int32_t i = 0; i < rowNum; ++i) {
colDataAppend(pColumn, i, (const char *)value, false);
......
......@@ -99,7 +99,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s
SColumnInfoData idata = {0};
idata.info = *colInfo;
blockDataEnsureColumnCapacity(&idata, rows);
colInfoDataEnsureCapacity(&idata, rows);
taosArrayPush(res->pDataBlock, &idata);
......@@ -186,7 +186,7 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
res->info.numOfCols++;
SColumnInfoData *pColumn = (SColumnInfoData *)taosArrayGetLast(res->pDataBlock);
blockDataEnsureColumnCapacity(pColumn, rowNum);
colInfoDataEnsureCapacity(pColumn, rowNum);
for (int32_t i = 0; i < rowNum; ++i) {
colDataAppend(pColumn, i, (const char *)value, false);
......@@ -1467,7 +1467,7 @@ void scltMakeDataBlock(SScalarParam **pInput, int32_t type, void *pVal, int32_t
input->numOfRows = num;
input->columnData->info = createColumnInfo(0, type, bytes);
blockDataEnsureColumnCapacity(input->columnData, num);
colInfoDataEnsureCapacity(input->columnData, num);
if (setVal) {
for (int32_t i = 0; i < num; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册