提交 c2fa9ce2 编写于 作者: H Haojun Liao

[td-6260]fix the bug found by regression test.

上级 1bccf8a7
...@@ -1021,7 +1021,13 @@ static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlo ...@@ -1021,7 +1021,13 @@ static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlo
} }
} }
static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) { enum {
BLOCK_NEW_GROUP = 1,
BLOCK_NO_GROUP = 2,
BLOCK_SAME_GROUP = 3,
};
static int32_t doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) {
int32_t rowIndex = 0; int32_t rowIndex = 0;
while (rowIndex < pBlock->info.rows) { while (rowIndex < pBlock->info.rows) {
...@@ -1030,12 +1036,12 @@ static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, S ...@@ -1030,12 +1036,12 @@ static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, S
bool samegroup = true; bool samegroup = true;
if (pInfo->hasPrev) { if (pInfo->hasPrev) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i); SColIndex *pIndex = taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex); SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex);
SColumnInfo *pColInfo = &pColInfoData->info; SColumnInfo *pColInfo = &pColInfoData->info;
char * d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData; char *d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes); int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes);
if (ret != 0) { // it is a new group if (ret != 0) { // it is a new group
samegroup = false; samegroup = false;
...@@ -1063,10 +1069,17 @@ static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, S ...@@ -1063,10 +1069,17 @@ static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, S
if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) { if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
return; return BLOCK_NO_GROUP;
} }
pInfo->groupTotal += 1; pInfo->groupTotal += 1;
// data in current group not allowed, return if current result does not belong to the previous group.And there
// are results exists in current SSDataBlock
if (!pInfo->multigroupResult && !samegroup && pInfo->pRes->info.rows > 0) {
return BLOCK_NEW_GROUP;
}
doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex); doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
} else { // handle the offset in the same group } else { // handle the offset in the same group
...@@ -1081,6 +1094,8 @@ static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, S ...@@ -1081,6 +1094,8 @@ static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, S
rowIndex += 1; rowIndex += 1;
} }
return BLOCK_SAME_GROUP;
} }
SSDataBlock* doSLimit(void* param, bool* newgroup) { SSDataBlock* doSLimit(void* param, bool* newgroup) {
...@@ -1092,6 +1107,14 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { ...@@ -1092,6 +1107,14 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) {
SSLimitOperatorInfo *pInfo = pOperator->info; SSLimitOperatorInfo *pInfo = pOperator->info;
pInfo->pRes->info.rows = 0; pInfo->pRes->info.rows = 0;
if (pInfo->pPrevBlock != NULL) {
ensureOutputBuf(pInfo, pInfo->pRes, pInfo->pPrevBlock->info.rows);
int32_t ret = doSlimitImpl(pOperator, pInfo, pInfo->pPrevBlock);
assert(ret != BLOCK_NEW_GROUP);
pInfo->pPrevBlock = NULL;
}
assert(pInfo->currentGroupOffset >= 0); assert(pInfo->currentGroupOffset >= 0);
while(1) { while(1) {
...@@ -1104,7 +1127,12 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { ...@@ -1104,7 +1127,12 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) {
} }
ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows); ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows);
doSlimitImpl(pOperator, pInfo, pBlock); int32_t ret = doSlimitImpl(pOperator, pInfo, pBlock);
if (ret == BLOCK_NEW_GROUP) {
pInfo->pPrevBlock = pBlock;
return pInfo->pRes;
}
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
} }
......
...@@ -925,7 +925,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -925,7 +925,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pQueryInfo = pCmd->active; pQueryInfo = pCmd->active;
pQueryInfo->pUdfInfo = pUdfInfo; pQueryInfo->pUdfInfo = pUdfInfo;
pQueryInfo->udfCopy = true; pQueryInfo->udfCopy = true;
} }
} }
...@@ -8696,6 +8695,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS ...@@ -8696,6 +8695,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
if (taosArrayGetSize(subInfo->pSubquery) >= 2) { if (taosArrayGetSize(subInfo->pSubquery) >= 2) {
return invalidOperationMsg(msgBuf, "not support union in subquery"); return invalidOperationMsg(msgBuf, "not support union in subquery");
} }
SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo)); SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo));
tscInitQueryInfo(pSub); tscInitQueryInfo(pSub);
...@@ -8713,12 +8713,12 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS ...@@ -8713,12 +8713,12 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
return code; return code;
} }
// create dummy table meta info // create dummy table meta info
STableMetaInfo* pTableMetaInfo1 = calloc(1, sizeof(STableMetaInfo)); STableMetaInfo* pTableMetaInfo1 = calloc(1, sizeof(STableMetaInfo));
if (pTableMetaInfo1 == NULL) { if (pTableMetaInfo1 == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pTableMetaInfo1->pTableMeta = extractTempTableMetaFromSubquery(pSub); pTableMetaInfo1->pTableMeta = extractTempTableMetaFromSubquery(pSub);
pTableMetaInfo1->tableMetaCapacity = tscGetTableMetaSize(pTableMetaInfo1->pTableMeta); pTableMetaInfo1->tableMetaCapacity = tscGetTableMetaSize(pTableMetaInfo1->pTableMeta);
...@@ -8802,7 +8802,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8802,7 +8802,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
// check if there is 3 level select // check if there is 3 level select
SRelElementPair* subInfo = taosArrayGet(pSqlNode->from->list, i); SRelElementPair* subInfo = taosArrayGet(pSqlNode->from->list, i);
SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0); SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0);
if (p->from->type == SQL_NODE_FROM_SUBQUERY){ if (p->from->type == SQL_NODE_FROM_SUBQUERY) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
} }
...@@ -8895,6 +8895,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8895,6 +8895,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
} }
} }
// disable group result mixed up if interval/session window query exists.
if (isTimeWindowQuery(pQueryInfo)) {
size_t num = taosArrayGetSize(pQueryInfo->pUpstream);
for(int32_t i = 0; i < num; ++i) {
SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, i);
pUp->multigroupResult = false;
}
}
// parse the having clause in the first place // parse the having clause in the first place
int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1); int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1);
if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) != if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) !=
......
...@@ -3128,6 +3128,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { ...@@ -3128,6 +3128,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->slimit.offset = 0; pQueryInfo->slimit.offset = 0;
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->window = TSWINDOW_INITIALIZER; pQueryInfo->window = TSWINDOW_INITIALIZER;
pQueryInfo->multigroupResult = true;
} }
int32_t tscAddQueryInfo(SSqlCmd* pCmd) { int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
...@@ -3139,7 +3140,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) { ...@@ -3139,7 +3140,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
} }
tscInitQueryInfo(pQueryInfo); tscInitQueryInfo(pQueryInfo);
pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer
if (pCmd->pQueryInfo == NULL) { if (pCmd->pQueryInfo == NULL) {
...@@ -3222,6 +3222,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { ...@@ -3222,6 +3222,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
pQueryInfo->window = pSrc->window; pQueryInfo->window = pSrc->window;
pQueryInfo->sessionWindow = pSrc->sessionWindow; pQueryInfo->sessionWindow = pSrc->sessionWindow;
pQueryInfo->pTableMetaInfo = NULL; pQueryInfo->pTableMetaInfo = NULL;
pQueryInfo->multigroupResult = pSrc->multigroupResult;
pQueryInfo->bufLen = pSrc->bufLen; pQueryInfo->bufLen = pSrc->bufLen;
pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery; pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery;
...@@ -3623,6 +3624,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3623,6 +3624,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->pTableMetaInfo = NULL;
pNewQueryInfo->bufLen = pQueryInfo->bufLen; pNewQueryInfo->bufLen = pQueryInfo->bufLen;
pNewQueryInfo->distinct = pQueryInfo->distinct; pNewQueryInfo->distinct = pQueryInfo->distinct;
pNewQueryInfo->multigroupResult = pQueryInfo->multigroupResult;
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
if (pNewQueryInfo->buf == NULL) { if (pNewQueryInfo->buf == NULL) {
...@@ -4736,6 +4738,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -4736,6 +4738,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->distinct = pQueryInfo->distinct; pQueryAttr->distinct = pQueryInfo->distinct;
pQueryAttr->sw = pQueryInfo->sessionWindow; pQueryAttr->sw = pQueryInfo->sessionWindow;
pQueryAttr->stateWindow = pQueryInfo->stateWindow; pQueryAttr->stateWindow = pQueryInfo->stateWindow;
pQueryAttr->multigroupResult = pQueryInfo->multigroupResult;
pQueryAttr->numOfCols = numOfCols; pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput; pQueryAttr->numOfOutput = numOfOutput;
......
...@@ -223,6 +223,7 @@ typedef struct SQueryAttr { ...@@ -223,6 +223,7 @@ typedef struct SQueryAttr {
bool distinct; // distinct query or not bool distinct; // distinct query or not
bool stateWindow; // window State on sub/normal table bool stateWindow; // window State on sub/normal table
bool createFilterOperator; // if filter operator is needed bool createFilterOperator; // if filter operator is needed
bool multigroupResult; // multigroup result can exist in one SSDataBlock
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
...@@ -469,19 +470,21 @@ typedef struct SLimitOperatorInfo { ...@@ -469,19 +470,21 @@ typedef struct SLimitOperatorInfo {
} SLimitOperatorInfo; } SLimitOperatorInfo;
typedef struct SSLimitOperatorInfo { typedef struct SSLimitOperatorInfo {
int64_t groupTotal; int64_t groupTotal;
int64_t currentGroupOffset; int64_t currentGroupOffset;
int64_t rowsTotal; int64_t rowsTotal;
int64_t currentOffset; int64_t currentOffset;
SLimitVal limit; SLimitVal limit;
SLimitVal slimit; SLimitVal slimit;
char **prevRow; char **prevRow;
SArray *orderColumnList; SArray *orderColumnList;
bool hasPrev; bool hasPrev;
bool ignoreCurrentGroup; bool ignoreCurrentGroup;
bool multigroupResult;
SSDataBlock *pRes; // result buffer SSDataBlock *pRes; // result buffer
SSDataBlock *pPrevBlock;
int64_t capacity; int64_t capacity;
int64_t threshold; int64_t threshold;
} SSLimitOperatorInfo; } SSLimitOperatorInfo;
...@@ -497,6 +500,7 @@ typedef struct SFillOperatorInfo { ...@@ -497,6 +500,7 @@ typedef struct SFillOperatorInfo {
int64_t totalInputRows; int64_t totalInputRows;
void **p; void **p;
SSDataBlock *existNewGroupBlock; SSDataBlock *existNewGroupBlock;
bool multigroupResult;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
...@@ -582,7 +586,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -582,7 +586,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
...@@ -594,7 +598,7 @@ SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SEx ...@@ -594,7 +598,7 @@ SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SEx
int32_t numOfRows, void* merger); int32_t numOfRows, void* merger);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp);
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
......
...@@ -165,6 +165,7 @@ typedef struct SQueryInfo { ...@@ -165,6 +165,7 @@ typedef struct SQueryInfo {
bool orderProjectQuery; bool orderProjectQuery;
bool stateWindow; bool stateWindow;
bool globalMerge; bool globalMerge;
bool multigroupResult;
} SQueryInfo; } SQueryInfo;
/** /**
......
...@@ -2247,7 +2247,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2247,7 +2247,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Fill: { case OP_Fill: {
SOperatorInfo* pInfo = pRuntimeEnv->proot; SOperatorInfo* pInfo = pRuntimeEnv->proot;
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput); pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult);
break; break;
} }
...@@ -2257,16 +2257,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2257,16 +2257,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock. case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock.
bool groupResultMixedUp = (pQueryAttr->fillType == TSDB_FILL_NONE); bool multigroupResult = pQueryAttr->multigroupResult;
if (pQueryAttr->multigroupResult) {
multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE);
}
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, groupResultMixedUp); pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult);
break; break;
} }
case OP_SLimit: { case OP_SLimit: {
int32_t num = pRuntimeEnv->proot->numOfOutput; int32_t num = pRuntimeEnv->proot->numOfOutput;
SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; SExprInfo* pExpr = pRuntimeEnv->proot->pExpr;
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger); pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult);
break; break;
} }
...@@ -6345,7 +6349,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRunt ...@@ -6345,7 +6349,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRunt
if (taosFillHasMoreResults(pInfo->pFillInfo)) { if (taosFillHasMoreResults(pInfo->pFillInfo)) {
*newgroup = false; *newgroup = false;
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult)) {
return; return;
} }
} }
...@@ -6377,7 +6381,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -6377,7 +6381,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
return pInfo->pRes; return pInfo->pRes;
} }
// if (taosFillHasMoreResults(pInfo->pFillInfo)) { // if (taosFillHasMoreResults(pInfo->pFillInfo)) {
...@@ -6414,8 +6418,8 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -6414,8 +6418,8 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
pInfo->existNewGroupBlock = pBlock; pInfo->existNewGroupBlock = pBlock;
*newgroup = false; *newgroup = false;
// fill the previous group data block // Fill the previous group data block, before handle the data block of new group.
// before handle a new data block, close the fill operation for previous group data block // Close the fill operation for previous group data block
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
} else { } else {
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -6436,8 +6440,9 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -6436,8 +6440,9 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
// current group has no more result to return // current group has no more result to return
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
// the result in current group not reach the threshold of output result, continue // 1. The result in current group not reach the threshold of output result, continue
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) { // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
return pInfo->pRes; return pInfo->pRes;
} }
...@@ -6932,10 +6937,10 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -6932,10 +6937,10 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
return pOperator; return pOperator;
} }
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) {
int32_t numOfOutput) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
pInfo->multigroupResult = multigroupResult;
{ {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -6971,7 +6976,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -6971,7 +6976,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
return pOperator; return pOperator;
} }
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger) { SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -6982,7 +6987,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -6982,7 +6987,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
pInfo->capacity = pRuntimeEnv->resultInfo.capacity; pInfo->capacity = pRuntimeEnv->resultInfo.capacity;
pInfo->threshold = pInfo->capacity * 0.8; pInfo->threshold = pInfo->capacity * 0.8;
pInfo->currentOffset = pQueryAttr->limit.offset; pInfo->currentOffset = pQueryAttr->limit.offset;
pInfo->currentGroupOffset = pQueryAttr->slimit.offset; pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
pInfo->multigroupResult= multigroupResult;
// TODO refactor // TODO refactor
int32_t len = 0; int32_t len = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册