提交 5dadb260 编写于 作者: H Haojun Liao

fix(query): set the correct state value key index.

上级 86031c19
...@@ -564,7 +564,7 @@ typedef struct SStateWindowOperatorInfo { ...@@ -564,7 +564,7 @@ typedef struct SStateWindowOperatorInfo {
SAggSupporter aggSup; SAggSupporter aggSup;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SWindowRowsSup winSup; SWindowRowsSup winSup;
int32_t colIndex; // start row index SColumn stateCol; // start row index
bool hasKey; bool hasKey;
SStateKeys stateKey; SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id int32_t tsSlotId; // primary timestamp column slot id
...@@ -636,7 +636,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t ...@@ -636,7 +636,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf); void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
...@@ -659,6 +659,7 @@ void cleanupAggSup(SAggSupporter* pAggSup); ...@@ -659,6 +659,7 @@ void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo); SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo);
SSDataBlock* loadNextDataBlock(void* param); SSDataBlock* loadNextDataBlock(void* param);
...@@ -712,7 +713,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp ...@@ -712,7 +713,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
bool multigroupResult, SExecTaskInfo* pTaskInfo); bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SExecTaskInfo* pTaskInfo); SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
......
...@@ -155,9 +155,8 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, ...@@ -155,9 +155,8 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
void operatorDummyCloseFn(void* param, int32_t numOfCols) {} void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SExecTaskInfo *taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset, int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs);
SqlFunctionCtx* pCtx);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
...@@ -2214,7 +2213,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p ...@@ -2214,7 +2213,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
* @param result * @param result
*/ */
int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) { int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) {
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
...@@ -2248,13 +2247,12 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn ...@@ -2248,13 +2247,12 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
pGroupResInfo->index += 1; pGroupResInfo->index += 1;
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId; int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset); pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
if (pCtx[j].fpSet.finalize) { if (pCtx[j].fpSet.finalize) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code)) { if (TAOS_FAILED(code)) {
qError("%s build result data block error, code %s", GET_TASKID(taskInfo), tstrerror(code)); qError("%s build result data block error, code %s", GET_TASKID(taskInfo), tstrerror(code));
taskInfo->code = code; taskInfo->code = code;
...@@ -2286,10 +2284,13 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn ...@@ -2286,10 +2284,13 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
return 0; return 0;
} }
void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) {
SDiskbasedBuf* pBuf) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
SExprInfo* pExprInfo = pOperator->pExpr;
int32_t numOfExprs = pOperator->numOfExprs;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t* rowCellOffset = pbInfo->rowCellInfoOffset; int32_t* rowCellOffset = pbInfo->rowCellInfoOffset;
SSDataBlock* pBlock = pbInfo->pRes; SSDataBlock* pBlock = pbInfo->pRes;
SqlFunctionCtx* pCtx = pbInfo->pCtx; SqlFunctionCtx* pCtx = pbInfo->pCtx;
...@@ -2300,7 +2301,7 @@ void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo* pbInfo, SGr ...@@ -2300,7 +2301,7 @@ void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo* pbInfo, SGr
} }
int32_t orderType = TSDB_ORDER_ASC; int32_t orderType = TSDB_ORDER_ASC;
doCopyToSDataBlock(taskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx); doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx, numOfExprs);
// add condition (pBlock->info.rows >= 1) just to runtime happy // add condition (pBlock->info.rows >= 1) just to runtime happy
blockDataUpdateTsWindow(pBlock); blockDataUpdateTsWindow(pBlock);
...@@ -3803,7 +3804,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { ...@@ -3803,7 +3804,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
} }
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pTaskInfo, pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -4974,7 +4975,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4974,7 +4975,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, pTaskInfo); SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumn col = extractColumnFromColumnNode(pColNode);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode; SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
...@@ -5039,6 +5042,17 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi ...@@ -5039,6 +5042,17 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
SColumn c = {0};
c.slotId = pColNode->slotId;
c.colId = pColNode->colId;
c.type = pColNode->node.resType.type;
c.bytes = pColNode->node.resType.bytes;
c.scale = pColNode->node.resType.scale;
c.precision = pColNode->node.resType.precision;
return c;
}
SArray* extractColumnInfo(SNodeList* pNodeList) { SArray* extractColumnInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList); size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
...@@ -5053,15 +5067,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { ...@@ -5053,15 +5067,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) { if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
// todo extract method SColumn c = extractColumnFromColumnNode(pColNode);
SColumn c = {0};
c.slotId = pColNode->slotId;
c.colId = pColNode->colId;
c.type = pColNode->node.resType.type;
c.bytes = pColNode->node.resType.bytes;
c.scale = pColNode->node.resType.scale;
c.precision = pColNode->node.resType.precision;
taosArrayPush(pList, &c); taosArrayPush(pList, &c);
} else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) { } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
SValueNode* pValNode = (SValueNode*)pNode->pExpr; SValueNode* pValNode = (SValueNode*)pNode->pExpr;
......
...@@ -268,7 +268,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -268,7 +268,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes; SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
...@@ -317,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -317,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
while(1) { while(1) {
doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes); doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
......
...@@ -806,10 +806,22 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -806,10 +806,22 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static bool compareVal(const char* v, const SStateKeys* pKey) {
if (IS_VAR_DATA_TYPE(pKey->type)) {
if (varDataLen(v) != varDataLen(pKey->pData)) {
return false;
} else {
return strncmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
}
} else {
return memcmp(pKey->pData, v, pKey->bytes) == 0;
}
}
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId);
int64_t gid = pBlock->info.groupId; int64_t gid = pBlock->info.groupId;
bool masterScan = true; bool masterScan = true;
...@@ -822,20 +834,28 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -822,20 +834,28 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
SWindowRowsSup* pRowSup = &pInfo->winSup; SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0; pRowSup->numOfRows = 0;
struct SColumnDataAgg* pAgg = NULL;
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pBlock->pBlockAgg[pInfo->colIndex])) { pAgg = (pBlock->pBlockAgg != NULL)? pBlock->pBlockAgg[pInfo->stateCol.slotId]: NULL;
if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) {
continue; continue;
} }
char* val = colDataGetData(pStateColInfoData, j); char* val = colDataGetData(pStateColInfoData, j);
if (!pInfo->hasKey) { if (!pInfo->hasKey) {
// todo extract method
if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
varDataCopy(pInfo->stateKey.pData, val);
} else {
memcpy(pInfo->stateKey.pData, val, bytes); memcpy(pInfo->stateKey.pData, val, bytes);
}
pInfo->hasKey = true; pInfo->hasKey = true;
doKeepNewWindowStartInfo(pRowSup, tsList, j); doKeepNewWindowStartInfo(pRowSup, tsList, j);
doKeepTuple(pRowSup, tsList[j]); doKeepTuple(pRowSup, tsList[j]);
} else if (memcmp(pInfo->stateKey.pData, val, bytes) == 0) { } else if (compareVal(val, &pInfo->stateKey)) {
doKeepTuple(pRowSup, tsList[j]); doKeepTuple(pRowSup, tsList[j]);
if (j == 0 && pRowSup->startRowIndex != 0) { if (j == 0 && pRowSup->startRowIndex != 0) {
pRowSup->startRowIndex = 0; pRowSup->startRowIndex = 0;
...@@ -861,6 +881,13 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -861,6 +881,13 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
// here we start a new session window // here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j); doKeepNewWindowStartInfo(pRowSup, tsList, j);
doKeepTuple(pRowSup, tsList[j]); doKeepTuple(pRowSup, tsList[j]);
// todo extract method
if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
varDataCopy(pInfo->stateKey.pData, val);
} else {
memcpy(pInfo->stateKey.pData, val, bytes);
}
} }
} }
...@@ -888,7 +915,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -888,7 +915,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
...@@ -921,7 +948,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -921,7 +948,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -948,7 +975,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { ...@@ -948,7 +975,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
} }
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
...@@ -1012,7 +1039,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1012,7 +1039,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
...@@ -1053,7 +1080,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1053,7 +1080,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
// TODO: remove for stream // TODO: remove for stream
/*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/ /*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/
...@@ -1283,7 +1310,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1283,7 +1310,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
...@@ -1316,7 +1343,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1316,7 +1343,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -1406,14 +1433,21 @@ _error: ...@@ -1406,14 +1433,21 @@ _error:
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId, SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId,
SExecTaskInfo* pTaskInfo) { SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo) {
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
pInfo->colIndex = -1; pInfo->stateCol = *pStateKeyCol;
pInfo->stateKey.type = pInfo->stateCol.type;
pInfo->stateKey.bytes = pInfo->stateCol.bytes;
pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
if (pInfo->stateKey.pData == NULL) {
goto _error;
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
......
...@@ -1538,7 +1538,9 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) { ...@@ -1538,7 +1538,9 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) { if (QUERY_NODE_COLUMN == nodeType(pNode)) {
STranslateContext* pCxt = pContext; STranslateContext* pCxt = pContext;
SColumnNode* pCol = (SColumnNode*)pNode; SColumnNode* pCol = (SColumnNode*)pNode;
if (!IS_INTEGER_TYPE(pCol->node.resType.type)) {
int32_t type = pCol->node.resType.type;
if (!IS_INTEGER_TYPE(type) && type != TSDB_DATA_TYPE_BOOL && !IS_VAR_DATA_TYPE(type)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE);
} }
if (COLUMN_TYPE_TAG == pCol->colType) { if (COLUMN_TYPE_TAG == pCol->colType) {
......
...@@ -91,7 +91,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { ...@@ -91,7 +91,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case TSDB_CODE_PAR_AGG_FUNC_NESTING: case TSDB_CODE_PAR_AGG_FUNC_NESTING:
return "Aggregate functions do not support nesting"; return "Aggregate functions do not support nesting";
case TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE: case TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE:
return "Only support STATE_WINDOW on integer column"; return "Only support STATE_WINDOW on integer/bool/varchar column";
case TSDB_CODE_PAR_INVALID_STATE_WIN_COL: case TSDB_CODE_PAR_INVALID_STATE_WIN_COL:
return "Not support STATE_WINDOW on tag column"; return "Not support STATE_WINDOW on tag column";
case TSDB_CODE_PAR_INVALID_STATE_WIN_TABLE: case TSDB_CODE_PAR_INVALID_STATE_WIN_TABLE:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册