提交 6c332af1 编写于 作者: H Haojun Liao

[td-13039] support sort.

上级 4b041a8b
...@@ -167,10 +167,10 @@ TEST(testCase, Datablock_test) { ...@@ -167,10 +167,10 @@ TEST(testCase, Datablock_test) {
printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData));
SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo)); SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo));
SBlockOrderInfo order = {.order = TSDB_ORDER_ASC, .colIndex = 0}; SBlockOrderInfo order = {.nullFirst = true, .order = TSDB_ORDER_ASC, .slotId = 0};
taosArrayPush(pOrderInfo, &order); taosArrayPush(pOrderInfo, &order);
blockDataSort(b, pOrderInfo, true); blockDataSort(b, pOrderInfo);
blockDataDestroy(b); blockDataDestroy(b);
taosArrayDestroy(pOrderInfo); taosArrayDestroy(pOrderInfo);
......
...@@ -577,6 +577,7 @@ typedef struct SSessionAggOperatorInfo { ...@@ -577,6 +577,7 @@ typedef struct SSessionAggOperatorInfo {
int32_t numOfRows; // number of rows int32_t numOfRows; // number of rows
int32_t start; // start row index int32_t start; // start row index
bool reptScan; // next round scan bool reptScan; // next round scan
int64_t gap; // session window gap
} SSessionAggOperatorInfo; } SSessionAggOperatorInfo;
typedef struct SStateWindowOperatorInfo { typedef struct SStateWindowOperatorInfo {
...@@ -660,7 +661,7 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit ...@@ -660,7 +661,7 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
......
...@@ -336,6 +336,9 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { ...@@ -336,6 +336,9 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
SColumnInfoData idata = {{0}}; SColumnInfoData idata = {{0}};
SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i); SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i);
idata.info.type = pDescNode->dataType.type; idata.info.type = pDescNode->dataType.type;
if (pDescNode->dataType.bytes > 1000) {
pDescNode->dataType.bytes = 4;
}
idata.info.bytes = pDescNode->dataType.bytes; idata.info.bytes = pDescNode->dataType.bytes;
idata.info.scale = pDescNode->dataType.scale; idata.info.scale = pDescNode->dataType.scale;
idata.info.slotId = pDescNode->slotId; idata.info.slotId = pDescNode->slotId;
...@@ -1794,18 +1797,17 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) { ...@@ -1794,18 +1797,17 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) {
} }
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableQueryInfo* item = pRuntimeEnv->current;
// primary timestamp column // primary timestamp column
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); // bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
int64_t gap = pOperator->pRuntimeEnv->pQueryAttr->sw.gap; int64_t gap = pInfo->gap;
pInfo->numOfRows = 0; pInfo->numOfRows = 0;
if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) { if (/*IS_REPEAT_SCAN(pRuntimeEnv) && */!pInfo->reptScan) {
pInfo->reptScan = true; pInfo->reptScan = true;
pInfo->prevTs = INT64_MIN; pInfo->prevTs = INT64_MIN;
} }
...@@ -1829,16 +1831,15 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1829,16 +1831,15 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
} else { // start a new session window } else { // start a new session window
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int64_t gid = pSDataBlock->info.groupId;
pInfo->curWindow.ekey = pInfo->curWindow.skey; pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, masterScan, // int32_t ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, gid, pInfo->binfo.pCtx,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, // numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
pBInfo->rowCellInfoOffset); // if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code // longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); // }
}
// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, // doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
// pSDataBlock->info.rows, pOperator->numOfOutput);
pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j]; pInfo->curWindow.ekey = tsList[j];
...@@ -1851,15 +1852,14 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1851,15 +1852,14 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey; pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, masterScan, // int32_t ret = setResultOutputBufByKey_rv(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, // &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset); // pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code // if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); // longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
} // }
// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, // doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
// pSDataBlock->info.rows, pOperator->numOfOutput);
} }
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
...@@ -7719,25 +7719,27 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper ...@@ -7719,25 +7719,27 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
return pOperator; return pOperator;
} }
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo) {
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols, pTaskInfo->id.str); int32_t numOfRows = 4096;
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
pInfo->gap = gap;
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->prevTs = INT64_MIN; pInfo->prevTs = INT64_MIN;
pInfo->reptScan = false; pInfo->reptScan = false;
pOperator->name = "SessionWindowAggOperator"; pOperator->name = "SessionWindowAggOperator";
// pOperator->operatorType = OP_SessionWindow; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
...@@ -8577,18 +8579,30 @@ static SArray* createSortInfo(SNodeList* pNodeList); ...@@ -8577,18 +8579,30 @@ static SArray* createSortInfo(SNodeList* pNodeList);
size_t size = LIST_LENGTH(pPhyNode->pChildren); size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1); assert(size == 1);
for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
int32_t num = 0; int32_t num = 0;
SExprInfo *pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &num);
SSDataBlock *pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SArray *info = createSortInfo(pSortPhyNode->pSortKeys); SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
return createSortOperatorInfo(op, pExprInfo, num, pResBlock, info, pTaskInfo);
} return createSortOperatorInfo(op, pExprInfo, num, pResBlock, info, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
} else { } else {
ASSERT(0); ASSERT(0);
}/*else if (pPhyNode->info.type == OP_MultiTableAggregate) { }/*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
......
...@@ -98,14 +98,14 @@ int32_t docomp(const void* p1, const void* p2, void* param) { ...@@ -98,14 +98,14 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
for(int32_t i = 0; i < pInfo->size; ++i) { for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = (SBlockOrderInfo*)TARRAY_GET_ELEM(pInfo, i); SBlockOrderInfo* pOrder = (SBlockOrderInfo*)TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex); SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
bool leftNull = false; bool leftNull = false;
if (pLeftColInfoData->hasNull) { if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
} }
SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool rightNull = false; bool rightNull = false;
if (pRightColInfoData->hasNull) { if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册