提交 a0c21661 编写于 作者: H Haojun Liao

[td-225]fix bugs.

上级 cbe712b4
......@@ -1134,7 +1134,8 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc
(*hasPrev) = true;
static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, SSDataBlock* pBlock, bool needInit) {
static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
SMultiwayMergeInfo* pInfo = pOperator->info;
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES);
......@@ -1165,7 +1166,8 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr,
pInfo->binfo.pRes->info.rows += 1;
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput);
pInfo->binfo.pRes->info.rows += numOfRows;
if (i == 0) {
for(int32_t j = 0; j < numOfExpr; ++j) {
......@@ -1935,55 +1937,72 @@ SSDataBlock* doMultiwaySort(void* param) {
return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL;
static bool isSameGroupRv(SArray* orderColumnList, SSDataBlock* pBlock, char** dataCols) {
int32_t numOfCols = (int32_t) taosArrayGetSize(orderColumnList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex *pIndex = taosArrayGet(orderColumnList, i);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex);
assert(pIndex->colId == pColInfo->info.colId);
char *data = dataCols[i];
int32_t ret = columnValueAscendingComparator(data, pColInfo->pData, pColInfo->info.type, pColInfo->info.bytes);
if (ret == 0) {
} else {
return false;
return true;
SSDataBlock* doGlobalAggregate(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
SMultiwayMergeInfo* pAggInfo = pOperator->info;
// SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SMultiwayMergeInfo *pAggInfo = pOperator->info;
SOperatorInfo *upstream = pOperator->upstream;
bool handleData = false;
pAggInfo->binfo.pRes->info.rows = 0;
if (pAggInfo->hasDataBlockForNewGroup) {
pAggInfo->binfo.pRes->info.rows = 0;
pAggInfo->hasPrev = false; // now we start from a new group data set.
// not belongs to the same group, return the result of current group;
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows);
doExecuteFinalMergeRv(pAggInfo, pOperator->numOfOutput, pAggInfo->pExistBlock, false);
{ // reset output buffer
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
doExecuteFinalMergeRv(pOperator, pOperator->numOfOutput, pAggInfo->pExistBlock);
savePrevOrderColumns(pAggInfo->groupPrevRow, pAggInfo->groupColumnList, pAggInfo->pExistBlock, 0,
savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pAggInfo->pExistBlock, 0,
pAggInfo->pExistBlock = NULL;
pAggInfo->hasDataBlockForNewGroup = false;
handleData = true;
SSDataBlock* pBlock = NULL;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
pBlock = upstream->exec(upstream);
if (pBlock == NULL) {
if (pAggInfo->hasPrev) {
bool sameGroup = true;
int32_t numOfCols = (int32_t) taosArrayGetSize(pAggInfo->groupColumnList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex *pIndex = taosArrayGet(pAggInfo->groupColumnList, i);
SColumnInfoData *pColInfo = taosArrayGet(pAggInfo->binfo.pRes->pDataBlock, pIndex->colIndex);
char *data = pAggInfo->groupPrevRow[i];
int32_t ret = columnValueAscendingComparator(data, pColInfo->pData, pColInfo->info.type, pColInfo->info.bytes);
if (ret == 0) {
} else {
sameGroup = false;
if (pAggInfo->hasGroupColData) {
bool sameGroup = isSameGroupRv(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
if (!sameGroup) {
pAggInfo->hasDataBlockForNewGroup = true;
pAggInfo->pExistBlock = pBlock;
......@@ -1992,13 +2011,18 @@ SSDataBlock* doGlobalAggregate(void* param) {
// not belongs to the same group, return the result of current group;
// not belongs to the same group, return the result of current group
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
// handle the output buffer problem
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows);
doExecuteFinalMergeRv(pAggInfo, pOperator->numOfOutput, pBlock, false);
doExecuteFinalMergeRv(pOperator, pOperator->numOfOutput, pBlock);
savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData);
handleData = true;
if (handleData) { // data in current group is all handled
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
int32_t functionId = pAggInfo->binfo.pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
......@@ -2008,12 +2032,11 @@ SSDataBlock* doGlobalAggregate(void* param) {
pAggInfo->binfo.pRes->info.rows += 1;
// pOperator->status = OP_EXEC_DONE;
// setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
pAggInfo->binfo.pRes->info.rows += numOfRows;
return pAggInfo->binfo.pRes;
return (pAggInfo->binfo.pRes->info.rows != 0)? pAggInfo->binfo.pRes:NULL;
SSDataBlock* doSLimit(void* param) {
......@@ -2033,6 +2056,16 @@ SSDataBlock* doSLimit(void* param) {
return NULL;
if (!pInfo->hasPrev) {
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
} else {
bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow);
if (!sameGroup) { // reset info for new group data
pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
pInfo->rowsTotal = 0;
if (pInfo->currentGroupOffset == 0) {
if (pInfo->currentOffset == 0) { // TODO refactor
......@@ -2056,28 +2089,14 @@ SSDataBlock* doSLimit(void* param) {
} else {
if (pInfo->hasPrev) {
// Check if current data block belongs to current result group or not
// if (needToMergeRv(pBlock, pInfo->pMerger, 0, pInfo->prevRow)) {
bool sameGroup = true;
int32_t numOfCols = (int32_t) taosArrayGetSize(pInfo->orderColumnList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex *pIndex = taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex);
char *data = pInfo->prevRow[i];
int32_t ret = columnValueAscendingComparator(data, pColInfo->pData, pColInfo->info.type, pColInfo->info.bytes);
if (ret == 0) {
} else {
sameGroup = false;
bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow);
if (sameGroup) {
continue; // ignore the data block of the same group and try next
} else {
//update the group column data by using the current group.
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev);
pInfo->currentOffset = pInfo->limit.offset; // set the offset value for a new group
pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
pInfo->rowsTotal = 0;
if ((--pInfo->currentGroupOffset) == 0) {
......@@ -2108,7 +2127,7 @@ SSDataBlock* doSLimit(void* param) {
if (!pInfo->hasPrev || !needToMergeRv(pBlock, pInfo->pMerger, 0, pInfo->prevRow)) {
if (!pInfo->hasPrev || !isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow)) {
pInfo->groupTotal += 1;
if (pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort
return NULL;
......@@ -464,7 +464,8 @@ typedef struct SMultiwayMergeInfo {
char **prevRow;
SArray *orderColumnList;
char **groupPrevRow;
bool hasGroupColData;
char **currentGroupColData;
SArray *groupColumnList;
bool hasDataBlockForNewGroup;
SSDataBlock *pExistBlock;
......@@ -2902,7 +2902,7 @@ int32_t initResultRow(SResultRow *pResultRow) {
* +------------+-------------------------------------------+-------------------------------------------+
* offset[0] offset[1]
void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid) {
void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid, int32_t stage) {
SQLFunctionCtx* pCtx = pInfo->pCtx;
SSDataBlock* pDataBlock = pInfo->pRes;
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
......@@ -2923,6 +2923,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
pCtx[i].resultInfo = pCellInfo;
pCtx[i].pOutput = pData->pData;
pCtx[i].currentStage = stage;
assert(pCtx[i].pOutput != NULL);
// set the timestamp output buffer for top/bottom/diff query
......@@ -4315,6 +4316,8 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
SArray* pOrderColumns = NULL;
if (numOfCols > 0) {
pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo);
} else {
pOrderColumns = taosArrayInit(4, sizeof(SColIndex));
if (pQuery->interval.interval > 0) {
......@@ -4347,6 +4350,8 @@ SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) {
SArray* pOrderColumns = NULL;
if (numOfCols > 0) {
pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo);
} else {
pOrderColumns = taosArrayInit(4, sizeof(SColIndex));
for(int32_t i = 0; i < numOfCols; ++i) {
......@@ -4369,6 +4374,8 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
// int32_t numOfRows =
// (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx
pInfo->pMerge = param;
pInfo->bufCapacity = 4096;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity);
......@@ -4395,11 +4402,11 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
numOfCols = (pInfo->groupColumnList != NULL)? taosArrayGetSize(pInfo->groupColumnList):0;
pInfo->groupPrevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len));
offset = POINTER_BYTES * numOfOutput;
for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->groupPrevRow[i] = (char*)pInfo->groupPrevRow + offset;
pInfo->currentGroupColData[i] = (char*)pInfo->currentGroupColData + offset;
SColIndex* index = taosArrayGet(pInfo->groupColumnList, i);
offset += pExpr[index->colIndex].base.resBytes;
......@@ -4408,7 +4415,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->seed = rand();
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed);
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MERGE_STAGE);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "GlobalAggregate";
......@@ -4971,7 +4978,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->seed = rand();
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed);
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MASTER_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate";
......@@ -5063,7 +5070,7 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset);
initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed);
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ArithmeticOperator";
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册